Mark Payne created NIP-15:
-----------------------------
Summary: Dry Run Capability
Key: NIP-15
URL: https://issues.apache.org/jira/browse/NIP-15
Project: NiFi Improvement Proposal
Issue Type: New Feature
Reporter: Mark Payne
h2. *Background & Motivation*
Currently, users lack a safe, easy-to-use, isolated mechanism to verify
processor behavior with its given configuration. The goal of this feature is to
allow users to perform a dry-run of a Processor, providing it input to operate
on (or potentially no input in the case of source Processors) and receive the
results.
Often testing requires running the flow up to the point of the Processor in
question, running it once, clicking to view content, potentially tweaking the
input, doing it again, and iterating several times. This works but in some
cases can be burdensome, especially when there is data enqueued upstream of the
Processor already. Additionally, it can be problematic for a Processor such as
ConsumeKafka where one might want to just get some sample data but doing so
would commit offsets, resulting in either having to reset offsets or even
potentially losing data in the production system.
h2. *Dry Run Validity*
Dry Runs will not be valid for all Processors, as they may cause undesirable
side effects. A great example of this is the ConsumeKafka Processor noted above.
Without any modifications, if it were to be triggered to run as a Dry Run, it
would still acknowledge its consumption of data to Apache Kafka, and this could
result in data loss.
To prevent such a scenario, a Processor can have a Dry Run session established
in the following scenarios:
* Processor is annotated with @SupportsDryRun - this is a new Annotation and
denotes explicitly that the Processor supports Dry Runs.
* Processor is annotated with @SideEffectFree - any Processor with this
annotation is already stating that it has no side effects outside of NiFi,
and the framework will provide isolation within NiFi in the case of a Dry Run.
We will allow establishing a Dry Run session when a Processor’s state is either
STOPPED or RUNNING. In the event that the Processor is RUNNING, a Dry Run
session will be allowed only if it is not annotated with @TriggerSerially. In
the event that the Processor is STOPPED, a Dry Run session will be allowed only
if the Processor is valid. We will not allow a Dry Run session to be
established while the Processor is in an intermediate state (STARTING,
STOPPING) or when the Processor is DISABLED.
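The eligibility rules above can be sketched as a single check. This is a minimal illustration, not framework code: {{ProcessorState}}, {{DryRunEligibility}}, and the use of plain annotation names in a {{Set}} are hypothetical stand-ins for whatever the framework would actually inspect.

```java
import java.util.Set;

/** Hypothetical mirror of the Processor states named in this proposal. */
enum ProcessorState { STOPPED, RUNNING, STARTING, STOPPING, DISABLED }

/** Hypothetical helper; the class and method names are assumptions for illustration. */
final class DryRunEligibility {
    private DryRunEligibility() {}

    /**
     * Returns true if a Dry Run session may be established.
     * annotationNames holds simple annotation names, e.g. "SupportsDryRun".
     */
    static boolean canEstablishDryRun(ProcessorState state, boolean valid, Set<String> annotationNames) {
        // The Processor must opt in explicitly or be declared side-effect free.
        boolean optedIn = annotationNames.contains("SupportsDryRun")
                || annotationNames.contains("SideEffectFree");
        if (!optedIn) {
            return false;
        }
        switch (state) {
            case RUNNING:
                // A running Processor may be dry-run only if it is not serially triggered.
                return !annotationNames.contains("TriggerSerially");
            case STOPPED:
                // A stopped Processor must have a valid configuration.
                return valid;
            default:
                // STARTING, STOPPING, and DISABLED never allow a Dry Run session.
                return false;
        }
    }
}
```

For example, a STOPPED, valid Processor carrying @SupportsDryRun passes the check, while a RUNNING Processor carrying both @SideEffectFree and @TriggerSerially does not.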
h2. *Lifecycle*
Once a Dry Run session has been created, it will remain RUNNING until one of
the following happens:
* It is cancelled → Transitions to STOPPING → CANCELLED
* An Exception is thrown from the Processor → Transitions to STOPPING → FAILED
* If there were any input FlowFiles supplied, they have all been processed →
Transitions to STOPPING → COMPLETED
* If there were no input FlowFiles supplied, at least one FlowFile is output →
Transitions to STOPPING → COMPLETED
When a Dry Run session is created, it will have state NEW. This allows
FlowFiles to be uploaded, etc. in order to prepare it to run. The state is then
transitioned to RUNNING (which will result in a temporary state of STARTING as
@OnScheduled methods, etc. are called). Once completed, a transient state of
STOPPING will be used while @OnStopped methods, etc. are called. Finally, the
state will transition to one of COMPLETED, CANCELLED, or FAILED depending on
the outcome.
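The transitions described above could be modeled as a small state machine. The sketch below uses the state names from this proposal; the methods {{allowedNext}}, {{canTransitionTo}}, and {{isTerminal}} are hypothetical. It encodes only the transitions stated here, so whether a session can be cancelled directly from NEW or can fail during STARTING is left open.

```java
import java.util.EnumSet;
import java.util.Set;

/** States named in this proposal; the helper methods are assumptions for illustration. */
enum DryRunState {
    NEW, STARTING, RUNNING, STOPPING, COMPLETED, CANCELLED, FAILED;

    /** States reachable in one step, limited to the transitions the proposal describes. */
    Set<DryRunState> allowedNext() {
        switch (this) {
            case NEW:
                return EnumSet.of(STARTING);   // @OnScheduled methods are called
            case STARTING:
                return EnumSet.of(RUNNING);
            case RUNNING:
                return EnumSet.of(STOPPING);   // cancel, exception, or work finished
            case STOPPING:
                return EnumSet.of(COMPLETED, CANCELLED, FAILED); // @OnStopped methods are called
            default:
                return EnumSet.noneOf(DryRunState.class);        // terminal states
        }
    }

    boolean canTransitionTo(DryRunState next) {
        return allowedNext().contains(next);
    }

    boolean isTerminal() {
        return allowedNext().isEmpty();
    }
}
```

Keeping the allowed transitions in one place would let the framework reject an invalid state change request before touching the Processor.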
h2. *Scope*
The scope of this feature centers on a new set of REST endpoints designed to
manage the lifecycle of a "Dry Run." These APIs allow for the creation of a
test session, the injection of test data, and the retrieval of results.
*Base Resource:* /processors/\{processorId}/dry-run
h3. *Core Lifecycle*
* *POST* */processors/\{processorId}/dry-run*
** *Usage:* Creates a new Dry Run session.
** *Behavior:* Fails immediately if the request is invalid. If successful, the
session is initialized. Returns the created DryRun.
* *GET* */processors/\{processorId}/dry-run/\{dryRunId}*
** *Usage:* Retrieves the current DryRun session.
* *DELETE* */processors/\{processorId}/dry-run*
** *Usage:* Cleans up the Dry Run session and releases resources.
h3. *Data Injection*
* *POST* */processors/\{processorId}/dry-run/flowfiles*
** *Usage:* Uploads the content and attributes for a test FlowFile.
h3. *Execution Control & Results*
* *PUT* */processors/\{processorId}/dry-run/state*
** *Usage:* Transitions the DryRun to begin or cancel.
* *GET* */processors/\{processorId}/dry-run/result*
** *Usage:* Retrieves the output of the dry run as a DryRunResult.
* *GET* */processors/\{processorId}/dry-run/result/flowfiles/\{flowFileId}/content*
** *Usage:* Retrieves the contents of the output FlowFile with the given ID.
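As a rough illustration of how a client might address these endpoints, the sketch below builds requests with the JDK's {{java.net.http.HttpRequest}}. The paths come from the list above. Everything else is an assumption: the {{DryRunRequests}} class, the base URL passed by the caller, the empty body on session creation, and the JSON payload for the state change (this proposal does not define request bodies). The FlowFile upload is omitted because its payload format is not specified here.

```java
import java.net.URI;
import java.net.http.HttpRequest;

/** Hypothetical request builder; only the paths are taken from the proposal. */
final class DryRunRequests {
    private final String baseUrl;      // e.g. the REST API root; supplied by the caller
    private final String processorId;

    DryRunRequests(String baseUrl, String processorId) {
        this.baseUrl = baseUrl;
        this.processorId = processorId;
    }

    private URI uri(String suffix) {
        return URI.create(baseUrl + "/processors/" + processorId + "/dry-run" + suffix);
    }

    /** POST /processors/{processorId}/dry-run (empty body is an assumption). */
    HttpRequest create() {
        return HttpRequest.newBuilder(uri("")).POST(HttpRequest.BodyPublishers.noBody()).build();
    }

    /** GET /processors/{processorId}/dry-run/{dryRunId} */
    HttpRequest get(String dryRunId) {
        return HttpRequest.newBuilder(uri("/" + dryRunId)).GET().build();
    }

    /** PUT /processors/{processorId}/dry-run/state (payload shape is an assumption). */
    HttpRequest updateState(String jsonBody) {
        return HttpRequest.newBuilder(uri("/state"))
                .header("Content-Type", "application/json")
                .PUT(HttpRequest.BodyPublishers.ofString(jsonBody))
                .build();
    }

    /** GET /processors/{processorId}/dry-run/result */
    HttpRequest result() {
        return HttpRequest.newBuilder(uri("/result")).GET().build();
    }

    /** GET /processors/{processorId}/dry-run/result/flowfiles/{flowFileId}/content */
    HttpRequest outputContent(String flowFileId) {
        return HttpRequest.newBuilder(uri("/result/flowfiles/" + flowFileId + "/content")).GET().build();
    }

    /** DELETE /processors/{processorId}/dry-run */
    HttpRequest delete() {
        return HttpRequest.newBuilder(uri("")).DELETE().build();
    }
}
```

A client would send these with {{java.net.http.HttpClient}} in the order create, upload FlowFiles, update state, poll the session, fetch the result, and finally delete.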
h2. *Implementation*
h3. *1. State Management Isolation*
A critical requirement for this implementation is the handling of state. In
this context, "state" specifically refers to the {*}Processor-managed state
provided by the State Manager{*}.
* *Read Behavior:* The Dry Run must be able to read the _actual_ current state
from the State Manager to ensure the test is realistic.
* *Write Behavior:* The Dry Run must *never* update the real state. The
implementation must ensure that the State Manager used during the dry run
intercepts updates and stores them in a temporary, isolated context, discarding
them once the dry run is complete or deleted.
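One way to picture the read and write behavior above is an overlay that reads through to the real state until the first write, then serves and stores updates in isolation. The sketch below is a simplified stand-in using plain maps. {{OverlayState}} and its methods are hypothetical and do not mirror NiFi's StateManager interface or its scopes.

```java
import java.util.Collections;
import java.util.Map;

/** Hypothetical overlay: reads fall through to real state, writes never reach it. */
final class OverlayState {
    private final Map<String, String> actual;   // read-only view of the real state
    private Map<String, String> pending;        // isolated updates; null until the first write

    OverlayState(Map<String, String> actualState) {
        this.actual = Collections.unmodifiableMap(actualState);
    }

    /** Returns the dry run's own updates if any were written, else the real state. */
    Map<String, String> getState() {
        return pending != null ? pending : actual;
    }

    /** Stores the update in the isolated overlay only. */
    void setState(Map<String, String> newState) {
        pending = Map.copyOf(newState);
    }

    /** Discards isolated updates, e.g. when the dry run completes or is deleted. */
    void discard() {
        pending = null;
    }
}
```

With this shape, a Processor that stores an offset during a dry run sees its own update on the next read, while the real state still holds the original value.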
h3. *2. Repositories*
To ensure isolation from the production flow, the implementation will utilize
dedicated, ephemeral repositories:
* *DryRun FlowFile Repo:* Stores the output FlowFiles so that they can be
returned as part of the Dry Run.
* *Provenance Repo:* Tracks Provenance events specific to the dry run for
debugging and verification.
The standard Content Repository can still be used, because the FlowFiles will
not be queued up for processing, and the content can be simply discarded and
ignored; this avoids us needing to buffer content in memory or create a new,
complex Content Repository.
h3. *3. ProcessContext*
A new method will be added to ProcessContext:
{{boolean isDryRun()}}
This will return true if the Processor is being triggered as part of a Dry Run
session, false otherwise. This might be used, for instance, to avoid
acknowledging to a data provider such as Apache Kafka that data has been
consumed / processed.
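A source Processor might use the flag as sketched below. Only {{isDryRun()}} comes from this proposal. {{DryRunAwareContext}}, {{SourceClient}}, and {{ConsumeSketch}} are hypothetical stand-ins and do not represent ProcessContext as a whole or the Kafka consumer API.

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical stand-in for the relevant part of ProcessContext. */
interface DryRunAwareContext {
    boolean isDryRun();
}

/** Hypothetical data provider client; not the Kafka consumer API. */
interface SourceClient {
    List<String> poll();
    void acknowledge();
}

final class ConsumeSketch {
    private ConsumeSketch() {}

    /** Fetches records and acknowledges them only outside of a Dry Run. */
    static List<String> onTrigger(DryRunAwareContext context, SourceClient client) {
        List<String> records = new ArrayList<>(client.poll());
        if (!context.isDryRun()) {
            // Acknowledging during a Dry Run could cause data loss in the real flow.
            client.acknowledge();
        }
        return records;
    }
}
```

The records are still returned in both cases, so the Dry Run result shows realistic sample data without the provider treating it as consumed.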
h3. *4. Data Models*
The backend implementation will rely on the following Objects:
*DryRun*
* String getProcessorId()
* String getDryRunId()
* DryRunStatus getStatus()
* ScheduledState getInitialState()
*DryRunFlowFileMetadata*
* String getUuid()
* Map<String, String> getAttributes()
*DryRunFlowFile extends DryRunFlowFileMetadata*
* InputStream getContents()
*DryRunState (Enum)*
* NEW
* STARTING
* RUNNING
* STOPPING
* COMPLETED
* CANCELLED
* FAILED
*DryRunStatus*
* int getInputCount()
* long getInputBytes()
* DryRunState getState()
*DryRunResult*
* List<DryRunRelationshipResult> getOutputs()
* List<ProvenanceEventEntity> getProvenanceEvents()
* Map<String, String> getLocalState()
* Map<String, String> getClusterState()
*DryRunRelationshipResult*
* String getRelationshipName()
* List<DryRunFlowFileMetadata> getFlowFileMetadata()