[
https://issues.apache.org/jira/browse/NIFI-14424?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
]
Lukas Kucharski updated NIFI-14424:
-----------------------------------
Attachment: image-2025-07-16-12-52-20-296.png
> Support for Confluent schema registry Protobuf wire format in ProtobufReader
> ----------------------------------------------------------------------------
>
> Key: NIFI-14424
> URL: https://issues.apache.org/jira/browse/NIFI-14424
> Project: Apache NiFi
> Issue Type: Improvement
> Components: Extensions
> Reporter: Alaksiej Ščarbaty
> Assignee: Lukas Kucharski
> Priority: Major
> Attachments: image-2025-07-16-12-52-20-296.png
>
> Time Spent: 20m
> Remaining Estimate: 0h
>
> This change enables NiFi to deserialize Protobuf messages encoded using the
> Confluent [protobuf wire
> format.|https://docs.confluent.io/platform/current/schema-registry/fundamentals/serdes-develop/index.html#wire-format]
> The use case:
> # The user has a schema registry and a Kafka cluster with topics
> containing messages encoded according to the Confluent Protobuf wire format
> (magic byte + 4-byte schema ID + varint message indexes + payload)
> # The user configures *ProtobufReader* service ({*}existing{*})
> ## User sets Schema Access Strategy property to *Schema Reference Reader
> (existing)*
> ## Sets Schema Registry property *(existing)*
> ## Sets Schema Reference Reader ({*}existing{*})
> ## Sets *Message Name Resolver Strategy*. Two options are available: one,
> *Message Type Property* *(new)*; two, *Message Name Resolver Service* *(new)*
> ### Option one exists for backward compatibility and allows users to use
> ProtobufReader with Proto Directory and Message Type properties
> ### Option two allows the user to choose the controller service that will be
> responsible for resolving the message name for decoding.
> *Explanation:*
> The ProtobufReader always needs the name of the proto schema message to be
> able to decode the binary payload. In its current form, the message name is
> set via the property *Message Type (existing)*.
> The new strategy defines the new interface *MessageNameResolver* *(new)* in
> nifi-schema-registry-service-api. Currently, one implementation exists:
> *ConfluentProtobufMessageNameResolver* *(new)*
> The ConfluentProtobufMessageNameResolver looks at the second byte in the
> input stream to decode Confluent standardized *message name indexes.* More
> about it here: [protobuf wire
> format.|https://docs.confluent.io/platform/current/schema-registry/fundamentals/serdes-develop/index.html#wire-format]
> The indexes are variable length and can be [0][2][0] or [1] etc. More details
> are in the Confluent wire format document.
>
> # The user configures *ConsumeKafka* processor with the processing strategy
> set to *RECORD (existing)*
>
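The wire format named in the use case above can be illustrated with a small decoder. This is a minimal sketch based on the linked Confluent wire format document, not the ConfluentProtobufMessageNameResolver implementation: the class name `ConfluentWireHeaderSketch`, the `Header` record, and both method names are hypothetical. It assumes the header is a zero magic byte, a 4-byte big-endian schema ID, and then the message indexes as zigzag varints prefixed by their count, with a count of 0 standing for the common index list [0]. Records require Java 16 or newer.

```java
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.List;

// Hypothetical helper, for illustration only.
class ConfluentWireHeaderSketch {

    /** Decoded header fields of one Confluent-framed Protobuf message. */
    record Header(int schemaId, List<Integer> messageIndexes) {}

    /** Reads one zigzag-encoded varint (at most 5 bytes for a 32-bit value). */
    static int readZigZagVarint(ByteBuffer buffer) {
        int raw = 0;
        for (int shift = 0; shift <= 28; shift += 7) {
            int b = buffer.get() & 0xFF;
            raw |= (b & 0x7F) << shift;       // low 7 bits carry data
            if ((b & 0x80) == 0) {            // high bit clear: last byte
                return (raw >>> 1) ^ -(raw & 1); // undo zigzag encoding
            }
        }
        throw new IllegalArgumentException("Varint is longer than 5 bytes");
    }

    /** Parses magic byte, schema ID and message indexes from the start of a message. */
    static Header readHeader(byte[] message) {
        ByteBuffer buffer = ByteBuffer.wrap(message); // big-endian by default
        int magic = buffer.get();
        if (magic != 0) {
            throw new IllegalArgumentException("Unexpected magic byte: " + magic);
        }
        int schemaId = buffer.getInt();
        int count = readZigZagVarint(buffer);
        if (count < 0) {
            throw new IllegalArgumentException("Negative index count: " + count);
        }
        List<Integer> indexes = new ArrayList<>();
        if (count == 0) {
            indexes.add(0); // a single 0 is shorthand for the index list [0]
        }
        for (int i = 0; i < count; i++) {
            indexes.add(readZigZagVarint(buffer));
        }
        return new Header(schemaId, indexes);
    }

    public static void main(String[] args) {
        // magic 0, schema ID 7, then count 3 and indexes 0, 2, 0 (zigzag encoded)
        byte[] nested = {0, 0, 0, 0, 7, 6, 0, 4, 0};
        System.out.println(readHeader(nested));
    }
}
```

The sketch reads from a byte array for brevity; a resolver working on a FlowFile would read the same fields from an InputStream instead.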
>
> Currently, for Protobuf record parsing, there is a
> [ProtobufReader|https://github.com/apache/nifi/blob/main/nifi-extension-bundles/nifi-protobuf-bundle/nifi-protobuf-services/src/main/java/org/apache/nifi/services/protobuf/ProtobufReader.java#L58],
> which supports proto files on a [local filesystem
> only|https://github.com/apache/nifi/blob/main/nifi-extension-bundles/nifi-protobuf-bundle/nifi-protobuf-services/src/main/java/org/apache/nifi/services/protobuf/ProtobufReader.java#L157].
>
> The change aims to:
> * support schema registries for retrieving proto definitions.
> * support decoding the Protobuf [wire
> format|https://docs.confluent.io/platform/current/schema-registry/fundamentals/serdes-develop/index.html#wire-format]
> * support fetching imported (referenced) proto files in Protobuf schemas
> Since the presence of the original proto files is a must for parsing, new
> abstractions related to schemas will be introduced.
> h2. SchemaDefinition
> A raw schema definition. Contains a {_}SchemaIdentifier{_}, schema text and a
> collection of referenced {_}SchemaDefinitions{_}.
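The shape described above can be sketched as a Java record that carries its references, plus a helper that flattens the reference tree into the set of proto texts a parser would need. This is an illustrative sketch only: `SchemaId` is a hypothetical stand-in for NiFi's SchemaIdentifier, and `collectTexts` is an assumed helper name, not part of the proposed API.

```java
import java.util.LinkedHashMap;
import java.util.Map;

// Hypothetical sketch; names other than SchemaDefinition are assumptions.
class SchemaDefinitionSketch {

    /** Simplified stand-in for NiFi's SchemaIdentifier. */
    record SchemaId(String name, int version) {}

    /** Raw schema text plus the schemas it imports, keyed by import name. */
    record SchemaDefinition(SchemaId id, String text, Map<String, SchemaDefinition> references) {}

    /** Collects the root text and all transitively referenced texts, keyed by name. */
    static Map<String, String> collectTexts(String rootName, SchemaDefinition root) {
        Map<String, String> texts = new LinkedHashMap<>();
        collect(rootName, root, texts);
        return texts;
    }

    private static void collect(String name, SchemaDefinition definition, Map<String, String> texts) {
        if (texts.containsKey(name)) {
            return; // already visited, e.g. a file imported from two places
        }
        texts.put(name, definition.text());
        definition.references().forEach((refName, ref) -> collect(refName, ref, texts));
    }

    public static void main(String[] args) {
        SchemaDefinition common = new SchemaDefinition(
                new SchemaId("common", 1), "message Id {}", Map.of());
        SchemaDefinition order = new SchemaDefinition(
                new SchemaId("order", 3), "import \"common.proto\"; message Order {}",
                Map.of("common.proto", common));
        System.out.println(collectTexts("order.proto", order).keySet());
    }
}
```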
> h2. -SchemaDefinitionProvider-
> -New _SchemaDefinitionProviders_ provide a simple interface for retrieving
> Schemas as text, together with their references. Unlike their
> _SchemaRegistry_ counterparts, no parsing is going to take place on that
> level, and it’s up to the caller what to do with returned texts.-
> -The interface will be put into the _nifi-schema-registry-service-api_
> module. Whereas the implementations should be created in their respective
> modules, like _nifi-confluent-schema-registry-service_ for Confluent.-
> *Notes after the initial implementation:*
> *SchemaDefinitionProvider* was added with an initial implementation (not
> merged), but was later removed in favour of having its responsibility added
> to the SchemaRegistry implementation. The SchemaRegistry interface was
> extended with one method with a default implementation, and there was no need
> to introduce a new abstraction (SchemaDefinitionProvider). This allows us to
> keep the UX consistent with patterns established so far. Additionally, having
> the ability to fetch a raw schema from SchemaRegistry might prove helpful
> when additional context, which can be supplied by the original schema (not
> RecordSchema), is needed.
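Extending an interface with a default method, as described in the notes above, keeps existing implementations compiling while letting individual registries opt in. The sketch below shows the pattern only: `RegistrySketch`, `retrieveRecordSchema`, and `retrieveRawSchema` are hypothetical names and do not reflect the actual SchemaRegistry method signatures.

```java
// Hypothetical illustration of the default-method pattern; not the NiFi API.
class RawSchemaDefaultMethodSketch {

    interface RegistrySketch {
        /** Stand-in for the existing, parsed-schema lookup. */
        String retrieveRecordSchema(int schemaId);

        /** New capability; registries that cannot serve raw text inherit this default. */
        default String retrieveRawSchema(int schemaId) {
            throw new UnsupportedOperationException(
                    getClass().getSimpleName() + " does not provide raw schema text");
        }
    }

    /** An existing implementation that needs no changes. */
    static class LegacyRegistry implements RegistrySketch {
        @Override
        public String retrieveRecordSchema(int schemaId) {
            return "record-schema-" + schemaId;
        }
    }

    /** An implementation that opts in by overriding the default. */
    static class RawCapableRegistry extends LegacyRegistry {
        @Override
        public String retrieveRawSchema(int schemaId) {
            return "syntax = \"proto3\"; message Example" + schemaId + " {}";
        }
    }

    public static void main(String[] args) {
        System.out.println(new RawCapableRegistry().retrieveRawSchema(1));
    }
}
```

Callers that need the raw text, such as a Protobuf reader, can then fail fast with a clear message when the configured registry has not overridden the default.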
> h2. MessageNameResolver
> In the case of Protobuf, a single proto file may contain multiple message
> entries. To determine which message to use for decoding, a message index is
> usually encoded in the payload. As with a schema reference, different
> systems encode this index differently.
> _MessageNameResolver_ will be a new _ControllerService_ that is going to
> translate an encoded Protobuf message index into a Protobuf Message name
> that’s later going to be used to find a correct Protobuf message for
> deserialization.
> -Similar to the _SchemaDefinitionProvider,_- the interface will be put into
> the _nifi-schema-registry-service-api_ module. Schema Registry dependent
> implementations will land in their respective modules, e.g.
> _nifi-confluent-schema-registry-service_ for Confluent.
> h2. ProtobufReader
> -Since the existing ProtobufReader extends {_}SchemaRegistryService{_}, we’ll
> conform to that interface and add a new {_}Schema Access Strategy -
> SCHEMA_DEFINITION_PROVIDER{_}.-
> -The _createRecordReader_ method will be split in 2 branches. One for
> _SCHEMA_DEFINITION_PROVIDER_ (new) and one for the rest (the existing one).-
> -The _getSupportedPropertyDescriptors_ will be modified to define the new
> properties, as well as to make the _SCHEMA_REFERENCE_READER_ depend on
> _SCHEMA_DEFINITION_PROVIDER_ value.-
> The existing *ProtobufReader* implementation has two properties (Proto
> Directory, Message Type) that are always required. To keep backward
> compatibility, we will make those two properties optional and add a custom
> validator to require them when the 'Generate From Protofile' schema access
> strategy is selected.
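The conditional requirement described above can be sketched without the NiFi API. In NiFi this kind of check would typically live in the component's custom validation hook; the version below is a plain-Java assumption that models properties as a map of display names to values. The class and method names are hypothetical; the property and strategy names are taken from the description.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;

// Plain-Java sketch of the rule; not the ProtobufReader implementation.
class ConditionalValidationSketch {

    static final String SCHEMA_ACCESS_STRATEGY = "Schema Access Strategy";
    static final String GENERATE_FROM_PROTO_FILE = "Generate From Protofile";
    static final String PROTO_DIRECTORY = "Proto Directory";
    static final String MESSAGE_TYPE = "Message Type";

    /** Returns one problem description per missing property; empty when valid. */
    static List<String> validate(Map<String, String> properties) {
        List<String> problems = new ArrayList<>();
        // The two properties are only mandatory for the file-based strategy.
        if (GENERATE_FROM_PROTO_FILE.equals(properties.get(SCHEMA_ACCESS_STRATEGY))) {
            for (String required : List.of(PROTO_DIRECTORY, MESSAGE_TYPE)) {
                String value = properties.get(required);
                if (value == null || value.isBlank()) {
                    problems.add("'" + required + "' is required when '"
                            + SCHEMA_ACCESS_STRATEGY + "' is '" + GENERATE_FROM_PROTO_FILE + "'");
                }
            }
        }
        return problems;
    }

    public static void main(String[] args) {
        System.out.println(validate(Map.of(SCHEMA_ACCESS_STRATEGY, GENERATE_FROM_PROTO_FILE)));
    }
}
```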
> Attribute modifications:
>
> |*Attribute Name*|*Description*|*Value*|*Comment*|
> |-Schema Reference Reader- ({*}Note: this property already exists, no
> modifications needed){*}|-Service implementation responsible for reading
> FlowFile attributes or content to determine the Schema Reference
> Identifier.-|-<{_}SchemaReferenceReader{_}> impl-|-That’s going to be the
> existing property in SchemaRegistryService.-|
> |-Schema Definition Provider-|-A provider that will retrieve Protobuf schema
> definitions.-|-<{_}SchemaDefinitionProvider{_}> impl-|-This is a new
> property, available only when _Schema Access Strategy_ is
> _SCHEMA_DEFINITION_PROVIDER._-|
> |Message Name Resolver|Service implementation responsible for reading
> FlowFile attributes or content to determine the name of a Protobuf message to
> use for deserialization.|<{_}MessageNameResolver{_}> impl|This is a new
> property, available only when _Schema Access Strategy_ is
> _{-}SCHEMA_DEFINITION_PROVIDER{-}. SCHEMA_REFERENCE_READER_|
> |-Proto Directory, Message Type-| | |-*These properties will be hidden when*
> *_Schema Access Strategy_* *is* *_SCHEMA_DEFINITION_PROVIDER._*-
> -*Currently they’re mandatory.*- |
> |_Schema Caching parameters, like size and expiration_| | |These are new
> properties, available only when _Schema Access Strategy_ is
> _SCHEMA_DEFINITION_PROVIDER._|
> h3. -Custom Protobuf Loader-
> -The wire’s
> [SchemaLoader|https://github.com/square/wire/blob/master/wire-schema/src/jsMain/kotlin/com/squareup/wire/schema/SchemaLoader.kt]
> can’t be used with {_}SchemaDefinitionProviders{_}, as it’s tightly tied to
> a file system. Instead of implementing an adapter for a file system, a custom
> glue-code will be implemented, which either uses a _SchemaDefinitionProvider_
> for loading, or accepts an already fetched SchemaDefinition. See
> [CommonSchemaLoader|https://github.com/square/wire/blob/master/wire-schema/src/commonMain/kotlin/com/squareup/wire/schema/internal/CommonSchemaLoader.kt]
> for inspiration.-
> -This logic will reside in _nifi-protobuf-services_ too.-
> h2. Appendix: Code examples
> h3. -SchemaDefinitionProvider-
> -{{record SchemaDefinition(}}-
> -{{SchemaIdentifier id,}}-
> -{{String text,}}-
> -{{Map<String, SchemaDefinition> references // key - by what name the
> schema is referenced from its parent}}-
> -{{)}}-
> -{{interface SchemaDefinitionProvider {}}-
> -{{SchemaDefinition getById(SchemaIdentifier id);}}-
> -{{}}}-
> -{{class ConfluentSchemaDefinitionProvider implements
> SchemaDefinitionProvider {}}-
> -{{// ...}}-
> -{{}}}-
> h3. MessageNameResolver
> {{record Message(}}
> {{ String name,}}
> {{ List<Message> nestedMessages}}
> {{) {}}}
> {{interface MessageNameResolver {}}
> {{ String getMessageName(InputStream contentStream, List<Message>
> messages);}}
> {{}}}
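To show how a resolver might turn decoded message indexes into a name, here is a minimal sketch built around the Message record from the appendix. It assumes the indexes have already been read from the content stream, that the first index selects a top-level message and each following index selects a nested message of the previous one, and that nested names are joined with dots. The class and the `resolveName` method are hypothetical, and package prefixes are left out.

```java
import java.util.List;
import java.util.StringJoiner;

// Hypothetical sketch; only the Message record shape comes from the proposal.
class MessageNameResolutionSketch {

    record Message(String name, List<Message> nestedMessages) {}

    /** Walks the index path through the message tree and builds the dotted name. */
    static String resolveName(List<Integer> indexes, List<Message> topLevelMessages) {
        if (indexes.isEmpty()) {
            throw new IllegalArgumentException("At least one message index is required");
        }
        List<Message> level = topLevelMessages;
        StringJoiner name = new StringJoiner(".");
        for (int index : indexes) {
            if (index < 0 || index >= level.size()) {
                throw new IllegalArgumentException("No message at index " + index);
            }
            Message message = level.get(index);
            name.add(message.name());
            level = message.nestedMessages(); // descend for the next index
        }
        return name.toString();
    }

    public static void main(String[] args) {
        List<Message> messages = List.of(
                new Message("Customer", List.of()),
                new Message("Order", List.of(
                        new Message("Line", List.of()),
                        new Message("Address", List.of()))));
        System.out.println(resolveName(List.of(1, 1), messages));
    }
}
```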
--
This message was sent by Atlassian Jira
(v8.20.10#820010)