Mark Payne created NIP-15:
-----------------------------

             Summary: Dry Run Capability
                 Key: NIP-15
                 URL: https://issues.apache.org/jira/browse/NIP-15
             Project: NiFi Improvement Proposal
          Issue Type: New Feature
            Reporter: Mark Payne


h2. *Background & Motivation*

Currently, users lack a safe, easy-to-use, isolated mechanism to verify a 
Processor's behavior with a given configuration. The goal of this feature is to 
allow users to perform a dry run of a Processor, providing it input to operate 
on (or potentially no input in the case of source Processors) and receiving the 
results.

Testing often requires running the flow up to the point of the Processor in 
question, running it once, clicking to view content, potentially tweaking the 
input, doing it again, and iterating several times. This works, but in some 
cases it can be burdensome, especially when data is already enqueued upstream 
of the Processor. Additionally, it can be problematic for a Processor such as 
ConsumeKafka, where one might want to just get some sample data, but doing so 
would commit offsets, resulting in either having to reset offsets or even 
potentially losing data in the production system.

 
h2. *Dry Run Validity*

Dry Runs will not be valid for all Processors, as running some Processors this 
way may cause undesirable side effects. A great example of this is the 
ConsumeKafka Processor noted above. Without any modifications, if it were 
triggered as part of a Dry Run, it would still acknowledge its consumption of 
data to Apache Kafka, which could result in data loss.

To prevent such a scenario, a Dry Run session may be established for a 
Processor only in the following scenarios:
 * The Processor is annotated with @SupportsDryRun - this is a new annotation 
that explicitly denotes that the Processor supports Dry Runs.
 * The Processor is annotated with @SideEffectFree - any Processor with this 
annotation already states that it has no side effects outside of NiFi, and the 
framework will provide isolation within NiFi in the case of a Dry Run.

We will allow establishing a Dry Run session when a Processor's state is either 
STOPPED or RUNNING. If the Processor is RUNNING, a Dry Run session will be 
allowed only if the Processor is not annotated with @TriggerSerially, because 
the Dry Run would otherwise trigger it concurrently with the running flow. If 
the Processor is STOPPED, a Dry Run session will be allowed only if the 
Processor is valid. We will not allow a Dry Run session to be established while 
the Processor is in an intermediate state (STARTING, STOPPING) or when the 
Processor is DISABLED.
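
The eligibility rules above can be summarized as a single check. What follows is a minimal sketch, not the framework's implementation. The three annotations and the {{ScheduledState}} enum are local stand-ins so that the example compiles on its own. {{DryRunEligibility}}, {{canEstablish}}, and the example classes are hypothetical names.

```java
import java.lang.annotation.ElementType;
import java.lang.annotation.Retention;
import java.lang.annotation.RetentionPolicy;
import java.lang.annotation.Target;

// Stand-ins so the sketch compiles on its own. In NiFi, @SideEffectFree and @TriggerSerially
// live in org.apache.nifi.annotation.behavior; @SupportsDryRun is the annotation proposed
// here, so its package is an assumption.
@Retention(RetentionPolicy.RUNTIME) @Target(ElementType.TYPE) @interface SupportsDryRun {}
@Retention(RetentionPolicy.RUNTIME) @Target(ElementType.TYPE) @interface SideEffectFree {}
@Retention(RetentionPolicy.RUNTIME) @Target(ElementType.TYPE) @interface TriggerSerially {}

// Simplified subset of the framework's scheduled states.
enum ScheduledState { STOPPED, STARTING, RUNNING, STOPPING, DISABLED }

// Example Processor classes used to exercise the check (hypothetical).
@SideEffectFree class ExampleTransform {}
@SupportsDryRun @TriggerSerially class ExampleSerialSource {}
class ExampleConsumer {}

// Hypothetical helper applying the rules from "Dry Run Validity".
final class DryRunEligibility {
    private DryRunEligibility() {}

    static boolean canEstablish(Class<?> processorClass, ScheduledState state, boolean valid) {
        // The Processor must explicitly opt in or declare itself side-effect free.
        boolean optedIn = processorClass.isAnnotationPresent(SupportsDryRun.class)
                || processorClass.isAnnotationPresent(SideEffectFree.class);
        if (!optedIn) {
            return false;
        }
        switch (state) {
            case RUNNING:
                // A running Processor that must be triggered serially cannot also run a Dry Run.
                return !processorClass.isAnnotationPresent(TriggerSerially.class);
            case STOPPED:
                // A stopped Processor must be valid.
                return valid;
            default:
                // STARTING, STOPPING and DISABLED never allow a Dry Run.
                return false;
        }
    }
}
```

In this sketch, ExampleConsumer carries neither annotation, so it is rejected in every state. This matches the ConsumeKafka case unless that Processor opts in.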
h2. *Lifecycle*

Once a Dry Run session has been started, it will remain RUNNING until one of 
the following happens:
 * It is cancelled  → Transitions to STOPPING → CANCELLED
 * An Exception is thrown from the Processor → Transitions to STOPPING → FAILED
 * If there were any input FlowFiles supplied, they have all been processed → 
Transitions to STOPPING → COMPLETED
 * If there were no input FlowFiles supplied, at least one FlowFile is output → 
Transitions to STOPPING → COMPLETED
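
The two completion conditions depend on whether any input was supplied. The following is a minimal sketch. It assumes the framework tracks three counts: how many supplied FlowFiles exist, how many have been processed, and how many FlowFiles have been output. {{DryRunCompletion}} and its parameters are hypothetical. Cancellation and exceptions are handled separately and are not modeled here.

```java
// Hypothetical helper deciding whether a RUNNING Dry Run has completed normally.
final class DryRunCompletion {
    private DryRunCompletion() {}

    /**
     * @param inputCount     number of FlowFiles uploaded before the Dry Run started
     * @param processedCount number of those input FlowFiles processed so far
     * @param outputCount    number of FlowFiles the Processor has output so far
     */
    static boolean isComplete(int inputCount, int processedCount, int outputCount) {
        if (inputCount > 0) {
            // Input supplied: complete once every supplied FlowFile has been processed.
            return processedCount >= inputCount;
        }
        // No input (e.g. a source Processor): complete once at least one FlowFile is output.
        return outputCount > 0;
    }
}
```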

When a Dry Run session is created, it will have state NEW. This allows 
FlowFiles to be uploaded, etc., in order to prepare it to run. The state is then 
transitioned to RUNNING, passing through a transient STARTING state while 
@OnScheduled methods, etc. are called. Once the run finishes, a transient 
STOPPING state is used while @OnStopped methods, etc. are called. Finally, the 
state transitions to COMPLETED, CANCELLED, or FAILED, depending on the outcome.
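
The state flow above can be expressed as a transition table over the {{DryRunState}} values listed under Data Models. This sketch allows only the transitions spelled out in this proposal. For example, it does not model cancelling a session that is still NEW or STARTING. {{DryRunTransitions}} is a hypothetical helper.

```java
import java.util.EnumMap;
import java.util.EnumSet;
import java.util.Map;
import java.util.Set;

// Mirrors the DryRunState enum from the Data Models section.
enum DryRunState { NEW, STARTING, RUNNING, STOPPING, COMPLETED, CANCELLED, FAILED }

// Hypothetical transition table for a Dry Run session.
final class DryRunTransitions {
    private static final Map<DryRunState, Set<DryRunState>> ALLOWED = new EnumMap<>(DryRunState.class);

    static {
        ALLOWED.put(DryRunState.NEW, EnumSet.of(DryRunState.STARTING));
        // STARTING covers the @OnScheduled phase before the Processor is triggered.
        ALLOWED.put(DryRunState.STARTING, EnumSet.of(DryRunState.RUNNING));
        // Completion, cancellation and exceptions all pass through STOPPING.
        ALLOWED.put(DryRunState.RUNNING, EnumSet.of(DryRunState.STOPPING));
        // STOPPING covers the @OnStopped phase, then settles on the outcome.
        ALLOWED.put(DryRunState.STOPPING,
                EnumSet.of(DryRunState.COMPLETED, DryRunState.CANCELLED, DryRunState.FAILED));
        // Terminal states have no outgoing transitions.
        for (DryRunState terminal : EnumSet.of(DryRunState.COMPLETED, DryRunState.CANCELLED, DryRunState.FAILED)) {
            ALLOWED.put(terminal, EnumSet.noneOf(DryRunState.class));
        }
    }

    private DryRunTransitions() {}

    static boolean isAllowed(DryRunState from, DryRunState to) {
        return ALLOWED.get(from).contains(to);
    }

    static boolean isTerminal(DryRunState state) {
        return ALLOWED.get(state).isEmpty();
    }
}
```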

 
h2. *Scope*

The scope of this feature centers on a new set of REST endpoints designed to 
manage the lifecycle of a "Dry Run." These APIs allow for the creation of a 
test session, the injection of test data, and the retrieval of results.

*Base Resource:* /processors/\{processorId}/dry-run
h3. *Core Lifecycle*
 * *POST* */processors/\{processorId}/dry-run*
 ** *Usage:* Creates a new Dry Run session.
 ** *Behavior:* Fails immediately if the request is invalid. If successful, the session is initialized. Returns the created DryRun.
 * *GET* */processors/\{processorId}/dry-run/\{dryRunId}*
 ** *Usage:* Retrieves the DryRun session, including its current status.
 * *DELETE* */processors/\{processorId}/dry-run*
 ** *Usage:* Cleans up the Dry Run session and releases resources.

h3. *Data Injection*
 * *POST* */processors/\{processorId}/dry-run/flowfiles*
 ** *Usage:* Uploads the content and attributes for a test FlowFile.

h3. *Execution Control & Results*
 * *PUT* */processors/\{processorId}/dry-run/state*
 ** *Usage:* Transitions the Dry Run's state, either to start it or to cancel it.
 * *GET* */processors/\{processorId}/dry-run/result*
 ** *Usage:* Retrieves the output of the Dry Run as a DryRunResult.
 * *GET* */processors/\{processorId}/dry-run/result/flowfiles/\{flowFileId}/content*
 ** *Usage:* Retrieves the content of the output FlowFile with the given ID.
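
A client would typically call these endpoints in this order:
 # Create the session.
 # Upload FlowFiles.
 # Start the Dry Run.
 # Poll the session.
 # Fetch the results and content.
 # Delete the session.

The sketch below only builds {{java.net.http.HttpRequest}} objects for each endpoint, using the paths listed above. It does not send them. The proposal does not define the base URL (for example {{http://localhost:8080/nifi-api}}) or the request body formats, so both are assumptions here, as is the {{DryRunRequests}} class.

```java
import java.net.URI;
import java.net.http.HttpRequest;
import java.net.http.HttpRequest.BodyPublishers;

// Hypothetical helper: builds (but does not send) requests for the Dry Run endpoints.
final class DryRunRequests {
    private final String baseUrl;      // assumption, e.g. "http://localhost:8080/nifi-api"
    private final String processorId;

    DryRunRequests(String baseUrl, String processorId) {
        this.baseUrl = baseUrl;
        this.processorId = processorId;
    }

    private URI uri(String suffix) {
        return URI.create(baseUrl + "/processors/" + processorId + "/dry-run" + suffix);
    }

    private static HttpRequest.Builder json(URI uri) {
        return HttpRequest.newBuilder(uri).header("Content-Type", "application/json");
    }

    /** POST /processors/{processorId}/dry-run: create a session (body format assumed). */
    HttpRequest create(String jsonBody) {
        return json(uri("")).POST(BodyPublishers.ofString(jsonBody)).build();
    }

    /** GET /processors/{processorId}/dry-run/{dryRunId}: fetch the session. */
    HttpRequest get(String dryRunId) {
        return HttpRequest.newBuilder(uri("/" + dryRunId)).GET().build();
    }

    /** POST .../dry-run/flowfiles: upload a test FlowFile (body format assumed). */
    HttpRequest uploadFlowFile(String contentType, byte[] body) {
        return HttpRequest.newBuilder(uri("/flowfiles"))
                .header("Content-Type", contentType)
                .POST(BodyPublishers.ofByteArray(body))
                .build();
    }

    /** PUT .../dry-run/state: start or cancel the session (body format assumed). */
    HttpRequest setState(String jsonBody) {
        return json(uri("/state")).PUT(BodyPublishers.ofString(jsonBody)).build();
    }

    /** GET .../dry-run/result: fetch the DryRunResult. */
    HttpRequest result() {
        return HttpRequest.newBuilder(uri("/result")).GET().build();
    }

    /** GET .../dry-run/result/flowfiles/{flowFileId}/content: fetch output content. */
    HttpRequest outputContent(String flowFileId) {
        return HttpRequest.newBuilder(uri("/result/flowfiles/" + flowFileId + "/content")).GET().build();
    }

    /** DELETE /processors/{processorId}/dry-run: clean up the session. */
    HttpRequest delete() {
        return HttpRequest.newBuilder(uri("")).DELETE().build();
    }
}
```

Each request could then be sent with {{HttpClient.send}}, polling {{get}} until the session's status reaches a terminal state.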

 
h2. *Implementation*
h3. *1. State Management Isolation*

A critical requirement for this implementation is the handling of state. In 
this context, "state" specifically refers to the {*}Processor-managed state 
provided by the State Manager{*}.
 * *Read Behavior:* The Dry Run must be able to read the _actual_ current state 
from the State Manager to ensure the test is realistic.
 * *Write Behavior:* The Dry Run must *never* update the real state. The 
implementation must ensure that the State Manager used during the dry run 
intercepts updates and stores them in a temporary, isolated context, discarding 
them once the dry run is complete or deleted.
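
One way to satisfy both requirements is a copy-on-write overlay:
 * Reads fall through to the real State Manager until the Dry Run writes to a scope.
 * After that write, reads see the Dry Run's own value for that scope.
 * Writes never reach the real State Manager.

This sketch is written against a simplified, hypothetical {{SimpleStateManager}} interface. NiFi's actual {{StateManager}} works with {{StateMap}} objects and versioned updates. Letting the Dry Run read back its own writes is a design assumption, not something this proposal specifies.

```java
import java.util.Collections;
import java.util.EnumMap;
import java.util.HashMap;
import java.util.Map;

enum Scope { LOCAL, CLUSTER }

// Hypothetical, simplified stand-in for NiFi's StateManager.
interface SimpleStateManager {
    Map<String, String> getState(Scope scope);
    void setState(Map<String, String> state, Scope scope);
}

// Plays the role of the real (production) state for this example.
final class InMemoryStateManager implements SimpleStateManager {
    private final Map<Scope, Map<String, String>> states = new EnumMap<>(Scope.class);

    @Override
    public synchronized Map<String, String> getState(Scope scope) {
        return states.getOrDefault(scope, Collections.emptyMap());
    }

    @Override
    public synchronized void setState(Map<String, String> state, Scope scope) {
        states.put(scope, Collections.unmodifiableMap(new HashMap<>(state)));
    }
}

// Dry Run wrapper: reads the real state, but keeps every write in an isolated overlay.
final class DryRunStateManager implements SimpleStateManager {
    private final SimpleStateManager delegate;
    private final Map<Scope, Map<String, String>> overlay = new EnumMap<>(Scope.class);

    DryRunStateManager(SimpleStateManager delegate) {
        this.delegate = delegate;
    }

    @Override
    public synchronized Map<String, String> getState(Scope scope) {
        Map<String, String> written = overlay.get(scope);
        // Until the Dry Run writes to this scope, return the actual current state.
        return written != null ? written : delegate.getState(scope);
    }

    @Override
    public synchronized void setState(Map<String, String> state, Scope scope) {
        // Replace the whole map for the scope, but only in the overlay; the delegate is untouched.
        overlay.put(scope, Collections.unmodifiableMap(new HashMap<>(state)));
    }

    /** Discards all intercepted writes when the Dry Run completes or is deleted. */
    synchronized void discard() {
        overlay.clear();
    }
}
```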

h3. *2. Repositories*

To ensure isolation from the production flow, the implementation will utilize 
dedicated, ephemeral repositories:
 * *DryRun FlowFile Repo:* Stores the output FlowFiles so that they can be 
returned as part of the Dry Run.
 * *Provenance Repo:* Tracks Provenance events specific to the dry run for 
debugging and verification.

The standard Content Repository can still be used: because the FlowFiles will 
not be queued up for processing, their content can simply be discarded and 
ignored. This avoids having to buffer content in memory or create a new, 
complex Content Repository.
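
The ephemeral repositories might be as simple as in-memory collections scoped to a single Dry Run session. In this sketch, output FlowFile attributes and Provenance events are held in memory, while content is only referenced by a claim identifier in the standard Content Repository. The in-memory approach is an assumption, not the proposed design, and all class names are hypothetical.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical output record: attributes are kept here, while the content itself stays in
// the standard Content Repository and is referenced only by a claim identifier.
final class OutputFlowFile {
    final String uuid;
    final Map<String, String> attributes;
    final String contentClaimId;

    OutputFlowFile(String uuid, Map<String, String> attributes, String contentClaimId) {
        this.uuid = uuid;
        this.attributes = Map.copyOf(attributes);
        this.contentClaimId = contentClaimId;
    }
}

// Hypothetical, simplified Provenance event for this sketch.
final class DryRunProvenanceEvent {
    final String eventType;
    final String flowFileUuid;

    DryRunProvenanceEvent(String eventType, String flowFileUuid) {
        this.eventType = eventType;
        this.flowFileUuid = flowFileUuid;
    }
}

// Per-session, in-memory stand-ins for the DryRun FlowFile Repo and Provenance Repo.
final class EphemeralDryRunRepositories implements AutoCloseable {
    private final Map<String, List<OutputFlowFile>> outputsByRelationship = new LinkedHashMap<>();
    private final List<DryRunProvenanceEvent> provenanceEvents = new ArrayList<>();

    synchronized void recordTransfer(String relationship, OutputFlowFile flowFile) {
        outputsByRelationship.computeIfAbsent(relationship, r -> new ArrayList<>()).add(flowFile);
    }

    synchronized void recordEvent(DryRunProvenanceEvent event) {
        provenanceEvents.add(event);
    }

    synchronized List<OutputFlowFile> outputs(String relationship) {
        return List.copyOf(outputsByRelationship.getOrDefault(relationship, List.of()));
    }

    synchronized List<DryRunProvenanceEvent> events() {
        return List.copyOf(provenanceEvents);
    }

    /** Nothing was persisted, so discarding the session just drops the in-memory records. */
    @Override
    public synchronized void close() {
        outputsByRelationship.clear();
        provenanceEvents.clear();
    }
}
```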
h3. *3. ProcessContext*

A new method will be added to ProcessContext:

{{boolean isDryRun()}}

This will return true if the Processor is being triggered as part of a Dry Run 
session, and false otherwise. It might be used, for instance, to avoid 
acknowledging to a data provider such as Apache Kafka that data has been 
consumed or processed.
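
A Processor such as ConsumeKafka could use the new method to skip its acknowledgement step. The sketch below uses hypothetical stand-ins rather than the real NiFi or Kafka APIs:
 * {{DryRunAwareContext}} represents a {{ProcessContext}} that has {{isDryRun()}}.
 * {{RecordSource}} represents a consumer client.

```java
import java.util.List;

// Hypothetical minimal stand-in for a ProcessContext that exposes the proposed isDryRun().
interface DryRunAwareContext {
    boolean isDryRun();
}

// Hypothetical abstraction over a message consumer such as a Kafka client.
interface RecordSource {
    List<String> poll();
    void commit(); // acknowledges consumption to the external system
}

// Hypothetical Processor logic illustrating how isDryRun() could be used.
final class ExampleConsumeProcessor {
    /** Returns the records that would be written out as FlowFiles. */
    List<String> onTrigger(DryRunAwareContext context, RecordSource source) {
        List<String> records = source.poll();
        // During a Dry Run, skip the acknowledgement so the external system's offsets
        // are unchanged and the data remains available to the real flow.
        if (!context.isDryRun()) {
            source.commit();
        }
        return records;
    }
}
```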

 
h3. *4. Data Models*

The backend implementation will rely on the following Objects:

*DryRun*
 * String getProcessorId()
 * String getDryRunId()
 * DryRunStatus getStatus()
 * ScheduledState getInitialState()

*DryRunFlowFileMetadata*
 * String getUuid()
 * Map<String, String> getAttributes()

*DryRunFlowFile extends DryRunFlowFileMetadata*
 * InputStream getContents()

*DryRunState (Enum)*
 * NEW
 * STARTING
 * RUNNING
 * STOPPING
 * COMPLETED
 * CANCELLED
 * FAILED

*DryRunStatus*
 * int getInputCount()
 * long getInputBytes()
 * DryRunState getState()

*DryRunResult*
 * List<DryRunRelationshipResult> getOutputs()
 * List<ProvenanceEventEntity> getProvenanceEvents()
 * Map<String, String> getLocalState()
 * Map<String, String> getClusterState()

*DryRunRelationshipResult*
 * String getRelationshipName()
 * List<DryRunFlowFileMetadata> getFlowFileMetadata()
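
For illustration, some of the data models might be written as plain Java classes with the getters listed above. This is a partial sketch: {{DryRun}}, {{DryRunFlowFile}} and {{DryRunResult}} are omitted. The sketch also includes two hypothetical helpers that show how the values could be derived:
 * {{DryRunStatus.forInputs}} computes input statistics.
 * {{DryRunResults.groupByRelationship}} builds the per-relationship outputs.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

enum DryRunState { NEW, STARTING, RUNNING, STOPPING, COMPLETED, CANCELLED, FAILED }

// Not final, since DryRunFlowFile extends it.
class DryRunFlowFileMetadata {
    private final String uuid;
    private final Map<String, String> attributes;

    DryRunFlowFileMetadata(String uuid, Map<String, String> attributes) {
        this.uuid = uuid;
        this.attributes = Map.copyOf(attributes); // immutable snapshot
    }

    public String getUuid() { return uuid; }
    public Map<String, String> getAttributes() { return attributes; }
}

final class DryRunStatus {
    private final int inputCount;
    private final long inputBytes;
    private final DryRunState state;

    private DryRunStatus(int inputCount, long inputBytes, DryRunState state) {
        this.inputCount = inputCount;
        this.inputBytes = inputBytes;
        this.state = state;
    }

    /** Hypothetical factory: derives input statistics from the uploaded content sizes. */
    static DryRunStatus forInputs(List<Long> contentSizes, DryRunState state) {
        long totalBytes = contentSizes.stream().mapToLong(Long::longValue).sum();
        return new DryRunStatus(contentSizes.size(), totalBytes, state);
    }

    public int getInputCount() { return inputCount; }
    public long getInputBytes() { return inputBytes; }
    public DryRunState getState() { return state; }
}

final class DryRunRelationshipResult {
    private final String relationshipName;
    private final List<DryRunFlowFileMetadata> flowFileMetadata;

    DryRunRelationshipResult(String relationshipName, List<DryRunFlowFileMetadata> flowFileMetadata) {
        this.relationshipName = relationshipName;
        this.flowFileMetadata = List.copyOf(flowFileMetadata);
    }

    public String getRelationshipName() { return relationshipName; }
    public List<DryRunFlowFileMetadata> getFlowFileMetadata() { return flowFileMetadata; }
}

final class DryRunResults {
    private DryRunResults() {}

    /** Hypothetical helper: groups (relationship, FlowFile) transfers, keeping first-seen order. */
    static List<DryRunRelationshipResult> groupByRelationship(
            List<Map.Entry<String, DryRunFlowFileMetadata>> transfers) {
        Map<String, List<DryRunFlowFileMetadata>> byName = new LinkedHashMap<>();
        for (Map.Entry<String, DryRunFlowFileMetadata> transfer : transfers) {
            byName.computeIfAbsent(transfer.getKey(), name -> new ArrayList<>()).add(transfer.getValue());
        }
        List<DryRunRelationshipResult> results = new ArrayList<>();
        byName.forEach((name, flowFiles) -> results.add(new DryRunRelationshipResult(name, flowFiles)));
        return results;
    }
}
```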

 



--
This message was sent by Atlassian Jira
(v8.20.10#820010)
