[ 
https://issues.apache.org/jira/browse/NIP-15?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=18043669#comment-18043669
 ] 

Mark Payne commented on NIP-15:
-------------------------------

Agreed [~exceptionfactory]. Would probably put a cap on max upload size in 
nifi.properties with a default of perhaps 1 MB. My intent would be that it 
would be automatically deleted when the dry-run session is completed. Because 
it would be stored in the Content Repo, it would also be automatically removed 
on restart. It might also be a good idea to timeout a dry-run session after 
some time, like a minute, and have that also remove any uploaded data.

> Dry Run Capability
> ------------------
>
>                 Key: NIP-15
>                 URL: https://issues.apache.org/jira/browse/NIP-15
>             Project: NiFi Improvement Proposal
>          Issue Type: New Feature
>            Reporter: Mark Payne
>            Assignee: Mark Payne
>            Priority: Major
>
> h2. *Background & Motivation*
> Currently, users lack a safe, easy-to-use, isolated mechanism to verify 
> processor behavior with its given configuration. The goal of this feature is 
> to allow users to perform a dry-run of a Processor, providing it input to 
> operate on (or potentially no input in the case of source Processors) and 
> receive the results.
> Often testing requires running the flow up to the point of the Processor in 
> question, running it once, clicking to view content, potentially tweaking the 
> input, doing it again, and iterating several times. This works but in some 
> cases can be burdensome, especially when there is data enqueued upstream of 
> the Processor already. Additionally, it can be problematic for a Processor 
> such as ConsumeKafka where one might want to just get some sample data but 
> doing so would commit offsets, resulting in either having to reset offsets or 
> even potentially lose data in the production system.
>  
> h2. *Dry Run Validity*
> Dry Runs will not be valid for all Processors, as it may cause undesirable 
> side effects. A great example of this is the ConsumeKafka Processor noted 
> above. Without any modifications, if it were to be triggered to run as a Dry 
> Run, it would still acknowledge its consumption of data to Apache Kafka, and 
> this could result in data loss.
> To prevent such a scenario, a Processor can have a Dry Run session 
> established in the following scenarios:
>  * Processor is annotated with @SupportsDryRun - this is a new Annotation and 
> denotes explicitly that the Processor supports Dry Runs.
>  * Processor is annotated with @SideEffectFree - any Processor with this 
> annotation is already stating that it has no side effects outside of the 
> NiFi, and the framework will provide isolation within NiFi in the case of a 
> Dry Run.
> We will allow establishing a Dry Run session when a Processor’s state is 
> either STOPPED or RUNNING. In the event that the Processor is RUNNING, a Dry 
> Run session will be allowed only if it is not annotated with 
> @TriggerSerially. In the event that the Processor is STOPPED, a Dry Run 
> session will be allowed only if the Processor is valid. We will not allow a 
> Dry Run session to be established while the Processor is in an intermediate 
> state (STARTING, STOPPING) or when the Processor is DISABLED.
> h2. *Lifecycle*
> Once a Dry Run session has been created, it will remain RUNNING until one of 
> the following happens:
>  * It is cancelled  → Transitions to STOPPING → CANCELLED
>  * An Exception is thrown from the Processor → Transitions to STOPPING → 
> FAILED
>  * If there were any input FlowFiles supplied, they have all been processed → 
> Transitions to STOPPING → COMPLETED
>  * If there were no input FlowFiles supplied, at least one FlowFile is output 
> → Transitions to STOPPING → COMPLETED
> When a Dry Run session is created, it will have state NEW. This allows 
> FlowFiles to be uploaded, etc. in order in prepare it to run. The state is 
> then transitioned to RUNNING (which will result in a temporary state of 
> STARTING as @OnScheduled methods, etc. are called). Once completed, a 
> transient state of STOPPING will be used while @OnStopped methods, etc. are 
> called. Finally, the state will transition to one of COMPLETED, CANCELLED, or 
> FAILED depending on the outcome.
>  
> h2. *Scope*
> The scope of this feature centers on a new set of REST endpoints designed to 
> manage the lifecycle of a "Dry Run." These APIs allow for the creation of a 
> test session, the injection of test data, and the retrieval of results.
> *Base Resource:* /processors/\{id}/dry-run
> h3. *Core Lifecycle*
>  * *POST* */processors/\{processorId}/dry-run*
>  * *Usage:* Creates a new Dry Run session.
>  * *Behavior:* Fails immediately if the request is invalid. If successful, 
> the session is initialized. Returns the created DryRun.
>  * *GET* */processors/\{processorId}/dry-run/\{dryRunId}*
>  * *Usage:* Retrieves the current DryRun session.
>  * *DELETE* */processors/\{processorId}/dry-run*
>  * *Usage:* Cleans up the Dry Run session and releases resources.
> h3. *Data Injection*
>  * *POST* */processors/\{processorId}/dry-run/flowfiles*
>  * *Usage:* Uploads the content and attributes for a test FlowFile.
> h3. *Execution Control & Results*
>  * *PUT* */processors/\{processorId}/dry-run/state*
>  * *Usage:* Transitions the DryRun to begin or cancel.
>  * *GET* */processors/\{processorId}/dry-run/result*
>  * *Usage:* Retrieves the output of the dry run as a DryRunResult
>  * *GET* 
> */processors/\{processorId}/dry-run/result/flowfiles/\{flowFileId}/content*
>  * *Usage:* Retrieves the contents of the output FlowFile with the given ID
>  
> h2. *Implementation*
> h3. *1. State Management Isolation*
> A critical requirement for this implementation is the handling of state. In 
> this context, "state" specifically refers to the {*}Processor-managed state 
> provided by the State Manager{*}.
>  * *Read Behavior:* The Dry Run must be able to read the _actual_ current 
> state from the State Manager to ensure the test is realistic.
>  * *Write Behavior:* The Dry Run must *never* update the real state. The 
> implementation must ensure that the State Manager used during the dry run 
> intercepts updates and stores them in a temporary, isolated context, 
> discarding them once the dry run is complete or deleted.
> h3. *2. Repositories*
> To ensure isolation from the production flow, the implementation will utilize 
> dedicated, ephemeral repositories:
>  * *DryRun FlowFile Repo:* Stores the output FlowFiles so that they can be 
> returned as part of the Dry Run.
>  * *Provenance Repo:* Tracks Provenance events specific to the dry run for 
> debugging and verification.
> The standard Content Repository can still be used, because the FlowFiles will 
> not be queued up for processing, and the content can be simply discarded and 
> ignored; this avoids us needing to buffer content in memory or create a new, 
> complex Content Repository.
> h3. *3. ProcessContext*
> A new method will be added to ProcessContext:
> {{boolean isDryRun()}}
> This will return true if the Processor is being triggered as part of a Dry 
> Run session, false otherwise. This might be used, for instance, to avoid 
> acknowledging to the a data provided such as Apache Kafka that data has been 
> consumed / processed.
>  
> h3. *4. Data Models*
> The backend implementation will rely on the following Objects:
> *DryRun*
>  * String getProcessorId()
>  * String getDryRunId()
>  * DryRunStatus getStatus()
>  * ScheduledState getInitialState()
> *DryRunFlowFileMetadata*
>  * String getUuid()
>  * Map<String, String> getAttributes()
> *DryRunFlowFile extends DryRunFlowFileMetadata*
>  * InputStream getContents()
> *DryRunState (Enum)*
>  * NEW
>  * STARTING
>  * RUNNING
>  * STOPPING
>  * COMPLETED
>  * CANCELLED
>  * FAILED
> *DryRunStatus*
>  * int getInputCount()
>  * long getInputBytes()
>  * DryRunState getState()
> *DryRunResult*
>  * List<DryRunRelationshipResult> getOutputs()
>  * List<ProvenanceEventEntity> getProvenanceEvents()
>  * Map<String, String> getLocalState()
>  * Map<String, String> getClusterState()
> *DryRunRelationshipResult*
>  * String getRelationshipName()
>  * List<DryRunFlowFileMetadata> getFlowFileMetadata()
>  



--
This message was sent by Atlassian Jira
(v8.20.10#820010)

Reply via email to