[ 
https://issues.apache.org/jira/browse/NIFI-14424?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Lukas Kucharski updated NIFI-14424:
-----------------------------------
    Attachment: image-2025-07-17-14-52-18-792.png

> Support for Confluent schema registry Protobuf wire format in ProtobufReader
> ----------------------------------------------------------------------------
>
>                 Key: NIFI-14424
>                 URL: https://issues.apache.org/jira/browse/NIFI-14424
>             Project: Apache NiFi
>          Issue Type: Improvement
>          Components: Extensions
>            Reporter: Alaksiej Ščarbaty
>            Assignee: Lukas Kucharski
>            Priority: Major
>         Attachments: image-2025-07-16-12-52-20-296.png, 
> image-2025-07-17-10-32-37-291.png, image-2025-07-17-14-40-58-260.png, 
> image-2025-07-17-14-42-31-326.png, image-2025-07-17-14-43-58-023.png, 
> image-2025-07-17-14-45-47-577.png, image-2025-07-17-14-51-24-943.png, 
> image-2025-07-17-14-52-18-792.png
>
>          Time Spent: 1h
>  Remaining Estimate: 0h
>
> h1. Introduction
> This change enables NiFi to deserialize protobuf messages encoded using the 
> Confluent [protobuf wire 
> format|https://docs.confluent.io/platform/current/schema-registry/fundamentals/serdes-develop/index.html#wire-format].
>  Confluent Kafka protobuf serde components prepend a custom header to the 
> message payload, and this header needs to be parsed and interpreted. The 
> existing NiFi Confluent integration can decode Confluent headers only for 
> payloads containing Avro data. For protobuf data, the Kafka protobuf 
> serializer adds an additional variable-length byte section to the header. 
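> To make the header layout concrete, here is a minimal, hypothetical sketch of 
> parsing the Confluent protobuf wire-format header (magic byte, 4-byte schema 
> id, zigzag-varint message index array). The class and method names are 
> illustrative only, not the actual PR code:

```java
import java.io.DataInputStream;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

// Hypothetical sketch of parsing the Confluent protobuf wire-format header:
// magic byte (0x00) + 4-byte big-endian schema id + zigzag-varint message
// index array (count followed by the indexes) + protobuf payload.
public class ConfluentProtobufHeader {
    public final int schemaId;
    public final List<Integer> messageIndexes;

    private ConfluentProtobufHeader(int schemaId, List<Integer> messageIndexes) {
        this.schemaId = schemaId;
        this.messageIndexes = messageIndexes;
    }

    public static ConfluentProtobufHeader parse(DataInputStream in) throws IOException {
        if (in.readByte() != 0) {
            throw new IOException("Not a Confluent wire-format payload (bad magic byte)");
        }
        final int schemaId = in.readInt();              // 4-byte big-endian schema id
        final long count = readZigZagVarint(in);        // length of the index array
        final List<Integer> indexes = new ArrayList<>();
        if (count == 0) {
            indexes.add(0);                             // shorthand: [0] = first top-level message
        } else {
            for (long i = 0; i < count; i++) {
                indexes.add((int) readZigZagVarint(in));
            }
        }
        return new ConfluentProtobufHeader(schemaId, Collections.unmodifiableList(indexes));
    }

    // Reads one zigzag-encoded varint (the encoding Confluent uses for the indexes).
    static long readZigZagVarint(DataInputStream in) throws IOException {
        long raw = 0;
        int shift = 0;
        int b;
        do {
            b = in.readUnsignedByte();
            raw |= (long) (b & 0x7F) << shift;
            shift += 7;
        } while ((b & 0x80) != 0);
        return (raw >>> 1) ^ -(raw & 1);                // zigzag decode
    }
}
```

> Note how the variable-length index section is exactly the part the existing 
> Avro-only header handling cannot skip over.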
>  
> In addition, a new implementation, *ProtobufReader2 (working name)*, will be 
> added. The existing ProtobufReader component can't be easily extended without 
> breaking backward compatibility because two component properties, *Proto 
> Directory* and *Message Type*, are always required. So, even when the 
> component is configured to use a schema registry and a schema reference 
> reader, the user still needs to provide those two properties. It looks like 
> the component was never intended to be used with any *Schema Access Strategy* 
> other than {*}Generate From Proto File{*}. Additionally, the current 
> implementation can't work in a cluster without copying protobuf schemas 
> manually onto every node.  
> *ProtobufReader2 (new)* will not support the *Generate From Proto File* 
> schema access strategy. It will support the remaining ones from the list 
> below:
> !image-2025-07-17-10-32-37-291.png!
> h1. History
> Initially, the changes were pushed for review in [this 
> PR|https://github.com/apache/nifi/pull/10094]. However, after an initial look 
> by [~exceptionfactory], we decided that it is reasonable to create a new 
> implementation of ProtobufReader instead of maintaining the old one. In 
> addition, the PR will be divided into two smaller ones: the first will cover 
> changes to the confluent-bundle and common APIs, and the second will cover 
> the new ProtobufReader implementation.
> With this context, the use case below describes the planned design.
> h2. TODO: explain nomenclature
> h2. The use case
>  # The user has a schema registry and a Kafka cluster with a topic 
> containing messages encoded according to the Confluent [protobuf wire 
> format|https://docs.confluent.io/platform/current/schema-registry/fundamentals/serdes-develop/index.html#wire-format]
>  (magic byte + 4-byte schema id + varint message indexes + payload)
>  # The user configures the *ProtobufReader2 (working name)* service 
> *({color:#ff0000}NEW - not in PR{color})*
> ## The user sets the Schema Access Strategy property to *Schema Reference 
> Reader (existing)*
> ### *ConfluentSchemaRegistry* was extended to support fetching 
> referenced/included schemas. If a protobuf schema imports other schemas, they 
> are fetched from the registry recursively. *(new)*
>  ### The *SchemaRegistry (existing)* interface was extended to support 
> returning a raw schema form *({color:#ff0000}NEW - in PR{color})* in 
> addition to the current RecordSchema. The first design proposed by 
> [~awelless] called for introducing a {-}*SchemaDefinitionProvider*{-}, but 
> during the implementation it turned out that fetching and returning the raw 
> schema definition better suited the existing SchemaRegistry abstraction, so 
> no new service abstraction was introduced to the API. Instead, a new class, 
> *SchemaDefinition*, was introduced to nifi-schema-registry-service-api. Its 
> intention is to represent the raw form of various schemas.
>  ## Sets the Schema Registry property *(existing ConfluentSchemaRegistry)*
>  ## Sets the Schema Reference Reader *(existing 
> ConfluentEncodedSchemaReferenceReader)*
>  ## Sets the *Message Name Resolver Strategy* property *(NEW)*. Two options 
> are available: one - *Message Type Property* *({color:#ff0000}NEW{color})*, 
> two - *Message Name Resolver Service ({color:#ff0000}NEW{color})*
>  ### Option one exists for simple cases where the user knows the name of the 
> protobuf message upfront and the message will not change.
>  ### Option two allows the user to choose the controller service responsible 
> for resolving the message name. In this case, the controller service looks 
> at the header of the message payload to get the information needed to locate 
> the message within the protobuf schema (the information is a message index 
> array; see the protobuf wire format). This is done by the 
> *ConfluentProtobufMessageNameResolver (new)* service.
> *Explanation:*
> Why do we need message name resolving? Protobuf schemas can contain many 
> message definitions at the root level (see the example below). We need to 
> explicitly give the message name to the deserializer for it to be able to do 
> its job. A schema file or schema text alone may not be enough, because it 
> may be ambiguous. The ProtobufReader always needs the name of the proto 
> schema message to be able to decode the binary payload. 
> The new strategy defines a new interface, *MessageNameResolver* 
> *({color:#ff0000}new{color})*, in nifi-schema-registry-service-api. 
> Currently, one implementation exists: *ConfluentProtobufMessageNameResolver* 
> *({color:#ff0000}new{color})* in nifi-confluent-platform-bundle.
> The ConfluentProtobufMessageNameResolver reads the bytes following the 
> schema id in the input stream to decode the Confluent standardized *message 
> name indexes.* More about it here: [protobuf wire 
> format|https://docs.confluent.io/platform/current/schema-registry/fundamentals/serdes-develop/index.html#wire-format].
> The index array is variable-length and can be [0][2][0] or [1], etc. The 
> indexes are zigzag-encoded varints; more details are in the Confluent wire 
> format document. *({color:#ff0000}new{color})*
> *Parsing of the protobuf message schemas.*
> After *ConfluentProtobufMessageNameResolver* decodes the message indexes, it 
> looks up the protobuf message name in the *SchemaDefinition 
> ({color:#ff0000}new{color}).* Messages can be nested, so the message index 
> decoded from the content can point to a nested message like 
> `MessageA.MessageE.MessageG`.
> !image-2025-07-16-12-52-20-296.png!
>  ### The *ConfluentProtobufMessageNameResolver* needs to parse the schema in 
> order to know its structure and locate the right message name 
> *({color:#ff0000}new{color}).* For this, an *ANTLR* parser is used. The 
> Proto3 grammar was downloaded from the ANTLR grammars repository, and the 
> Maven plugin then generates the parser for the proto3 schema format. 
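> The index-to-name resolution described above can be sketched without any 
> framework code. Assuming the schema has already been parsed into a tree of 
> message names (the `MessageNode` type here is a hypothetical stand-in for 
> the ANTLR parse result), walking the index array yields the qualified name:

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical sketch: resolve a Confluent message index array (e.g. [0, 2, 0])
// to a dot-separated message name within an already-parsed schema tree.
public class MessageIndexResolver {

    // Minimal stand-in for a parsed proto schema: each node is a message
    // with an ordered list of nested message definitions.
    public static class MessageNode {
        final String name;
        final List<MessageNode> nested = new ArrayList<>();

        public MessageNode(String name) {
            this.name = name;
        }

        public MessageNode addNested(MessageNode child) {
            nested.add(child);
            return this;
        }
    }

    // Walk the index array: the first index selects a top-level message,
    // and each subsequent index selects a nested message of the previous one.
    public static String resolve(List<MessageNode> topLevel, int[] indexes) {
        StringBuilder qualified = new StringBuilder();
        List<MessageNode> current = topLevel;
        for (int index : indexes) {
            MessageNode node = current.get(index);
            if (qualified.length() > 0) {
                qualified.append('.');
            }
            qualified.append(node.name);
            current = node.nested;
        }
        return qualified.toString();
    }
}
```

> With a schema whose first top-level message is MessageA, whose third nested 
> message is MessageE, and whose first nested message under that is MessageG, 
> the index array [0][2][0] resolves to `MessageA.MessageE.MessageG`.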
> *Why not use the wire library to parse it?*
> In order to get the schema message, we need to parse the schema, not compile 
> it. The difference is that we can parse an incomplete schema, as opposed to 
> compilation, where we would need the schema plus all imported schemas. 
> Confluent, in the implementation of its schema-registry, uses a parser from 
> an internal wire package:
> com.squareup.wire.schema.internal.parser.*
> The decision was made not to use internal classes from a 3rd-party library; 
> they appear to no longer be public in newer versions. Additionally, I did 
> not want to add the wire library and its transitive dependencies to the 
> confluent-bundle, because the maintainers favor keeping the memory footprint 
> small. The ANTLR parser needs only a very small standalone antlr-runtime 
> JAR.  
>  # The user configures *ConsumeKafka* processor with the processing strategy 
> set to *RECORD (existing)* 
>  
>  
> Currently for Protobuf record parsing there is a 
> [ProtobufReader|https://github.com/apache/nifi/blob/main/nifi-extension-bundles/nifi-protobuf-bundle/nifi-protobuf-services/src/main/java/org/apache/nifi/services/protobuf/ProtobufReader.java#L58],
>  which supports proto files on a [local filesystem 
> only|https://github.com/apache/nifi/blob/main/nifi-extension-bundles/nifi-protobuf-bundle/nifi-protobuf-services/src/main/java/org/apache/nifi/services/protobuf/ProtobufReader.java#L157].
>  
> The change aims to:
>  * support schema registries for retrieving proto definitions
>  * support decoding the protobuf [wire 
> format|https://docs.confluent.io/platform/current/schema-registry/fundamentals/serdes-develop/index.html#wire-format]
>  * support fetching imported (referenced) proto files in protobuf schemas
> Since the presence of the original proto files is a must for parsing, new 
> schema-related abstractions will be introduced.
> h2. SchemaDefinition
> A raw schema definition. Contains a {_}SchemaIdentifier{_}, the schema text, 
> and a collection of referenced {_}SchemaDefinitions{_}.
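> As a rough illustration of the shape described above (field names are my 
> assumptions based on this description, not the actual NiFi API; a plain 
> String stands in for NiFi's SchemaIdentifier):

```java
import java.util.List;

// Hypothetical sketch of the SchemaDefinition abstraction: an identifier,
// the raw schema text, and the definitions it references (which may
// themselves have references, forming a tree of imported schemas).
public record SchemaDefinition(
        String schemaName,                  // stand-in for NiFi's SchemaIdentifier
        String text,                        // raw schema text, e.g. the .proto source
        List<SchemaDefinition> references)  // imported/referenced schemas
{
}
```

> The recursive `references` field is what lets ConfluentSchemaRegistry return 
> a schema together with the schemas it imports.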
> h2. MessageNameResolver
> In the case of Protobuf, a single proto file may contain multiple message 
> entries. To know which message to use for decoding, a message index is 
> usually encoded in the payload. And, as with schema references, different 
> systems encode this index differently.
> _MessageNameResolver_ will be a new _ControllerService_ that translates an 
> encoded Protobuf message index into a Protobuf message name, which is later 
> used to find the correct Protobuf message for deserialization.
> The interface will be put into the _nifi-schema-registry-service-api_ module. 
> Schema Registry dependent implementations will land into their respective 
> modules. E.g. _nifi-confluent-schema-registry-service_ for Confluent.
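> A framework-free sketch of what the contract could look like. The real NiFi 
> interface would extend ControllerService and may take FlowFile attributes as 
> well; the signature below is an assumption for illustration only:

```java
import java.io.IOException;
import java.io.InputStream;

// Hypothetical sketch of the MessageNameResolver contract. In NiFi this
// would extend ControllerService; here it is a plain interface so the idea
// stands alone.
public interface MessageNameResolver {

    // Inspect the message content (e.g. the Confluent message index bytes)
    // and return the fully qualified Protobuf message name to deserialize.
    String resolveMessageName(InputStream content) throws IOException;
}
```

> A trivial implementation covering the "user knows the name upfront" case 
> would simply return a fixed name, while the Confluent implementation would 
> decode the index array from the stream.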
> h2. ProtobufReader
> The existing *ProtobufReader* implementation has two properties (Proto 
> Directory, Message Type) that are always required. To preserve backward 
> compatibility, we will make those two properties optional and add a custom 
> validator that requires them when the *Generate From Proto File* schema 
> access strategy is selected. 
> Attribute modifications:
>  
> |*Attribute Name*|*Description*|*Value*|*Comment*|
> |Message Name Resolver|Service implementation responsible for reading 
> FlowFile attributes or content to determine the name of a Protobuf message to 
> use for deserialization.|<{_}MessageNameResolver{_}> impl|This is a new 
> property, available only when _Schema Access Strategy_ is 
> _SCHEMA_REFERENCE_READER_|
> |Proto Directory, Message Type|Currently required. Will be marked as 
> optional, and custom validation will require them when the *Schema Access 
> Strategy* is set to *Generate From Proto File*| | |
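> The conditional-requirement rule from the table can be sketched without the 
> NiFi API. In the real component this logic would live in a customValidate 
> override returning ValidationResults; the method and strategy value below 
> are hypothetical and only illustrate the rule:

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical sketch of the validation rule: Proto Directory and Message
// Type become optional in general, but are required when the schema access
// strategy is "Generate From Proto File".
public class ProtobufReaderValidation {

    // Illustrative strategy value, not the actual NiFi allowable-value name.
    public static final String GENERATE_FROM_PROTO_FILE = "generate-from-proto-file";

    public static List<String> validate(String schemaAccessStrategy,
                                        String protoDirectory,
                                        String messageType) {
        List<String> problems = new ArrayList<>();
        if (GENERATE_FROM_PROTO_FILE.equals(schemaAccessStrategy)) {
            if (protoDirectory == null || protoDirectory.isBlank()) {
                problems.add("Proto Directory is required for Generate From Proto File");
            }
            if (messageType == null || messageType.isBlank()) {
                problems.add("Message Type is required for Generate From Proto File");
            }
        }
        return problems;
    }
}
```

> With any other schema access strategy (e.g. a schema reference reader), both 
> properties may be left unset and validation passes.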
>  



--
This message was sent by Atlassian Jira
(v8.20.10#820010)
