Koji Kawamura created NIFI-3545:
-----------------------------------
Summary: Let M FlowFiles pass through once N signals arrive
Key: NIFI-3545
URL: https://issues.apache.org/jira/browse/NIFI-3545
Project: Apache NiFi
Issue Type: Improvement
Components: Extensions
Reporter: Koji Kawamura
Assignee: Koji Kawamura
If the Wait processor could
"let M flow files pass through once N notify signals have arrived for key K",
we could support a wider variety of use cases. Currently, it only supports
"let 1 flow file pass through once N notify signals have arrived for key K".
h3. How it works: a simulation
For example, let's say there are 50 incoming flow files at the beginning, f1 to
f50.
N=3, M=100
This can be read as "the Wait processor is allowed to convert 3 signals into 100
pass tickets."
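A minimal Python sketch of this conversion rule follows. It assumes each key keeps two numbers: the accumulated signals `n` and the remaining passes `m`. The names `SignalState`, `try_release` and `released_count` are hypothetical and are not NiFi API. Setting M=1 gives the current one-flow-file-per-N-signals behavior, which makes the two easy to compare.

```python
from dataclasses import dataclass


@dataclass
class SignalState:
    """Hypothetical per-key state for K: accumulated signals and remaining passes."""
    n: int = 0  # notify signals received but not yet converted
    m: int = 0  # pass tickets left for waiting flow files


def try_release(state: SignalState, target_n: int, pass_m: int) -> bool:
    """Return True if one waiting flow file may pass (assumes pass_m > 0)."""
    if state.m == 0 and state.n >= target_n:
        # Convert N signals into M pass tickets.
        state.n -= target_n
        state.m = pass_m
    if state.m > 0:
        state.m -= 1  # this flow file spends one ticket
        return True
    return False


def released_count(signals: int, flow_files: int, target_n: int, pass_m: int) -> int:
    """How many of `flow_files` waiting flow files pass, given `signals` notifications."""
    state = SignalState(n=signals)
    return sum(try_release(state, target_n, pass_m) for _ in range(flow_files))


print(released_count(3, 50, target_n=3, pass_m=1))    # 1 (current behavior)
print(released_count(3, 50, target_n=3, pass_m=100))  # 50 (proposed, M=100)
```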
1. There's no signal for K yet, so all flow files are waiting.
2. Notify sends a signal. K( N=1 ) doesn't meet the Wait condition, so the Wait
processor keeps waiting.
3. Notify sends two more signals. Now K( N=3 ) meets the Wait condition.
4. The Wait processor converts the 3 signals into 100 passes and lets f1 to f50
through, updating K to K( N=0, M=50 ), where M denotes the remaining number of
flow files that can go through.
5. Another 30 flow files arrive; the Wait processor lets f51 to f80 through and
updates K to K( N=0, M=20 ).
6. Another 30 flow files arrive (f81 to f110); the Wait processor lets f81 to
f100 through. K is now K( N=0, M=0 ). Since both N and M are used up, the Wait
processor removes K. f101 to f110 wait for signals, the same state as #1.
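The six steps can be replayed with a small Python sketch. It uses a plain `dict` as a stand-in for the distributed cache and a FIFO `deque` for the Wait processor's queue. The helpers `notify`, `arrive` and `run_wait` are hypothetical and are not NiFi API. After step 4, K reads N=0 because the 3 signals have already been converted into passes.

```python
from collections import deque

TARGET_N, PASS_M = 3, 100
cache: dict[str, tuple[int, int]] = {}  # hypothetical stand-in for the shared cache: key -> (n, m)
waiting: deque[str] = deque()           # flow files queued at the Wait processor, in arrival order
released: list[str] = []                # flow files that passed through


def notify(key: str, delta: int = 1) -> None:
    """Notify: add `delta` signals to key."""
    n, m = cache.get(key, (0, 0))
    cache[key] = (n + delta, m)


def arrive(first: int, last: int) -> None:
    """Queue flow files f<first> .. f<last> at the Wait processor."""
    waiting.extend(f"f{i}" for i in range(first, last + 1))


def run_wait(key: str) -> None:
    """Release waiting flow files in order while passes are available."""
    while waiting:
        n, m = cache.get(key, (0, 0))
        if m == 0 and n >= TARGET_N:
            n, m = n - TARGET_N, PASS_M  # convert N signals into M passes
        if m == 0:
            break  # no passes left: remaining flow files keep waiting
        released.append(waiting.popleft())
        m -= 1
        if n == 0 and m == 0:
            cache.pop(key, None)  # all N and M used up: remove K
        else:
            cache[key] = (n, m)


arrive(1, 50)
run_wait("K")   # 1. no signal for K: all 50 wait
notify("K")
run_wait("K")   # 2. K(N=1): still waiting
notify("K")
notify("K")
run_wait("K")   # 3-4. K(N=3) is converted; f1..f50 pass
print(cache["K"])  # (0, 50)
arrive(51, 80)
run_wait("K")   # 5.
print(cache["K"])  # (0, 20)
arrive(81, 110)
run_wait("K")   # 6.
print("K" in cache, len(released), list(waiting)[:2])  # False 100 ['f101', 'f102']
```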
h4. Alternative path after 6
7a. If Notify sends enough additional signals (N=3), then f101 to f110 can go through.
7b. If Notify doesn't send any more signals, then f101 to f110 will eventually be
routed to expired.
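Both paths after step 6 can be sketched as follows. The sketch assumes Notify sends exactly three new signals in 7a. It also uses a hypothetical `expired` flag in place of real expiration timing. `WaitState` and `process` are illustrative names only.

```python
from dataclasses import dataclass


@dataclass
class WaitState:
    n: int = 0  # accumulated signals for K
    m: int = 0  # remaining passes for K


def process(state: WaitState, flow_files: list[str], target_n: int = 3,
            pass_m: int = 100, expired: bool = False) -> dict[str, str]:
    """Route each flow file to 'success', 'wait' or 'expired' (sketch).

    `expired` is a hypothetical flag meaning the flow files have already waited
    longer than the Wait processor's expiration duration.
    """
    routes = {}
    for ff in flow_files:
        if state.m == 0 and state.n >= target_n:
            state.n, state.m = state.n - target_n, pass_m  # convert N into M
        if state.m > 0:
            state.m -= 1
            routes[ff] = "success"
        else:
            routes[ff] = "expired" if expired else "wait"
    return routes


pending = [f"f{i}" for i in range(101, 111)]

# 7a: K was removed, then Notify sends three new signals
state = WaitState(n=3)
print(set(process(state, pending).values()), state)  # {'success'} WaitState(n=0, m=90)

# 7b: no further signals, and the flow files have outlived the expiration duration
print(set(process(WaitState(), pending, expired=True).values()))  # {'expired'}
```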
h4. Alternative path after 5
6a. If Notify sends one additional signal at this point, K becomes K( N=1, M=20 ).
The Wait processor can still let 20 flow files through because M=20 remains.
6b. If Notify sends three additional signals, K becomes K( N=3, M=20 ). The Wait
processor lets 20 flow files through, and when the 21st flow file arrives, it
immediately converts N to M, consuming N(3) to create M(100) passes, so K becomes
K( N=0, M=100 ) and the 21st flow file takes one of those passes.
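The lazy conversion in 6a and 6b can be checked with the following sketch, using hypothetical `WaitState` and `release` with N=3 and M=100. Signals are converted only when M reaches 0. In 6b the state therefore passes through K( N=0, M=100 ), and the 21st flow file then leaves M=99.

```python
from dataclasses import dataclass


@dataclass
class WaitState:
    n: int  # accumulated signals
    m: int  # remaining pass tickets


def release(state: WaitState, count: int, target_n: int = 3, pass_m: int = 100) -> int:
    """Release up to `count` flow files, converting N into M only when M hits 0."""
    released = 0
    for _ in range(count):
        if state.m == 0 and state.n >= target_n:
            state.n, state.m = state.n - target_n, pass_m
        if state.m == 0:
            break  # no passes and not enough signals: the rest wait
        state.m -= 1
        released += 1
    return released


# 6a: one extra signal after step 5
a = WaitState(n=1, m=20)
print(release(a, 30), a)  # 20 WaitState(n=1, m=0)

# 6b: three extra signals after step 5
b = WaitState(n=3, m=20)
print(release(b, 21), b)  # 21 WaitState(n=0, m=99)
```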
Additionally, we can let users configure M=0, meaning Wait can release any
number of incoming flow files as long as N meets the condition.
With this, Notify +1 can behave as if it opens a GATE, and Notify -1 closes
it.
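The proposed M=0 mode can be sketched as below. The sketch assumes signals are never consumed in this mode and that the gate is open whenever the counter is at least N (here N=1). `Gate` is a hypothetical class.

```python
from dataclasses import dataclass


@dataclass
class Gate:
    """Sketch of the proposed M=0 mode: signals are never converted or consumed."""
    target_n: int = 1
    n: int = 0

    def notify(self, delta: int) -> None:
        self.n += delta

    def release(self, flow_files: list[str]) -> list[str]:
        # Any number of flow files pass while N meets the condition.
        return list(flow_files) if self.n >= self.target_n else []


gate = Gate()
gate.notify(+1)  # Notify +1 opens the gate
print(gate.release(["f1", "f2", "f3"]))  # ['f1', 'f2', 'f3']
print(gate.release(["f4"]))              # ['f4'] (N was not consumed)
gate.notify(-1)  # Notify -1 closes it
print(gate.release(["f5"]))              # []
```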
h4. Another possible use case: 'Limit data flow rate cluster-wide'
This is more complex than just supporting a GATE open/close state. However, if
we let M flow files go through, it can also provide rate limiting across a
cluster.
Example use case: NiFi A pushes data via S2S to NiFi B and wants to limit it to
100 flow files per 5 min.
On NiFi A:
Notify part of flow: GenerateFlowFile(5 min, on primary) -> Notify(K, N=+1)
Wait part of flow: Some ingested data -> Wait(K, N=1, M=100)
Since Wait/Notify state is managed globally via DistributedCache, we can limit
throughput cluster-wide.
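A rough simulation of this rate limit follows. It assumes one GenerateFlowFile tick per 5-minute window. It also uses a single aggregated backlog to stand in for flow files queued on every node, since they all share K through the cache. `simulate` is a hypothetical helper.

```python
def simulate(arrivals_per_window: list[int], target_n: int = 1, pass_m: int = 100) -> list[int]:
    """Count flow files released per 5-minute window (sketch).

    Each window starts with one GenerateFlowFile tick -> Notify(K, N=+1);
    the shared (n, m) pair stands in for the cluster-wide cache entry for K.
    """
    n = m = backlog = 0
    released_per_window = []
    for arrivals in arrivals_per_window:
        n += 1  # primary-node tick: Notify(K, N=+1)
        backlog += arrivals
        released = 0
        while backlog:
            if m == 0 and n >= target_n:
                n, m = n - target_n, pass_m  # convert 1 signal into 100 passes
            if m == 0:
                break
            m -= 1
            backlog -= 1
            released += 1
        released_per_window.append(released)
    return released_per_window


print(simulate([250, 250, 250]))  # [100, 100, 100]
```

With steady traffic each window releases 100 flow files. A window with no traffic leaves its signal unused, though, so `simulate([0, 150])` releases all 150 flow files in the second window.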
If a use case requires an exact rate limit, the Notify part can be designed as:
GenerateFlowFile(5 min, on primary) -> Notify(K, N=0) -> Notify(K, N=+1)
Resetting N to 0 before adding 1 prevents N from adding up when there's no
traffic.
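The difference between the two Notify designs shows up in a sketch where three idle windows are followed by a burst of 400 flow files. The sketch assumes Notify(K, N=0) sets the counter to 0, as the text implies. `released_per_window` is hypothetical.

```python
def released_per_window(arrivals: list[int], reset_first: bool, pass_m: int = 100) -> list[int]:
    """Wait(K, N=1, M=100) fed by one tick per 5-minute window (sketch).

    reset_first=True models Notify(K, N=0) -> Notify(K, N=+1): set N to 0, then add 1.
    reset_first=False models a plain Notify(K, N=+1), so unused signals pile up.
    """
    n = m = backlog = 0
    result = []
    for count in arrivals:
        n = 1 if reset_first else n + 1
        backlog += count
        released = 0
        while backlog:
            if m == 0 and n >= 1:  # target N is 1 in this example
                n, m = n - 1, pass_m
            if m == 0:
                break
            m, backlog, released = m - 1, backlog - 1, released + 1
        result.append(released)
    return result


traffic = [0, 0, 0, 400]  # three idle windows, then a burst
print(released_per_window(traffic, reset_first=False))  # [0, 0, 0, 400]
print(released_per_window(traffic, reset_first=True))   # [0, 0, 0, 100]
```

Without the reset, the three unused signals let the whole burst through at once. With the reset, only 100 flow files pass in that window.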