[ 
https://issues.apache.org/jira/browse/NIFI-14424?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

David Handermann resolved NIFI-14424.
-------------------------------------
    Fix Version/s: 2.6.0
       Resolution: Fixed

> Support for Confluent schema registry Protobuf wire format in ProtobufReader
> ----------------------------------------------------------------------------
>
>                 Key: NIFI-14424
>                 URL: https://issues.apache.org/jira/browse/NIFI-14424
>             Project: Apache NiFi
>          Issue Type: Improvement
>          Components: Extensions
>            Reporter: Alaksiej Ščarbaty
>            Assignee: Lukas Kucharski
>            Priority: Major
>             Fix For: 2.6.0
>
>         Attachments: image-2025-07-16-12-52-20-296.png, 
> image-2025-07-17-10-32-37-291.png, image-2025-07-17-14-40-58-260.png, 
> image-2025-07-17-14-42-31-326.png, image-2025-07-17-14-43-58-023.png, 
> image-2025-07-17-14-45-47-577.png, image-2025-07-17-14-51-24-943.png, 
> image-2025-07-17-14-52-18-792.png
>
>          Time Spent: 12h 10m
>  Remaining Estimate: 0h
>
> h1. Introduction
> This change enables NiFi to deserialize Protobuf messages encoded using the 
> Confluent [Protobuf wire 
> format|https://docs.confluent.io/platform/current/schema-registry/fundamentals/serdes-develop/index.html#wire-format].
> Confluent Kafka Protobuf serde components prepend a custom header to the 
> message payload, and that header needs to be parsed and interpreted. The 
> existing NiFi Confluent integration can decode Confluent headers only for 
> payloads containing Avro data. For Protobuf data, the Kafka Protobuf 
> serializer adds an additional variable-length byte section to the header.
>  
> In addition, a new implementation of ProtobufReader, *ProtobufReader2 
> (working name)*, will be added. The existing ProtobufReader component can't be 
> easily extended without breaking backward compatibility because two component 
> properties, *Proto Directory* and *Message Type*, are always required. So, even 
> when the component is configured to use a schema registry and a schema 
> reference reader, the user still needs to provide those two properties. It 
> looks like the component was never intended to be used with any *Schema 
> Access Strategy* other than {*}Generate From Proto File{*}. Additionally, the 
> current implementation can't work in a cluster without manually copying 
> Protobuf schemas onto every node.
> *ProtobufReader2* will not support the *Generate From Proto File* schema 
> access strategy. It will support the remaining ones from the list below:
> !image-2025-07-17-10-32-37-291.png!
> h1. History
> Initially, the changes were pushed for review in [this 
> PR|https://github.com/apache/nifi/pull/10094]. However, after an initial look 
> by [~exceptionfactory], we decided that it's reasonable to create a new 
> implementation of ProtobufReader instead of maintaining the old one. In 
> addition, the PR will be divided into two smaller ones. The first will cover 
> changes to the confluent-bundle and common APIs, and the second will cover 
> the new ProtobufReader implementation.
> With this context, the use case below uses:
> {color:#de350b}*NEW*{color} - to indicate a feature or a component that will 
> be added as a result of this ticket.
> {color:#de350b}*EXISTING*{color} - to indicate a feature or a component 
> that already exists in the current NiFi codebase.
> h1. The use case
>  # The user has a schema registry and a Kafka cluster with a topic 
> containing messages encoded according to the Confluent [Protobuf wire 
> format|https://docs.confluent.io/platform/current/schema-registry/fundamentals/serdes-develop/index.html#wire-format]
>  (magic byte + 4-byte schema ID + varint message indexes + payload).
>  # The user configures the *ProtobufReader2 (working name)* service 
> {color:#de350b}*(NEW)*{color}. The functionality needed for 
> ProtobufReader2 is currently added in [this 
> PR|https://github.com/apache/nifi/pull/10094] to the existing ProtobufReader 
> and needs to be moved to the new ProtobufReader2.
>  # In ProtobufReader2, the user sets the *Schema Access Strategy* property to 
> *Schema Reference Reader* *(EXISTING)*.
>  # In ProtobufReader2, the user sets the *Schema Registry* property to 
> *ConfluentSchemaRegistry* *(EXISTING)*.
>  ## *ConfluentSchemaRegistry* was extended to support the fetching of 
> referenced/included schemas. If a protobuf schema imports other schemas, they 
> are fetched from the registry recursively. *(NEW)*
>  ## *SchemaRegistry (EXISTING)* interface was extended to support returning a 
> raw schema form ({*}NEW{*}), in addition to the currently returned 
> RecordSchema. With this change, a new abstraction was introduced to 
> nifi-schema-registry-service-api - *SchemaDefinition (NEW).* The intention of 
> this is to represent a raw form of various schemas.
>  # In ProtobufReader2, the user sets the *Schema Reference Reader* property 
> to *ConfluentEncodedSchemaReferenceReader* *(EXISTING)*.
>  # In ProtobufReader2, the user sets the *Message Name Resolver Strategy* 
> property *(NEW)*. Two options are available: *Message Name Property* 
> *(NEW)* and *Message Name Resolver Service* *(NEW)*.
>  ## Option one exists for simple cases where the user knows the name of 
> the Protobuf message upfront and the message will not change dynamically 
> during normal operation.
>  ## Option two allows the user to choose the controller service responsible 
> for resolving the message name. In this case, the controller service looks at 
> the header of the message payload to get the information needed to locate the 
> message within the Protobuf schema (the information is a message index array; 
> see the Protobuf wire format). This is done by the 
> *ConfluentProtobufMessageNameResolver* *(NEW)* service.
>  ### *Explanation:* Why do we need message name resolving? Protobuf schemas 
> can contain many message definitions at the root level (see example below). 
> We need to explicitly give the message name to the deserializer for it to be 
> able to do its job. A Schema file or schema text alone may not be enough 
> because it may be ambiguous. The ProtobufReader/2 always needs the name of 
> the proto schema message to be able to decode the binary payload.  The new 
> strategy defines a new interface *MessageNameResolver* *(NEW)* in 
> nifi-schema-registry-service-api. Currently, one implementation exists: 
> *ConfluentProtobufMessageNameResolver* *(NEW)* in 
> nifi-confluent-platform-bundle.
>  ### The ConfluentProtobufMessageNameResolver looks at the section of the 
> header that follows the magic byte and schema ID in the input stream to 
> decode the Confluent standardized *message name indexes* *(NEW)*. More about 
> it here: [Protobuf wire 
> format|https://docs.confluent.io/platform/current/schema-registry/fundamentals/serdes-develop/index.html#wire-format].
>  The indexes are variable length and can be, for example, [0][2][0] or [1]. 
> Indexes are zigzag-encoded varints.
>  ### *Parsing of the Protobuf message schemas:* After 
> *ConfluentProtobufMessageNameResolver* decodes the message indexes, it looks 
> for the Protobuf message name in the *SchemaDefinition* *(NEW)*. Messages can 
> be nested, so the message index decoded from the content can point to a 
> nested message like {{MessageA.MessageE.MessageG}}.
> !image-2025-07-16-12-52-20-296.png!
>  ### The *ConfluentProtobufMessageNameResolver* needs to parse the schema in 
> order to know its structure and locate the right message name *(NEW)*. 
> An *ANTLR* parser is used for this *(NEW)*. The proto3 grammar is 
> downloaded from the ANTLR repository at build time, and a Maven plugin then 
> generates the parser for the proto3 schema format.
>  #### *Why not use the Wire library to parse it?* To get the schema 
> message, we need to parse the schema, not compile it. The difference is that 
> we can parse an incomplete schema, whereas compilation requires the schema 
> plus its imported schemas. Confluent's schema registry implementation uses a 
> parser from an internal Wire package: 
> com.squareup.wire.schema.internal.parser.*. The decision was made not to use 
> internal classes from a third-party library. I think they are no longer 
> public in newer versions. Additionally, I did not want to add the Wire 
> library and its dependencies to the confluent-bundle because I remembered 
> the maintainers favor keeping the memory footprint small. The ANTLR parser 
> uses a very small standalone antlr-runtime.jar.
>  # The user configures the *ConsumeKafka* processor with the processing 
> strategy set to *RECORD* *(EXISTING)* and sets the *Record Reader* property 
> to the ProtobufReader2 instance configured in the earlier steps.
> h1. New components introduced to API in nifi-schema-registry-service-api
> h2. SchemaDefinition
> A raw schema definition. Contains a {_}SchemaIdentifier{_}, schema text, and 
> a collection of referenced {_}SchemaDefinitions{_}.
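
As an illustration only, the shape described above (an identifier, the schema text, and referenced definitions) could be modeled as follows. The field names, the use of a plain string as the identifier, and the `collect_schema_texts` helper are assumptions made for this sketch and are not taken from the NiFi API.

```python
from dataclasses import dataclass, field
from typing import Dict


@dataclass
class SchemaDefinition:
    """Illustrative stand-in for the raw schema abstraction."""
    identifier: str  # assumption: a plain name instead of a SchemaIdentifier
    text: str        # raw schema text, e.g. the contents of a .proto file
    references: Dict[str, "SchemaDefinition"] = field(default_factory=dict)


def collect_schema_texts(root: SchemaDefinition) -> Dict[str, str]:
    """Walk the reference tree and return {name: schema text}.

    Mirrors the idea of fetching imported schemas recursively; a schema
    that is referenced from several places is visited only once.
    """
    collected: Dict[str, str] = {}

    def visit(name: str, definition: SchemaDefinition) -> None:
        if name in collected:
            return
        collected[name] = definition.text
        for import_name, referenced in definition.references.items():
            visit(import_name, referenced)

    visit(root.identifier, root)
    return collected
```

A root schema that imports `common.proto` would carry that definition under `references`, so a consumer can see every imported schema text without going back to the registry.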
> h2. MessageNameResolver
> In the case of Protobuf, a single proto file may contain multiple message 
> entries. To identify which message to use for decoding, a message index is 
> usually encoded in the payload. As with a schema reference, different 
> systems encode this index differently.
> _MessageNameResolver_ is a new _ControllerService_ that translates 
> an encoded Protobuf message index into a Protobuf message name, which is 
> later used to find the correct Protobuf message for deserialization.
> Schema Registry dependent implementations will land in their respective 
> modules. E.g., _nifi-confluent-schema-registry-service_ for Confluent.
> h2. MessageName
> Abstracts the value returned by the MessageNameResolver.
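
The translation from a decoded message index array to a message name can be sketched as a walk over the nested message declarations of a parsed schema. The `MessageNode` type, the `resolve_message_name` function, and the example tree are hypothetical; in the ticket, the structure comes from the ANTLR-generated proto3 parser.

```python
from dataclasses import dataclass, field
from typing import List, Sequence


@dataclass
class MessageNode:
    """One message declaration and the messages nested inside it."""
    name: str
    nested: List["MessageNode"] = field(default_factory=list)


def resolve_message_name(
    top_level: Sequence[MessageNode],
    indexes: Sequence[int],
    package: str = "",
) -> str:
    """Follow the index path through the declarations, in declaration order."""
    if not indexes:
        raise ValueError("message index array must not be empty")
    path = []
    candidates = top_level
    for index in indexes:
        if not 0 <= index < len(candidates):
            raise ValueError(f"message index {index} is out of range")
        node = candidates[index]
        path.append(node.name)
        candidates = node.nested  # descend into the nested messages
    name = ".".join(path)
    return f"{package}.{name}" if package else name


# Hypothetical schema: MessageA declares MessageC, MessageD and MessageE,
# and MessageE declares MessageG.
schema = [
    MessageNode("MessageA", [
        MessageNode("MessageC"),
        MessageNode("MessageD"),
        MessageNode("MessageE", [MessageNode("MessageG")]),
    ]),
    MessageNode("MessageB"),
]
```

With this tree, the index array [0, 2, 0] selects the first top-level message, its third nested message, and that message's first nested message, while [1] selects the second top-level message.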
> h1. New components in nifi-protobuf-bundle
> h2. ProtobufReader2
> Working name. The component will extend an abstract *SchemaRegistryService* 
> like all Readers do. 
> It will support the following Schema Access Strategies:
> !image-2025-07-17-14-42-31-326.png|width=616,height=356!!image-2025-07-17-14-43-58-023.png!!image-2025-07-17-14-45-47-577.png!
>  
> The *Message Name Resolver Strategy* property will have two allowable 
> values:
> !image-2025-07-17-14-52-18-792.png!
> When *Message Name Property* is selected, the component will show a 
> new property (not visible in the screenshots above) called *Message Name*, 
> where the user can provide the fully qualified name (FQN) of the message.
> When *Message Name Resolver Service* is selected, the component will show a 
> new property (not visible in the screenshots above) called *Message Name 
> Resolver Service*, where the user can provide a reference to the service.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)
