[
https://issues.apache.org/jira/browse/NIFI-14424?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
]
Lukas Kucharski updated NIFI-14424:
-----------------------------------
Attachment: image-2025-07-17-14-51-24-943.png
> Support for Confluent schema registry Protobuf wire format in ProtobufReader
> ----------------------------------------------------------------------------
>
> Key: NIFI-14424
> URL: https://issues.apache.org/jira/browse/NIFI-14424
> Project: Apache NiFi
> Issue Type: Improvement
> Components: Extensions
> Reporter: Alaksiej Ščarbaty
> Assignee: Lukas Kucharski
> Priority: Major
> Attachments: image-2025-07-16-12-52-20-296.png,
> image-2025-07-17-10-32-37-291.png, image-2025-07-17-14-40-58-260.png,
> image-2025-07-17-14-42-31-326.png, image-2025-07-17-14-43-58-023.png,
> image-2025-07-17-14-45-47-577.png, image-2025-07-17-14-51-24-943.png
>
> Time Spent: 1h
> Remaining Estimate: 0h
>
> h1. Introduction
> This change enables NiFi to deserialize protobuf messages encoded using the
> Confluent [protobuf wire
> format.|https://docs.confluent.io/platform/current/schema-registry/fundamentals/serdes-develop/index.html#wire-format]
> The Confluent Kafka protobuf serde components prepend a custom header to the
> message payload, and this header needs to be parsed and interpreted. The
> existing NiFi Confluent integration can decode Confluent headers only for
> payloads containing Avro data. For protobuf data, the Kafka protobuf
> serializer adds an additional variable-length byte section to the header.
>
> In addition, a new implementation for *ProtobufReader, ProtobufReader2
> (working name)* will be added. The existing ProtobufReader component can't be
> easily extended without breaking backward compatibility because two component
> properties *Proto Directory* and *Message Type* are always required. So, even
> when the component is configured to use a schema registry and schema
> reference reader, the user needs to provide those two parameters. It looks
> like the component was never intended to be used with any other *Schema
> Access Strategy* than {*}Generate From Proto File{*}. Additionally, the
> current implementation can't work in a cluster without copying protobuf
> schemas manually onto every node.
> *ProtobufReader2 (new)* will not support the *Generate From Proto File* schema
> access strategy. It will support the remaining ones from the list below:
> !image-2025-07-17-10-32-37-291.png!
> h1. History
> Initially, the changes have been pushed for review in [this
> pr.|https://github.com/apache/nifi/pull/10094] However, after an initial look
> by [~exceptionfactory], we decided that it's reasonable to create a new
> implementation of ProtobufReader instead of maintaining the old one. In
> addition, the PR will be divided into two smaller ones. The first will cover
> changes to the confluent-bundle and common APIs, and the second will cover
> the new ProtobufReader implementation.
> With this context, the use case below describes the planned design.
> h2. TODO: explain nomenclature
> h2. The use case
> # The user has a schema registry and a Kafka cluster with the topic
> containing messages encoded according to the Confluent [protobuf wire
> format|https://docs.confluent.io/platform/current/schema-registry/fundamentals/serdes-develop/index.html#wire-format]
> (magic byte + 4-byte schema_id + varint message indexes + payload)
> # The user configures *ProtobufReader2 (working name)* service
> *({color:#ff0000}NEW - not in PR{color})*
> ## User sets Schema Access Strategy property to *Schema Reference Reader
> (existing)*
> ### *ConfluentSchemaRegistry* was extended to support the fetching of
> referenced/included schemas. If a protobuf schema imports other schemas, they
> are fetched from the registry recursively. *(new)*
> ### *SchemaRegistry (existing)* interface was extended to support returning
> a raw schema form *({color:#ff0000}NEW - in PR{color})* in addition to the
> current RecordSchema. The first design proposed by [~awelless] called for
> introducing a {-}*SchemaDefinitionProvider*{-}, but during the
> implementation, it seemed that fetching and returning raw schema definition
> better suited the existing abstraction of SchemaRegistry, and no new
> abstraction was introduced to the API. With this change, a new abstraction
> was introduced to nifi-schema-registry-service-api - *SchemaDefinition.* The
> intention of this is to represent a raw form of various schemas.
> ## Sets Schema Registry property *(existing ConfluentSchemaRegistry)*
> ## Sets Schema Reference Reader property (*existing
> ConfluentEncodedSchemaReferenceReader*)
> ## Sets *Message Name Resolver Strategy* property {*}(NEW){*}. Two options
> are available. One - *Message Type Property* *({color:#ff0000}NEW{color}),*
> Two - *Message Name Resolver Service ({color:#ff0000}NEW{color})*
> ### Option one exists for simple cases where the user knows the name of the
> protobuf message upfront, and the message will not change
> ### Option two allows the user to choose the controller service responsible
> for resolving the message name. In this case, the controller service looks at
> the header of the message payload to get the information needed to locate the
> Message within the protobuf schema (the information is a message index array,
> see protobuf wire format). This is done by
> *ConfluentProtobufMessageNameResolver(new)* service.
> *Explanation:*
> Why do we need message name resolution? Protobuf schemas can contain many
> message definitions at the root level (see example below). We need to
> explicitly give the message name to the deserializer for it to be able to do
> its job. A schema file or schema text alone may not be enough, because it may
> be ambiguous.
> The ProtobufReader always needs the name of the proto schema message to be
> able to decode the binary payload.
> The new strategy defines a new interface *MessageNameResolver*
> *({color:#ff0000}new{color})* in nifi-schema-registry-service-api. Currently,
> one implementation exists: *ConfluentProtobufMessageNameResolver*
> *({color:#ff0000}new{color})* in nifi-confluent-platform-bundle
> The ConfluentProtobufMessageNameResolver inspects the message-index section
> of the input stream to decode the Confluent standardized *message name
> indexes.* More about it here: [protobuf wire
> format.|https://docs.confluent.io/platform/current/schema-registry/fundamentals/serdes-develop/index.html#wire-format]
> The index array is variable-length and can be, for example, [0][2][0] or
> [1]; the indexes are zig-zag encoded varints (see the Confluent wire format
> document for more details).
> *({color:#ff0000}new{color})*
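> The header layout described above (magic byte, 4-byte big-endian schema id, zig-zag varint message indexes) can be sketched in plain Java. This is an illustrative decoder derived from the Confluent wire format document, not the actual NiFi or Confluent implementation; the class and method names are assumptions made for the example.

```java
import java.io.ByteArrayInputStream;
import java.io.DataInputStream;
import java.io.IOException;
import java.io.InputStream;
import java.util.ArrayList;
import java.util.List;

// Hypothetical sketch of decoding the Confluent protobuf wire-format header:
// magic byte (0), 4-byte big-endian schema id, then the message index array
// encoded as zig-zag varints (a count followed by that many indexes).
public class ConfluentProtobufHeader {

    public final int schemaId;
    public final List<Integer> messageIndexes;

    private ConfluentProtobufHeader(int schemaId, List<Integer> messageIndexes) {
        this.schemaId = schemaId;
        this.messageIndexes = messageIndexes;
    }

    public static ConfluentProtobufHeader read(InputStream in) throws IOException {
        DataInputStream data = new DataInputStream(in);
        int magic = data.readUnsignedByte();
        if (magic != 0) {
            throw new IOException("Unexpected magic byte: " + magic);
        }
        int schemaId = data.readInt(); // 4-byte big-endian schema id

        int count = readZigZagVarint(data);
        List<Integer> indexes = new ArrayList<>();
        if (count == 0) {
            // Documented optimization: the common array [0] is written as a single 0 byte.
            indexes.add(0);
        } else {
            for (int i = 0; i < count; i++) {
                indexes.add(readZigZagVarint(data));
            }
        }
        return new ConfluentProtobufHeader(schemaId, indexes);
    }

    // Reads one zig-zag encoded varint (the same encoding Avro uses).
    private static int readZigZagVarint(DataInputStream in) throws IOException {
        int value = 0;
        int shift = 0;
        int b;
        do {
            b = in.readUnsignedByte();
            value |= (b & 0x7F) << shift;
            shift += 7;
        } while ((b & 0x80) != 0);
        return (value >>> 1) ^ -(value & 1); // zig-zag decode
    }

    public static void main(String[] args) throws IOException {
        // Header for schema id 42 and message index array [1]:
        // magic 0x00, schema id 0x00 0x00 0x00 0x2A, count 1 (zig-zag 0x02), index 1 (zig-zag 0x02)
        byte[] header = {0x00, 0x00, 0x00, 0x00, 0x2A, 0x02, 0x02};
        ConfluentProtobufHeader h = read(new ByteArrayInputStream(header));
        System.out.println(h.schemaId + " " + h.messageIndexes); // 42 [1]
    }
}
```

> Note the documented special case: the single-element index array [0] is serialized as the single byte 0, which the sketch restores on the decoding side.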
> *Parsing of the protobuf message schemas.*
> After *ConfluentProtobufMessageNameResolver* decodes the message indexes, it
> looks for the protobuf message name in the *SchemaDefinition
> ({color:#ff0000}new{color}).* Messages can be nested, so the message index
> decoded from the content can point to the nested message like
> `MessageA.MessageE.MessageG`
> {{ !image-2025-07-16-12-52-20-296.png! }}
> ### The *ConfluentMessageNameResolver* needs to parse the schema in order to
> know the structure of the schema and locate the right message name
> *({color:#ff0000}new{color}).* For this, an *ANTLR* parser is used. The
> proto3 grammar was downloaded from the ANTLR repository, and the Maven plugin
> generates the parser for the proto3 schema format.
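> The index-to-name resolution described above can be illustrated with a small tree walk: each decoded index selects a message at the current nesting level, descending into its nested messages for the next index. `MessageNode` is a stand-in for whatever structure the ANTLR-based parser produces; all names here are illustrative, not the actual NiFi classes.

```java
import java.util.List;

// Hypothetical sketch: resolving a decoded message index array (e.g. [0][2][0])
// to a fully qualified message name against a parsed protobuf schema tree.
public class MessageNameResolution {

    // Stand-in for a parsed protobuf message declaration and its nested messages.
    static final class MessageNode {
        final String name;
        final List<MessageNode> nested;

        MessageNode(String name, List<MessageNode> nested) {
            this.name = name;
            this.nested = nested;
        }
    }

    // Walks the index path: index 0 of [0][2][0] picks a root-level message,
    // index 2 picks its third nested message, and so on.
    static String resolve(List<MessageNode> rootMessages, List<Integer> indexes) {
        StringBuilder qualifiedName = new StringBuilder();
        List<MessageNode> level = rootMessages;
        for (int index : indexes) {
            MessageNode node = level.get(index);
            if (qualifiedName.length() > 0) {
                qualifiedName.append('.');
            }
            qualifiedName.append(node.name);
            level = node.nested;
        }
        return qualifiedName.toString();
    }

    public static void main(String[] args) {
        // A schema tree where MessageA contains MessageE, which contains MessageG.
        MessageNode g = new MessageNode("MessageG", List.of());
        MessageNode e = new MessageNode("MessageE", List.of(g));
        MessageNode a = new MessageNode("MessageA",
                List.of(new MessageNode("MessageC", List.of()),
                        new MessageNode("MessageD", List.of()),
                        e));
        MessageNode b = new MessageNode("MessageB", List.of());

        System.out.println(resolve(List.of(a, b), List.of(0, 2, 0))); // MessageA.MessageE.MessageG
        System.out.println(resolve(List.of(a, b), List.of(1)));       // MessageB
    }
}
```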
> *Why not use the wire library to parse it?*
> In order to get the schema message, we need to parse the schema, not compile
> it. The difference is that we can parse an incomplete schema as opposed to
> compilation, where we would need a schema + imported schemas. Confluent in
> the implementation of its schema-registry uses a parser from an internal wire
> package:
> com.squareup.wire.schema.internal.parser.*
> The decision was made not to use internal classes from a 3rd-party library. I
> think they are not public anymore in the newer versions. Additionally, I did
> not want to add the wire library and its transitive dependencies to the
> confluent-bundle, because I remember that the maintainers favor keeping the
> memory footprint small. The ANTLR parser uses a very small standalone
> antlr-runtime.jar.
> # The user configures *ConsumeKafka* processor with the processing strategy
> set to *RECORD (existing)*
>
>
> Currently, for Protobuf record parsing, there is a
> [ProtobufReader|https://github.com/apache/nifi/blob/main/nifi-extension-bundles/nifi-protobuf-bundle/nifi-protobuf-services/src/main/java/org/apache/nifi/services/protobuf/ProtobufReader.java#L58],
> which supports proto files on a [local filesystem
> only|https://github.com/apache/nifi/blob/main/nifi-extension-bundles/nifi-protobuf-bundle/nifi-protobuf-services/src/main/java/org/apache/nifi/services/protobuf/ProtobufReader.java#L157].
>
> The change aims at:
> * supporting schema registries to retrieve proto definitions
> * supporting decoding of the protobuf [wire
> format|https://docs.confluent.io/platform/current/schema-registry/fundamentals/serdes-develop/index.html#wire-format]
> * supporting fetching of imported (referenced) proto files in protobuf schemas
> Since the presence of the original proto files is a must for parsing, new
> abstractions related to schemas will be introduced.
> h2. SchemaDefinition
> A raw schema definition. Contains a {_}SchemaIdentifier{_}, schema text and a
> collection of referenced {_}SchemaDefinitions{_}.
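> The SchemaDefinition shape described above could look roughly like the record below. The field names, the use of a plain String identifier, and keying references by import name are all assumptions for illustration; the actual type lives in nifi-schema-registry-service-api and uses NiFi's SchemaIdentifier.

```java
import java.util.Map;

// Illustrative sketch of the SchemaDefinition abstraction: an identifier, the
// raw schema text, and the schemas it references (recursively). Not the real
// NiFi interface; names and types are simplified stand-ins.
public record SchemaDefinition(
        String identifier,                       // stand-in for NiFi's SchemaIdentifier
        String text,                             // raw schema text, e.g. the .proto source
        Map<String, SchemaDefinition> references // referenced schemas, assumed keyed by import name
) { }
```

> Because references are themselves SchemaDefinitions, a registry client can populate the whole import graph recursively, which is what the extended ConfluentSchemaRegistry fetching is described as doing.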
> h2. MessageNameResolver
> In the case of Protobuf, a single proto file may contain multiple message
> entries. To understand which message to use for decoding, a message index is
> usually encoded in the payload. And as with a schema reference, different
> systems encode this index differently.
> _MessageNameResolver_ will be a new _ControllerService_ that is going to
> translate an encoded Protobuf message index into a Protobuf Message name
> that’s later going to be used to find a correct Protobuf message for
> deserialization.
> The interface will be put into the _nifi-schema-registry-service-api_ module.
> Schema Registry dependent implementations will land into their respective
> modules. E.g. _nifi-confluent-schema-registry-service_ for Confluent.
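> The contract sketched below is a hedged reading of the MessageNameResolver description above. The real interface would extend NiFi's ControllerService and use NiFi types; the simplified parameter list and the helper factory here are assumptions made for the example.

```java
import java.io.InputStream;
import java.util.Map;

// Illustrative sketch of the MessageNameResolver contract: translate an
// encoded message index (read from FlowFile attributes and/or content) into
// the protobuf message name used for deserialization. Not the real NiFi API.
public interface MessageNameResolver {

    /**
     * Inspects FlowFile attributes and/or the beginning of the content stream
     * (e.g. the Confluent message index array) and returns the fully qualified
     * name of the protobuf message to use for deserialization.
     */
    String getMessageName(Map<String, String> flowFileAttributes, InputStream content);

    // A trivial implementation mirroring the "Message Type Property" strategy,
    // where the user supplies the message name up front and it never changes.
    static MessageNameResolver fixed(String messageName) {
        return (attributes, content) -> messageName;
    }
}
```

> A Confluent-specific implementation would instead parse the zig-zag varint index array from the content stream, as ConfluentProtobufMessageNameResolver is described as doing.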
> h2. ProtobufReader
>
> The existing *ProtobufReader* implementation has two properties (Proto
> Directory, Message Type) that are always required. To keep backward
> compatibility, we will make those two properties optional and add a custom
> validator that requires them when the 'Generate From Proto File' schema
> access strategy is selected.
> Attribute modifications:
>
> |*Attribute Name*|*Description*|*Value*|*Comment*|
> |Message Name Resolver|Service implementation responsible for reading FlowFile attributes or content to determine the name of a Protobuf message to use for deserialization.|<{_}MessageNameResolver{_}> impl|This is a new property, available only when _Schema Access Strategy_ is _SCHEMA_REFERENCE_READER_|
> |Proto Directory, Message Type|Currently required. Will be marked as optional, and custom validation will require them when the *Schema Access Strategy* is set to *Generate From Proto File*| | |
>
--
This message was sent by Atlassian Jira
(v8.20.10#820010)