Mark Payne created NIP-31:
-----------------------------
Summary: Processor / Connector Backlog Reporting
Key: NIP-31
URL: https://issues.apache.org/jira/browse/NIP-31
Project: NiFi Improvement Proposal
Issue Type: New Feature
Reporter: Mark Payne
h2. Motivation
NiFi shows operators how much data is queued _inside_ a flow, but has no way to
report how much data is still waiting on the _source_ system that a flow pulls
from and has not yet ingested. An operator can see that a ConsumeKafka processor
has 0 FlowFiles queued, but cannot tell from NiFi whether the topic it consumes
has 10 unconsumed messages or 10 million.
This "source backlog" is the signal operators need to answer "is this flow caught up?"
and "is it keeping pace with its source?" Today they must use the external
system's own tooling to find out. This proposal adds a first-class way for a
source Processor (or a Connector built around one) to report its source backlog,
and surfaces that information through the REST API and the UI.
h2. Proposed Solution
Introduce an opt-in capability that a source component implements to report its
backlog:
* A new {{Backlog}} value type describing what remains on the source: number of
FlowFiles, bytes, and records; the time the component was last fully caught up;
and a precision indicating whether those numbers are exact or a lower bound.
* A {{BacklogReportingProcessor}} interface that a Processor implements to report
a {{Backlog}}. A Processor opts in by implementing the interface; Processors that
do not implement it are never asked.
* A {{BacklogReportingConnector}} interface that does the same for Connectors. A
Connector typically derives its {{Backlog}} by delegating to a source Processor
in its managed flow, but may also inspect queued data.
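To make the shape of the value type concrete, here is a minimal Java sketch of a {{Backlog}} with the dimensions listed above. It is an illustration, not the proposed API: the use of {{OptionalLong}} for an unreported dimension, the {{caughtUp}} factory, and the {{isCaughtUp}} helper are assumptions.

```java
import java.time.Instant;
import java.util.Objects;
import java.util.Optional;
import java.util.OptionalLong;

/**
 * Sketch of the proposed Backlog value type. Representing an unreported
 * dimension as an empty OptionalLong, and the caughtUp/isCaughtUp helpers,
 * are assumptions for illustration, not the final NiFi API.
 */
record Backlog(OptionalLong flowFiles,
               OptionalLong bytes,
               OptionalLong records,
               Optional<Instant> lastCaughtUp,
               Backlog.Precision precision) {

    /** EXACT: the numbers are exact. AT_LEAST: they are a lower bound. */
    enum Precision { EXACT, AT_LEAST }

    Backlog {
        Objects.requireNonNull(flowFiles, "flowFiles");
        Objects.requireNonNull(bytes, "bytes");
        Objects.requireNonNull(records, "records");
        Objects.requireNonNull(lastCaughtUp, "lastCaughtUp");
        Objects.requireNonNull(precision, "precision");
        requireNonNegative(flowFiles, "flowFiles");
        requireNonNegative(bytes, "bytes");
        requireNonNegative(records, "records");
    }

    private static void requireNonNegative(OptionalLong value, String name) {
        if (value.isPresent() && value.getAsLong() < 0) {
            throw new IllegalArgumentException(name + " must not be negative");
        }
    }

    /** Hypothetical factory: nothing is waiting on the source as of the given time. */
    static Backlog caughtUp(Instant asOf) {
        return new Backlog(OptionalLong.of(0), OptionalLong.of(0), OptionalLong.of(0),
                Optional.of(asOf), Precision.EXACT);
    }

    /** True only when some dimension is reported and every reported one is exactly zero. */
    boolean isCaughtUp() {
        boolean anyReported = flowFiles.isPresent() || bytes.isPresent() || records.isPresent();
        return anyReported && precision == Precision.EXACT
                && flowFiles.orElse(0) == 0 && bytes.orElse(0) == 0 && records.orElse(0) == 0;
    }
}
```

Keeping "not reported" distinct from zero lets the UI show "No value set" instead of a misleading 0 for a dimension the source cannot measure.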
The framework exposes the reported {{Backlog}} through new REST endpoints and a
UI dialog. In a cluster, each node reports its own view and the responses are
aggregated into a single cluster-wide {{Backlog}}.
Computing a backlog frequently requires the component to make a request to the
external source system, so it is treated as an operation requiring *WRITE*
authorization, even though it is conceptually a read.
A reference set of source components will implement the capability as part of this
work: ConsumeKafka, ListS3, and ConsumeKinesis, plus the KafkaToS3 Connector
(which delegates to its ConsumeKafka Processor).
h2. NiFi API Changes
New, additive types (no existing API is removed or changed in a breaking way):
* {{org.apache.nifi.components.Backlog}} — the value type, with a {{Precision}}
enum ({{EXACT}}, {{AT_LEAST}}) and factory methods for the common
"caught up" cases.
* {{org.apache.nifi.components.BacklogReportingException}} — thrown by a component
that attempted to determine its backlog and failed (for example, the source is
unreachable).
* {{org.apache.nifi.processor.BacklogReportingProcessor}} — opt-in capability
interface; {{getBacklog(ProcessContext)}}.
* {{org.apache.nifi.components.connector.BacklogReportingConnector}} — opt-in
capability interface; {{getBacklog(FlowContext)}}.
* {{org.apache.nifi.components.connector.components.QueueSnapshot}} — an atomic,
point-in-time view of a connection's queue, used by Connectors that compute
backlog from queued data.
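The point of an atomic {{QueueSnapshot}} is that the count, the byte total, and the FlowFile list all describe the same instant. A minimal sketch, assuming a simplified queue that stores only FlowFile sizes and guards every mutation and the snapshot with one read/write lock; {{SketchQueue}} and the record fields are hypothetical, not NiFi's {{FlowFileQueue}}:

```java
import java.util.ArrayDeque;
import java.util.Deque;
import java.util.List;
import java.util.concurrent.locks.ReentrantReadWriteLock;

/** Hypothetical snapshot: count, bytes, and contents captured together. */
record QueueSnapshot(int flowFileCount, long totalBytes, List<Long> flowFileSizes) {}

/** Simplified stand-in queue; each entry is just a FlowFile size in bytes. */
final class SketchQueue {
    private final Deque<Long> sizes = new ArrayDeque<>();
    private final ReentrantReadWriteLock lock = new ReentrantReadWriteLock();
    private long totalBytes;

    void offer(long size) {
        lock.writeLock().lock();
        try {
            sizes.addLast(size);
            totalBytes += size;
        } finally {
            lock.writeLock().unlock();
        }
    }

    Long poll() {
        lock.writeLock().lock();
        try {
            Long size = sizes.pollFirst();
            if (size != null) {
                totalBytes -= size;
            }
            return size;
        } finally {
            lock.writeLock().unlock();
        }
    }

    /** Reads size, byte total, and list under the same lock so they cannot disagree. */
    QueueSnapshot snapshot() {
        lock.readLock().lock();
        try {
            return new QueueSnapshot(sizes.size(), totalBytes, List.copyOf(sizes));
        } finally {
            lock.readLock().unlock();
        }
    }
}
```

Reading the size and then listing the FlowFiles in two separate calls could observe an enqueue or dequeue in between; taking both under one lock rules that out, while the read lock still lets concurrent snapshots proceed.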
New methods are also added to the framework-supplied facade interfaces
{{ProcessorFacade}} ({{reportsBacklog()}}, {{getBacklog()}}) and
{{ConnectionFacade}} ({{getQueueSnapshot()}}). These facades are implemented only
by the framework, not by extensions, so adding methods to them does not break
existing extensions.
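A Connector such as KafkaToS3 could use these facade methods to delegate to its source Processor. The sketch below is hypothetical: {{Backlog}} is reduced to a record count, the facade is a stand-in with only {{reportsBacklog()}} and {{getBacklog()}} from this proposal plus an assumed {{getName()}} for lookup, and a plain list replaces whatever {{FlowContext}} actually exposes.

```java
import java.util.List;
import java.util.Optional;

/** Minimal stand-in for the proposed Backlog (record count and exactness only). */
record Backlog(long records, boolean exact) {}

/** Stand-in facade: reportsBacklog/getBacklog come from the proposal; getName is assumed. */
interface ProcessorFacade {
    String getName();
    boolean reportsBacklog();
    Backlog getBacklog();
}

/** Hypothetical Connector logic that delegates to a named source Processor. */
final class DelegatingConnectorSketch {
    private final List<ProcessorFacade> processors;

    DelegatingConnectorSketch(List<ProcessorFacade> processors) {
        this.processors = List.copyOf(processors);
    }

    Optional<Backlog> getBacklog(String sourceName) {
        return processors.stream()
                .filter(p -> p.getName().equals(sourceName))
                // Only ask a Processor that opted in to backlog reporting.
                .filter(ProcessorFacade::reportsBacklog)
                .findFirst()
                .map(ProcessorFacade::getBacklog);
    }
}
```

Checking {{reportsBacklog()}} first mirrors the opt-in rule: a Processor that does not implement the capability is never asked.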
h2. REST API Changes
* GET /processors/\{id}/backlog — returns the Processor's {{Backlog}}.
* A new {{BacklogEntity}}/{{BacklogDTO}} carrying both raw numeric values and
server-formatted, human-readable strings.
Backlog requests require *WRITE* authorization. A request for a component that does
not implement the capability returns {{409 Conflict}}; a {{BacklogReportingException}}
is mapped to an error response that is distinguishable from "not supported."
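The two failure modes above can be kept apart with a small mapping step. This is a sketch under stated assumptions: the stand-in exception is checked, the capability is reduced to a record count, authorization is omitted, and {{503}} is only a placeholder for "any status other than 409", since this proposal does not name the code.

```java
/** Stand-in for the proposed exception; whether the real one is checked is an assumption. */
class BacklogReportingException extends Exception {
    BacklogReportingException(String message) {
        super(message);
    }
}

/** Stand-in for the opt-in capability, reduced to a single record count. */
interface BacklogReporter {
    long getBacklogRecords() throws BacklogReportingException;
}

/** Hypothetical mapping from a backlog request to an HTTP status and body. */
final class BacklogEndpointSketch {
    static final int NOT_SUPPORTED = 409;  // from the proposal: capability not implemented
    static final int BACKLOG_FAILED = 503; // assumption: placeholder for a status distinct from 409

    record Response(int status, String body) {}

    static Response getBacklog(Object component) {
        // Components that did not opt in are never asked for a backlog.
        if (!(component instanceof BacklogReporter reporter)) {
            return new Response(NOT_SUPPORTED, "Component does not report a backlog");
        }
        try {
            return new Response(200, Long.toString(reporter.getBacklogRecords()));
        } catch (BacklogReportingException e) {
            // The component tried and failed, for example because the source is unreachable.
            return new Response(BACKLOG_FAILED, "Unable to determine backlog: " + e.getMessage());
        }
    }
}
```

A client can then treat 409 as "hide the menu entry" and the other error as "retry later or check the source".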
h2. Framework Changes
* {{FlowFileQueue}} gains an atomic queue-snapshot operation so that a
connection's size and its FlowFile list are captured together at a single point
in time (required for Connectors that compute backlog from queued FlowFiles).
* Cluster aggregation merges per-node {{Backlog}} responses: numeric dimensions
are summed, the "last caught up" time is the earliest across nodes, and precision
is downgraded to {{AT_LEAST}} if any node is inexact.
* Human-readable formatting (counts, data sizes, and local-time "last caught up")
is computed server-side so that the UI and any other client render consistently.
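The cluster merge rules translate directly into a fold over per-node responses. A minimal sketch, assuming every node reports all three counts and that a node with no "last caught up" time is simply skipped when picking the earliest; {{NodeBacklog}} and {{BacklogMerger}} are hypothetical names:

```java
import java.time.Instant;
import java.util.List;
import java.util.Optional;

enum Precision { EXACT, AT_LEAST }

/** Hypothetical per-node backlog; all three counts are assumed to be reported. */
record NodeBacklog(long flowFiles, long bytes, long records,
                   Optional<Instant> lastCaughtUp, Precision precision) {}

final class BacklogMerger {
    static NodeBacklog merge(List<NodeBacklog> nodes) {
        if (nodes.isEmpty()) {
            throw new IllegalArgumentException("no node responses to merge");
        }
        long flowFiles = 0;
        long bytes = 0;
        long records = 0;
        Instant earliest = null;
        Precision precision = Precision.EXACT;
        for (NodeBacklog node : nodes) {
            // Numeric dimensions are summed across nodes.
            flowFiles += node.flowFiles();
            bytes += node.bytes();
            records += node.records();
            // The cluster-wide "last caught up" is the earliest node time.
            if (node.lastCaughtUp().isPresent()) {
                Instant t = node.lastCaughtUp().get();
                if (earliest == null || t.isBefore(earliest)) {
                    earliest = t;
                }
            }
            // One inexact node makes the whole result a lower bound.
            if (node.precision() == Precision.AT_LEAST) {
                precision = Precision.AT_LEAST;
            }
        }
        return new NodeBacklog(flowFiles, bytes, records, Optional.ofNullable(earliest), precision);
    }
}
```

Downgrading is sound because adding a lower bound to exact values still yields a lower bound, and taking the earliest time means the cluster is only "caught up" as of when its slowest node was.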
h2. UI Changes
* A context-menu entry on eligible Processors and Connectors fetches the current
{{Backlog}}.
* The {{Backlog}} is shown in a dialog laid out as a vertical two-column table
(measurement / value), with pretty-printed numbers and the raw value on hover,
local-time "last caught up" with timezone, and "No value set" for unreported
dimensions.
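The strings shown in this dialog would come from the server-side formatting step. A sketch of what that could produce, assuming US number formatting, 1024-based size units, and a "yyyy-MM-dd HH:mm:ss z" timestamp pattern; none of these choices, nor the {{BacklogFormatting}} name, is specified by the proposal:

```java
import java.text.NumberFormat;
import java.time.Instant;
import java.time.ZoneId;
import java.time.ZonedDateTime;
import java.time.format.DateTimeFormatter;
import java.util.Locale;
import java.util.OptionalLong;

/** Hypothetical server-side formatter; locale, units, and pattern are assumptions. */
final class BacklogFormatting {
    static final String NO_VALUE = "No value set";
    private static final String[] UNITS = {"bytes", "KB", "MB", "GB", "TB"};
    private static final DateTimeFormatter TIME =
            DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss z", Locale.US);

    /** Pretty-printed count, e.g. 1234567 becomes "1,234,567". */
    static String count(OptionalLong value) {
        if (value.isEmpty()) {
            return NO_VALUE;
        }
        return NumberFormat.getIntegerInstance(Locale.US).format(value.getAsLong());
    }

    /** Data size using 1024-based units with two decimals above one KB. */
    static String dataSize(OptionalLong bytes) {
        if (bytes.isEmpty()) {
            return NO_VALUE;
        }
        double size = bytes.getAsLong();
        int unit = 0;
        while (size >= 1024 && unit < UNITS.length - 1) {
            size /= 1024;
            unit++;
        }
        if (unit == 0) {
            return bytes.getAsLong() + " bytes";
        }
        return String.format(Locale.US, "%.2f %s", size, UNITS[unit]);
    }

    /** "Last caught up" in the given zone, including the zone name. */
    static String lastCaughtUp(Instant time, ZoneId zone) {
        if (time == null) {
            return NO_VALUE;
        }
        return TIME.format(ZonedDateTime.ofInstant(time, zone));
    }
}
```

Because the DTO also carries the raw numbers, the dialog can show a string such as "1.50 KB" while keeping the exact value (1536) available on hover.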
h2. Scope / Not in Scope
In scope: the API capability, REST endpoints, cluster aggregation, UI
presentation, and reference implementations on ConsumeKafka, ListS3,
ConsumeKinesis, and the KafkaToS3 Connector.
Not in scope: implementing the capability on every source Processor. Components
opt in over time; a component that does not implement the interface behaves
exactly as it does today.