[ 
https://issues.apache.org/jira/browse/NIFI-14424?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Lukas Kucharski updated NIFI-14424:
-----------------------------------
    Attachment: image-2025-07-17-10-32-37-291.png

> Support for Confluent schema registry Protobuf wire format in ProtobufReader
> ----------------------------------------------------------------------------
>
>                 Key: NIFI-14424
>                 URL: https://issues.apache.org/jira/browse/NIFI-14424
>             Project: Apache NiFi
>          Issue Type: Improvement
>          Components: Extensions
>            Reporter: Alaksiej Ščarbaty
>            Assignee: Lukas Kucharski
>            Priority: Major
>         Attachments: image-2025-07-16-12-52-20-296.png, 
> image-2025-07-17-10-32-37-291.png
>
>          Time Spent: 40m
>  Remaining Estimate: 0h
>
> This change enables NiFi to deserialize Protobuf messages encoded using the 
> Confluent [protobuf wire 
> format|https://docs.confluent.io/platform/current/schema-registry/fundamentals/serdes-develop/index.html#wire-format].
>  Confluent Kafka Protobuf serde components prepend a custom header to the 
> message payload, which must be parsed and interpreted. The existing 
> NiFi Confluent integration can decode Confluent headers only for 
> payloads containing Avro data. For Protobuf data, the Kafka Protobuf 
> serializer adds an additional variable-length byte section to the header. 
>  
> In addition, a new implementation, *ProtobufReader2 (working name)*, will be 
> added alongside the existing *ProtobufReader*. 
>  
> The use case:
>  # The user has a schema registry and a Kafka cluster with topics 
> containing messages encoded according to the Confluent protobuf wire format 
> (magic byte + 4-byte schema_id + varint message indexes + payload)
>  # The user configures the *ProtobufReader2 (working name)* service *(NEW)*
>  ## User sets Schema Access Strategy property to *Schema Reference Reader 
> (existing)*
>  ### *ConfluentSchemaRegistry* was extended to support the fetching of 
> referenced/included schemas. If a protobuf schema imports other schemas, they 
> are fetched from the registry recursively. *({color:#ff0000}new{color})*
>  ### The *SchemaRegistry (existing)* interface was extended to support returning 
> a raw schema form *({color:#ff0000}new{color})* in addition to the current 
> RecordSchema. The first design proposed by [~awelless] called for introducing 
> a {-}*SchemaDefinitionProvider*{-}, but during implementation, fetching 
> and returning the raw schema definition better suited the existing 
> abstraction of SchemaRegistry, so no new abstraction was introduced to the 
> API. Instead, a new abstraction was introduced to 
> nifi-schema-registry-service-api - *SchemaDefinition*, intended to 
> represent the raw form of various schemas.
>  ## Sets Schema Registry property *(existing)*
>  ## Sets Schema Reference Reader property *(existing)*
>  ## Sets *Message Name Resolver Strategy*. Two options are available: 
> *Message Type Property* *({color:#ff0000}NEW{color})* and *Message Name 
> Resolver Service* *({color:#ff0000}NEW{color})*
>  ### Option one exists for backward compatibility and allows users to use 
> ProtobufReader with Proto Directory and Message Type properties
>  ### Option two allows the user to choose the controller service responsible 
> for resolving the message name for decoding. 
> *Explanation:*
> The ProtobufReader always needs the name of the proto schema message to be 
> able to decode the binary payload. In its current form, the message name is 
> set via the *Message Type (existing)* property. 
> The new strategy defines the new interface *MessageNameResolver* 
> *({color:#ff0000}new{color})* in nifi-schema-registry-service-api. Currently, 
> one implementation exists: *ConfluentProtobufMessageNameResolver* 
> *({color:#ff0000}new{color})* in nifi-confluent-platform-bundle
> The ConfluentProtobufMessageNameResolver reads the bytes that follow the 
> magic byte and the 4-byte schema id in the input stream to decode the 
> Confluent standardized *message name indexes.* More about them here: [protobuf 
> wire 
> format|https://docs.confluent.io/platform/current/schema-registry/fundamentals/serdes-develop/index.html#wire-format].
> The index list is variable-length and can be [0][2][0], [1], etc.; see the 
> Confluent wire format document for details. Indexes are zigzag-encoded 
> varints. *({color:#ff0000}new{color})*
> *Parsing of the protobuf message schemas.*
> After *ConfluentProtobufMessageNameResolver* decodes the message indexes, it 
> looks up the protobuf message name in the *SchemaDefinition 
> ({color:#ff0000}new{color}).* Messages can be nested, so the message index 
> decoded from the content can point to a nested message like 
> `MessageA.MessageE.MessageG`. 
>  !image-2025-07-16-12-52-20-296.png!
>  ### The *ConfluentProtobufMessageNameResolver* needs to parse the schema in 
> order to know its structure and locate the right message name 
> *({color:#ff0000}new{color}).* For this, an *ANTLR* parser is used: the 
> proto3 grammar was downloaded from the ANTLR grammar repository, and the 
> Maven plugin generates a parser for the proto3 schema format. 
> *Why not use the wire library to parse it?*
> In order to get the schema message, we need to parse the schema, not compile 
> it. The difference is that we can parse an incomplete schema as opposed to 
> compilation, where we would need a schema + imported schemas. Confluent in 
> the implementation of its schema-registry uses a parser from an internal wire 
> package:
> com.squareup.wire.schema.internal.parser.*
> The decision was made not to use internal classes from a 3rd-party library. I 
> think they are no longer public in newer versions. Additionally, I did 
> not want to add the wire library and its transitive dependencies to the 
> confluent-bundle, because I remembered the maintainers favor keeping the 
> memory footprint small. The ANTLR parser needs only a small standalone 
> antlr-runtime.jar.  
>  # The user configures *ConsumeKafka* processor with the processing strategy 
> set to *RECORD (existing)* 
>  
>  
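> The wire-format handling described above can be sketched in plain Java. This 
> is a minimal, hypothetical illustration of parsing the Confluent Protobuf 
> header (magic byte + 4-byte schema id + zigzag-varint message indexes), based 
> on the Confluent wire-format documentation linked above; it is not the actual 
> NiFi implementation, and the class and method names are invented for the 
> example.

```java
import java.io.ByteArrayInputStream;
import java.io.DataInputStream;
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;

// Hypothetical sketch: decoding the Confluent Protobuf wire-format header.
// Layout (per the Confluent wire-format docs):
//   [magic byte 0x00][4-byte big-endian schema id][zigzag-varint message indexes][payload]
// The index section starts with a varint count; the common single-index case
// [0] is optimized to a single 0x00 byte.
public final class ConfluentWireHeader {

    public final int schemaId;
    public final List<Integer> messageIndexes;

    private ConfluentWireHeader(int schemaId, List<Integer> messageIndexes) {
        this.schemaId = schemaId;
        this.messageIndexes = messageIndexes;
    }

    public static ConfluentWireHeader read(DataInputStream in) throws IOException {
        if (in.readByte() != 0) {
            throw new IOException("Missing Confluent magic byte");
        }
        final int schemaId = in.readInt(); // 4-byte big-endian schema id

        final long count = readZigZagVarint(in); // number of message indexes
        final List<Integer> indexes = new ArrayList<>();
        if (count == 0) {
            indexes.add(0); // optimized encoding for the first top-level message
        } else {
            for (long i = 0; i < count; i++) {
                indexes.add((int) readZigZagVarint(in));
            }
        }
        return new ConfluentWireHeader(schemaId, indexes);
    }

    // Reads a protobuf varint and undoes the zigzag transform.
    private static long readZigZagVarint(DataInputStream in) throws IOException {
        long raw = 0;
        int shift = 0;
        while (true) {
            final int b = in.readUnsignedByte();
            raw |= (long) (b & 0x7F) << shift;
            if ((b & 0x80) == 0) {
                break;
            }
            shift += 7;
        }
        return (raw >>> 1) ^ -(raw & 1); // zigzag decode
    }
}
```

> Note the special case the docs describe: the common single index [0] is 
> encoded as one zero byte, which the sketch maps back to the list [0].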
> Currently for Protobuf record parsing there is a 
> [ProtobufReader|https://github.com/apache/nifi/blob/main/nifi-extension-bundles/nifi-protobuf-bundle/nifi-protobuf-services/src/main/java/org/apache/nifi/services/protobuf/ProtobufReader.java#L58],
>  which supports proto files on a [local filesystem 
> only|https://github.com/apache/nifi/blob/main/nifi-extension-bundles/nifi-protobuf-bundle/nifi-protobuf-services/src/main/java/org/apache/nifi/services/protobuf/ProtobufReader.java#L157].
>  
> The change aims at:
>  * supporting schema registries for retrieving proto definitions
>  * decoding the protobuf [wire 
> format|https://docs.confluent.io/platform/current/schema-registry/fundamentals/serdes-develop/index.html#wire-format]
>  * fetching imported (referenced) proto files in protobuf schemas
> Since the presence of original proto files is a must for parsing, new 
> abstractions related to schemas will be introduced.
> h2. SchemaDefinition
> A raw schema definition. Contains a {_}SchemaIdentifier{_}, schema text and a 
> collection of referenced {_}SchemaDefinitions{_}.
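> A minimal sketch of what such a SchemaDefinition could look like, assuming a 
> Java record; the field names and the plain Optional<Long> id used here are 
> illustrative assumptions standing in for NiFi's SchemaIdentifier, not the 
> final nifi-schema-registry-service-api shape.

```java
import java.util.List;
import java.util.Optional;

// Hypothetical sketch of the proposed SchemaDefinition abstraction: a raw
// schema definition holding an identifier, the schema text, and the schemas
// it references (fetched recursively from the registry).
public record SchemaDefinition(
        Optional<Long> schemaId,             // stand-in for NiFi's SchemaIdentifier
        String name,                         // e.g. the registry subject or proto file name
        String text,                         // the raw schema text (the .proto source)
        List<SchemaDefinition> references) { // imported/referenced schemas

    // Convenience: total number of definitions in this tree, including this one.
    public int totalDefinitionCount() {
        return 1 + references.stream().mapToInt(SchemaDefinition::totalDefinitionCount).sum();
    }
}
```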
> h2. MessageNameResolver
> In the case of Protobuf, a single proto file may contain multiple message 
> entries. To understand which message to use for decoding, a message index is 
> usually encoded in the payload. As with a schema reference, different 
> systems encode this index differently.
> _MessageNameResolver_ will be a new _ControllerService_ that translates an 
> encoded Protobuf message index into a Protobuf message name, which is later 
> used to find the correct Protobuf message for deserialization.
> The interface will be put into the _nifi-schema-registry-service-api_ module. 
> Schema Registry dependent implementations will land into their respective 
> modules. E.g. _nifi-confluent-schema-registry-service_ for Confluent.
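> A possible shape of this contract, sketched without NiFi types (the real 
> interface would extend ControllerService and use NiFi-specific parameters); 
> the method signature and the FixedMessageNameResolver example are 
> illustrative assumptions:

```java
import java.io.IOException;
import java.io.InputStream;
import java.util.Map;

// Hypothetical sketch of the proposed MessageNameResolver contract.
public interface MessageNameResolver {

    // Resolves the fully qualified Protobuf message name to use for decoding,
    // given the FlowFile attributes and a view of its content.
    String resolveMessageName(Map<String, String> attributes, InputStream content) throws IOException;
}

// Illustrative "Message Type Property" style implementation (option one):
// ignores the content and returns a statically configured message name,
// mirroring the backward-compatible behavior of the existing Message Type property.
class FixedMessageNameResolver implements MessageNameResolver {
    private final String messageName;

    FixedMessageNameResolver(String messageName) {
        this.messageName = messageName;
    }

    @Override
    public String resolveMessageName(Map<String, String> attributes, InputStream content) {
        return messageName;
    }
}
```

> A Confluent implementation would instead read the zigzag-varint message 
> indexes from the content stream and walk the parsed schema to the nested 
> message they point at.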
> h2. ProtobufReader
>  
> The existing *ProtobufReader* implementation has two properties (Proto 
> Directory, Message Type) that are always required. To keep backward 
> compatibility, we will make those two properties optional and add a custom 
> validator that requires them when the 'Generate From Proto file' schema 
> access strategy is selected. 
> Property modifications:
>  
> |*Property Name*|*Description*|*Value*|*Comment*|
> |Message Name Resolver|Service implementation responsible for reading 
> FlowFile attributes or content to determine the name of a Protobuf message to 
> use for deserialization.|<{_}MessageNameResolver{_}> impl|This is a new 
> property, available only when _Schema Access Strategy_ is 
> _SCHEMA_REFERENCE_READER_|
> |Proto Directory, Message Type|Currently required. Will be marked as optional 
> and custom validation will require them when the *Schema Access Strategy* is 
> set to *Generate From Proto file*| | |
>  
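> The conditional-requirement rule from the table above can be sketched as 
> plain Java, leaving out NiFi's ValidationContext/ValidationResult machinery 
> (a real implementation would override customValidate); the property-name 
> strings here are illustrative.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;

// Hypothetical sketch: Proto Directory and Message Type become optional
// overall, but a custom check requires both whenever the
// "Generate From Proto file" schema access strategy is selected.
final class ProtoReaderPropertyCheck {
    static final String GENERATE_FROM_PROTO_FILE = "Generate From Proto file";

    // Returns a list of validation problems; empty means the configuration is valid.
    static List<String> validate(Map<String, String> properties) {
        final List<String> problems = new ArrayList<>();
        if (GENERATE_FROM_PROTO_FILE.equals(properties.get("Schema Access Strategy"))) {
            for (String required : List.of("Proto Directory", "Message Type")) {
                final String value = properties.get(required);
                if (value == null || value.isBlank()) {
                    problems.add(required + " is required when Schema Access Strategy is '"
                            + GENERATE_FROM_PROTO_FILE + "'");
                }
            }
        }
        return problems;
    }
}
```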



--
This message was sent by Atlassian Jira
(v8.20.10#820010)
