[ 
https://issues.apache.org/jira/browse/NIFI-14424?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Lukas Kucharski updated NIFI-14424:
-----------------------------------
    Description: 
h1. Introduction

This change enables NiFi to deserialize protobuf messages encoded using the 
Confluent [protobuf wire 
format|https://docs.confluent.io/platform/current/schema-registry/fundamentals/serdes-develop/index.html#wire-format].
 Confluent Kafka protobuf serde components prepend a custom header to the 
message payload, and that header needs to be parsed and interpreted. The 
existing NiFi Confluent integration can decode Confluent headers only when the 
payload contains Avro data. For protobuf data, the Kafka protobuf serializer 
adds a variable-length section of message indexes to the header. 
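
As a rough illustration of the header parsing involved, the sketch below reads the layout given in the Confluent wire format document: a magic byte of 0, a 4-byte big-endian schema ID, then the message indexes as zigzag-encoded varints (a count followed by that many indexes, with a single 0 standing for the common case [0]). The class and method names are hypothetical and are not taken from the PR.

```java
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

// Hypothetical helper, not the NiFi implementation.
final class ConfluentProtobufHeader {
    final int schemaId;
    final List<Integer> messageIndexes;

    private ConfluentProtobufHeader(int schemaId, List<Integer> messageIndexes) {
        this.schemaId = schemaId;
        this.messageIndexes = Collections.unmodifiableList(messageIndexes);
    }

    // Reads the header and leaves the buffer positioned at the protobuf payload.
    static ConfluentProtobufHeader parse(ByteBuffer buffer) {
        byte magic = buffer.get();
        if (magic != 0) {
            throw new IllegalArgumentException("Unexpected magic byte: " + magic);
        }
        int schemaId = buffer.getInt(); // ByteBuffer is big-endian by default
        int count = readZigZagVarint(buffer);
        List<Integer> indexes = new ArrayList<>();
        if (count == 0) {
            indexes.add(0); // a lone 0 is shorthand for the index array [0]
        } else {
            for (int i = 0; i < count; i++) {
                indexes.add(readZigZagVarint(buffer));
            }
        }
        return new ConfluentProtobufHeader(schemaId, indexes);
    }

    // Decodes one 32-bit varint and undoes the zigzag mapping.
    static int readZigZagVarint(ByteBuffer buffer) {
        int raw = 0;
        int shift = 0;
        while (true) {
            byte b = buffer.get();
            raw |= (b & 0x7F) << shift;
            if ((b & 0x80) == 0) {
                break;
            }
            shift += 7;
            if (shift > 28) {
                throw new IllegalArgumentException("Varint is too long");
            }
        }
        return (raw >>> 1) ^ -(raw & 1);
    }

    public static void main(String[] args) {
        // magic, schema ID 42, message indexes [0], one payload byte
        ByteBuffer message = ByteBuffer.wrap(new byte[] {0, 0, 0, 0, 42, 0, 10});
        ConfluentProtobufHeader header = parse(message);
        System.out.println(header.schemaId + " " + header.messageIndexes);
    }
}
```

After `parse` returns, the remaining bytes of the buffer are the protobuf payload that would be handed to the deserializer.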

 

In addition, a new *ProtobufReader* implementation, *ProtobufReader2 (working 
name)*, will be added. The existing ProtobufReader component can't be easily 
extended without breaking backward compatibility because two component 
properties, *Proto Directory* and *Message Type*, are always required. Even 
when the component is configured to use a schema registry and a schema 
reference reader, the user must still provide those two properties. It looks 
like the component was never intended to be used with any *Schema Access 
Strategy* other than {*}Generate From Proto File{*}. Additionally, the current 
implementation can't work in a cluster unless the protobuf schemas are copied 
manually onto every node.  

*ProtobufReader2(new)* will not support the *Generate From Proto File* schema 
access strategy. It will support the remaining ones from the list below:
!image-2025-07-17-10-32-37-291.png!
h1. History

The changes were initially pushed for review in [this 
PR|https://github.com/apache/nifi/pull/10094]
h2. TODO: explain nomenclature
h2. The use case
 # The user has a schema registry and a Kafka cluster with a topic containing 
messages encoded according to the Confluent [protobuf wire 
format|https://docs.confluent.io/platform/current/schema-registry/fundamentals/serdes-develop/index.html#wire-format]
 (magic byte + 4-byte schema_id + varint message indexes + payload)
 # The user configures the *ProtobufReader2 (working name)* service 
*({color:#ff0000}NEW - not in PR{color})*
 ## User sets Schema Access Strategy property to *Schema Reference Reader 
(existing)*
 ### *ConfluentSchemaRegistry* was extended to support the fetching of 
referenced/included schemas. If a protobuf schema imports other schemas, they 
are fetched from the registry recursively. *({color:#ff0000}new{color})*
 ### The *SchemaRegistry (existing)* interface was extended to return a raw 
schema form *({color:#ff0000}NEW - in PR{color})* in addition to the current 
RecordSchema. The first design proposed by [~awelless] called for introducing a 
{-}*SchemaDefinitionProvider*{-}, but during implementation, fetching and 
returning the raw schema definition turned out to fit the existing 
SchemaRegistry abstraction better, so no separate provider was added to the 
API. Instead, one new type was introduced in nifi-schema-registry-service-api: 
*SchemaDefinition*, which represents the raw form of various schemas.
 ## Sets Schema Registry property *(existing ConfluentSchemaRegistry)* 
 ## Sets Schema Reference reader ({*}existing 
ConfluentEncodedSchemaReferenceReader){*}
 ## Sets *Message Name Resolver Strategy* property {*}(NEW){*}. Two options are 
available. One - *Message Type Property* *({color:#ff0000}NEW{color}),* Two - 
*Message Name Resolver Service ({color:#ff0000}NEW{color})*
 ### Option one exists for simple cases where the user knows the name of the 
protobuf message upfront, and the message will not change
 ### Option two allows the user to choose the controller service responsible 
for resolving the message name. In this case, the controller service looks at 
the header of the message payload to get the information needed to locate the 
Message within the protobuf schema (the information is a message index array, 
see protobuf wire format). This is done by 
*ConfluentProtobufMessageNameResolver(new)* service.
*Explanation:*
Why do we need message name resolving? A protobuf schema can contain many 
message definitions at the root level (see the example below), so the schema 
file or schema text alone may be ambiguous. The ProtobufReader always needs the 
name of the proto schema message to decode the binary payload, and that name 
must be given to the deserializer explicitly. 
The new strategy defines a new interface, *MessageNameResolver* 
*({color:#ff0000}new{color})*, in nifi-schema-registry-service-api. Currently, 
one implementation exists: *ConfluentProtobufMessageNameResolver* 
*({color:#ff0000}new{color})* in nifi-confluent-platform-bundle.
The ConfluentProtobufMessageNameResolver looks at the second byte in the input 
stream to decode the Confluent standardized *message name indexes*. More about 
it here: [protobuf wire 
format|https://docs.confluent.io/platform/current/schema-registry/fundamentals/serdes-develop/index.html#wire-format].
The indexes are variable length and can be, for example, [0][2][0] or [1]. They 
are zigzag-encoded varints; the Confluent wire format document has more 
details. *({color:#ff0000}new{color})*
*Parsing of the protobuf message schemas.*
After *ConfluentProtobufMessageNameResolver* decodes the message indexes it 
looks for the protobuf message name in the *SchemaDefinition 
({color:#ff0000}new{color}).* Messages can be nested, so the message index 
decoded from the content can point to the nested message like 
`MessageA.MessageE.MessageG` 
 !image-2025-07-16-12-52-20-296.png!
 ### The *ConfluentProtobufMessageNameResolver* needs to parse the schema in 
order to know its structure and locate the right message name 
*({color:#ff0000}new{color}).* An *ANTLR* parser is used for this. The proto3 
grammar was downloaded from the ANTLR repository, and the Maven plugin 
generates the parser for the proto3 schema format from it. 
*Why not use the wire library to parse it?*
To get the schema message, we need to parse the schema, not compile it. The 
difference is that an incomplete schema can be parsed, whereas compilation 
requires the schema plus its imported schemas. Confluent, in the 
implementation of its schema-registry, uses a parser from an internal wire 
package:
com.squareup.wire.schema.internal.parser.*
The decision was made not to use internal classes from a 3rd-party library. I 
think they are not public anymore in the newer versions. Additionally, I did 
not want to add a wire library dependency and its dependencies to the 
confluent-bundle because I remembered that the maintainers favor keeping the 
memory footprint small. The ANTLR parser needs only a very small standalone 
antlr-runtime.jar  
 # The user configures *ConsumeKafka* processor with the processing strategy 
set to *RECORD (existing)* 
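
The name resolution described in step 2 can be pictured as a walk over the parsed schema: each decoded index selects a message at the current nesting level, and the selected names are joined with dots. The sketch below assumes the schema has already been parsed into a simple tree; `MessageNode`, `MessageIndexPathResolver`, and the sample layout are hypothetical stand-ins, not the classes generated by ANTLR or used in the PR, and package prefixes are left out.

```java
import java.util.Arrays;
import java.util.List;

// Hypothetical model of one message declaration and its nested messages.
final class MessageNode {
    final String name;
    final List<MessageNode> nested;

    MessageNode(String name, MessageNode... nested) {
        this.name = name;
        this.nested = Arrays.asList(nested);
    }
}

final class MessageIndexPathResolver {
    // Follows the index path through the message tree and builds a dotted name.
    static String resolve(List<MessageNode> topLevel, List<Integer> indexes) {
        if (indexes.isEmpty()) {
            throw new IllegalArgumentException("At least one message index is required");
        }
        StringBuilder name = new StringBuilder();
        List<MessageNode> level = topLevel;
        for (int index : indexes) {
            if (index < 0 || index >= level.size()) {
                throw new IllegalArgumentException("No message at index " + index);
            }
            MessageNode node = level.get(index);
            if (name.length() > 0) {
                name.append('.');
            }
            name.append(node.name);
            level = node.nested; // descend into the selected message
        }
        return name.toString();
    }

    public static void main(String[] args) {
        // Assumed layout: MessageA { MessageC, MessageD, MessageE { MessageG } }, MessageB
        List<MessageNode> schema = Arrays.asList(
                new MessageNode("MessageA",
                        new MessageNode("MessageC"),
                        new MessageNode("MessageD"),
                        new MessageNode("MessageE", new MessageNode("MessageG"))),
                new MessageNode("MessageB"));
        System.out.println(resolve(schema, Arrays.asList(0, 2, 0)));
    }
}
```

With this assumed layout, the index path [0][2][0] selects a nested message and [1] selects the second top-level message.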

 

 

Currently, Protobuf record parsing is handled by 
[ProtobufReader|https://github.com/apache/nifi/blob/main/nifi-extension-bundles/nifi-protobuf-bundle/nifi-protobuf-services/src/main/java/org/apache/nifi/services/protobuf/ProtobufReader.java#L58],
 which supports proto files on a [local filesystem 
only|https://github.com/apache/nifi/blob/main/nifi-extension-bundles/nifi-protobuf-bundle/nifi-protobuf-services/src/main/java/org/apache/nifi/services/protobuf/ProtobufReader.java#L157].
 
The change aims to:
 * support schema registries for retrieving proto definitions
 * support decoding the protobuf [wire 
format|https://docs.confluent.io/platform/current/schema-registry/fundamentals/serdes-develop/index.html#wire-format]
 * support fetching imported (referenced) proto files in protobuf schemas

Since the original proto files must be present for parsing, new 
abstractions related to schemas will be introduced.
h2. SchemaDefinition

A raw schema definition. Contains a {_}SchemaIdentifier{_}, schema text and a 
collection of referenced {_}SchemaDefinitions{_}.
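
A minimal sketch of such a structure, together with recursive fetching of referenced schemas, might look like the following. It is illustrative only: `RawSchema`, `RegistryEntry`, and `ReferenceFetcher` are hypothetical names, a plain string stands in for the _SchemaIdentifier_, and the registry is modelled as a lookup function rather than a real Schema Registry client.

```java
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.function.Function;

// Hypothetical raw schema: identifier, schema text, and referenced schemas.
final class RawSchema {
    final String name;
    final String text;
    final Map<String, RawSchema> references;

    RawSchema(String name, String text, Map<String, RawSchema> references) {
        this.name = name;
        this.text = text;
        this.references = Collections.unmodifiableMap(references);
    }
}

// What the assumed registry lookup returns: schema text plus the names it imports.
final class RegistryEntry {
    final String text;
    final List<String> referencedNames;

    RegistryEntry(String text, List<String> referencedNames) {
        this.text = text;
        this.referencedNames = referencedNames;
    }
}

final class ReferenceFetcher {
    static RawSchema fetch(String name, Function<String, RegistryEntry> registry) {
        return fetch(name, registry, new HashMap<>(), new HashSet<>());
    }

    private static RawSchema fetch(String name, Function<String, RegistryEntry> registry,
                                   Map<String, RawSchema> resolved, Set<String> inProgress) {
        RawSchema cached = resolved.get(name);
        if (cached != null) {
            return cached; // each schema is fetched once, even if imported many times
        }
        if (!inProgress.add(name)) {
            throw new IllegalStateException("Circular schema reference: " + name);
        }
        RegistryEntry entry = registry.apply(name);
        if (entry == null) {
            throw new IllegalArgumentException("Schema not found: " + name);
        }
        Map<String, RawSchema> references = new LinkedHashMap<>();
        for (String referenced : entry.referencedNames) {
            references.put(referenced, fetch(referenced, registry, resolved, inProgress));
        }
        inProgress.remove(name);
        RawSchema schema = new RawSchema(name, entry.text, references);
        resolved.put(name, schema);
        return schema;
    }

    public static void main(String[] args) {
        Map<String, RegistryEntry> registry = new HashMap<>();
        registry.put("order.proto", new RegistryEntry("...", List.of("common.proto")));
        registry.put("common.proto", new RegistryEntry("...", List.of()));
        System.out.println(fetch("order.proto", registry::get).references.keySet());
    }
}
```

Caching resolved schemas by name keeps a shared import from being fetched more than once, which is a design choice of this sketch rather than something stated in the issue.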
h2. MessageNameResolver

In the case of Protobuf, a single proto file may contain multiple message 
entries. To determine which message to use for decoding, a message index is 
usually encoded in the payload. As with a schema reference, different systems 
encode this index differently.

_MessageNameResolver_ will be a new _ControllerService_ that translates an 
encoded Protobuf message index into a Protobuf message name, which is later 
used to find the correct Protobuf message for deserialization.

The interface will be put into the _nifi-schema-registry-service-api_ module. 
Schema Registry dependent implementations will land in their respective 
modules, e.g. _nifi-confluent-schema-registry-service_ for Confluent.
h2. ProtobufReader

 

The existing *ProtobufReader* implementation has two properties (Proto 
Directory, Message Type) that are always required. To keep backward 
compatibility, we will make those two properties optional and add a custom 
validator that requires them when the 'Generate From Proto File' schema access 
strategy is selected. 
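
The conditional requirement could be expressed roughly as below. This is a plain-Java sketch of the rule only; it deliberately does not use the NiFi validation API, and the class name, method signature, and messages are hypothetical.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical stand-in for the custom validation rule.
final class ProtoFilePropertyValidation {
    static final String GENERATE_FROM_PROTO_FILE = "Generate From Proto File";

    // Returns one problem per missing property; an empty list means valid.
    static List<String> validate(String schemaAccessStrategy, String protoDirectory, String messageType) {
        List<String> problems = new ArrayList<>();
        if (!GENERATE_FROM_PROTO_FILE.equals(schemaAccessStrategy)) {
            return problems; // both properties are optional for every other strategy
        }
        if (isBlank(protoDirectory)) {
            problems.add("Proto Directory is required for " + GENERATE_FROM_PROTO_FILE);
        }
        if (isBlank(messageType)) {
            problems.add("Message Type is required for " + GENERATE_FROM_PROTO_FILE);
        }
        return problems;
    }

    private static boolean isBlank(String value) {
        return value == null || value.trim().isEmpty();
    }

    public static void main(String[] args) {
        System.out.println(validate(GENERATE_FROM_PROTO_FILE, null, "MessageA"));
        System.out.println(validate("Schema Reference Reader", null, null));
    }
}
```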

Attribute modifications:

 
|*Attribute Name*|*Description*|*Value*|*Comment*|
|Message Name Resolver|Service implementation responsible for reading FlowFile attributes or content to determine the name of a Protobuf message to use for deserialization.|<{_}MessageNameResolver{_}> impl|This is a new property, available only when _Schema Access Strategy_ is _SCHEMA_REFERENCE_READER_|
|Proto Directory, Message Type|Currently required. Will be marked as optional and custom validation will require them when the *Schema Access Strategy* is set to *Generate From Proto File*| | |

 

  was:
h2. Introduction

This change enables NIFI to deserialize protobuf messages encoded using the 
Confluent [protobuf wire 
format.|https://docs.confluent.io/platform/current/schema-registry/fundamentals/serdes-develop/index.html#wire-format]
 Confluent kafka protobuf serde components use a custom header for the message 
payload. The header needs to be parsed and interpreted. The existing NIFI 
Confluent integration can decode Confluent type headers only for the payload 
containing Avro data. For protobuf data, an additional variable-length byte 
section is being added to the header by the Kafka protobuf serializer. 

 

In addition, a new implementation for *ProtobufReader, ProtobufReader2 (working 
name)* will be added. The existing ProtobufReader component can't be easily 
extended without breaking backward compatibility because two component 
properties *Proto Directory* and *Message Type* are always required. So, even 
when the component is configured to use a schema registry and schema reference 
reader, the user needs to provide those two parameters. It looks like the 
component was never intended to be used with any other *Schema Access Strategy* 
than {*}Generate From Proto File{*}.  Additionally, the current implementation 
can't work in a cluster without copying protobuf schemas manually onto every 
node.  

*ProtobufReader2(new)* will not support the *Generate From Proto File* schema 
access strategy. It will support the remaining ones from the list below:
!image-2025-07-17-10-32-37-291.png!
h2. The use case
 # The user has a schema registry and a Kafka cluster with the topic containing 
messages encoded according to the Confluent [protobuf wire 
format|https://docs.confluent.io/platform/current/schema-registry/fundamentals/serdes-develop/index.html#wire-format]
 (Magic byte + 4byte schema_id + varint message indexes + payload)
 # The user configures *ProtobufReader2 (working name)* service ({*}NEW{*})
 ## User sets Schema Access Strategy property to *Schema Reference Reader 
(existing)*
 ### *ConfluentSchemaRegistry* was extended to support the fetching of 
referenced/included schemas. If a protobuf schema imports other schemas, they 
are fetched from the registry recursively. *({color:#ff0000}new{color})*
 ### *SchemaRegistry (existing)* interface was extended to support returning a 
raw schema form *({color:#ff0000}new{color})* in addition to the current 
RecordSchema. The first design proposed by [~awelless] called for introducing a 
{-}*SchemaDefinitionProvider*{-}, but during the implementation, it seemed that 
fetching and returning raw schema definition better suited the existing 
abstraction of SchemaRegistry, and no new abstraction was introduced to the 
API. With this change, a new abstraction was introduced to 
nifi-schema-registry-service-api - *SchemaDefinition.* The intention of this is 
to represent a raw form of various schemas.
 ## Sets Schema Registry property *(existing ConfluentSchemaRegistry)* 
 ## Sets Schema Reference reader ({*}existing 
ConfluentEncodedSchemaReferenceReader){*}
 ## Sets *Message Name Resolver Strategy* property {*}(NEW){*}. Two options are 
available. One - *Message Type Property* *({color:#ff0000}NEW{color}),* Two - 
*Message Name Resolver Service ({color:#ff0000}NEW{color})*
 ### Option one exists for simple cases where the user knows the name of the 
protobuf message upfront, and the message will not change
 ### Option two allows the user to choose the controller service responsible 
for resolving the message name. In this case, the controller service looks at 
the header of the message payload to get the information needed to locate the 
Message within the protobuf schema (the information is a message index array, 
see protobuf wire format). This is done by 
*ConfluentProtobufMessageNameResolver(new)* service.
*Explanation:*
Why we need message name resolving? Protobuf schemas can contain many message 
definitions at the root level (see example below). We need to explicitly give 
the message name to the deserializer for it to be able to do its job. A Schema 
file or schema text alone may be not enough because it may be ambiguous. 
The ProtobufReader always needs the name of the proto schema message to be able 
to decode the binary payload. 
The new strategy defines the new interface *MessageNameResolver* 
*({color:#ff0000}new{color})* in nifi-schema-registry-service-api. Currently, 
one implementation exists: *ConfluentProtobufMessageNameResolver* 
*({color:#ff0000}new{color})* in nifi-confluent-platform-bundle
The ConfluentProtobufMessageNameResolver looks at the second byte in the input 
stream to decode Confluent standardized *message name indexes.* More about it 
here: [protobuf wire 
format.|https://docs.confluent.io/platform/current/schema-registry/fundamentals/serdes-develop/index.html#wire-format]
The indexes are variable length and can be [0][2][0] or [1] etc. More details 
in confluent wire format document. Indexes are zigzag encoded varints. 
*({color:#ff0000}new{color})*
*Parsing of the protobuf message schemas.*
After *ConfluentProtobufMessageNameResolver* decodes the message indexes it 
looks for the protobuf message name in the *SchemaDefinition 
({color:#ff0000}new{color}).* Messages can be nested, so the message index 
decoded from the content can point to the nested message like 
`MessageA.MessageE.MessageG` 
 {{                 !image-2025-07-16-12-52-20-296.png!  }}
 ### The *ConfluentMessageNameResolver* need to parse the schema in order to 
know the structure of the schema and locate the right message name 
*({color:#ff0000}new{color}).* For this *ANTLR* parser is used. Proto3 grammar 
was downloaded from the antlr repository. Then the maven plugin generates the 
Parser for proto3 schema format. 
*Why not use the wire library to parse it?*
In order to get the schema message, we need to parse the schema, not compile 
it. The difference is that we can parse an incomplete schema as opposed to 
compilation, where we would need a schema + imported schemas. Confluent in the 
implementation of its schema-registry uses a parser from an internal wire 
package:
com.squareup.wire.schema.internal.parser.*
The decision was made not to use internal classes from 3rd party library. I 
think they are not public anymore in the newer versions. Additionally, I did 
not want to add a wire library dependency and its dependencies to the 
confluent-bundle because I remembered the maintainers favor keeping the memory 
footprint small. ANTLr parser uses a very small standalone antl-runtime.jar  
 # The user configures *ConsumeKafka* processor with the processing strategy 
set to *RECORD (existing)* 

 

 

Currently for Protobuf record parsing there is a 
[ProtobufReader|https://github.com/apache/nifi/blob/main/nifi-extension-bundles/nifi-protobuf-bundle/nifi-protobuf-services/src/main/java/org/apache/nifi/services/protobuf/ProtobufReader.java#L58],
 which supports proto files on a [local filesystem 
only|https://github.com/apache/nifi/blob/main/nifi-extension-bundles/nifi-protobuf-bundle/nifi-protobuf-services/src/main/java/org/apache/nifi/services/protobuf/ProtobufReader.java#L157].
 
The change aims on:
 * supporting schema registries to retrieve proto definitions. 
 * support for decoding protobuf [wire 
format|https://docs.confluent.io/platform/current/schema-registry/fundamentals/serdes-develop/index.html#wire-format]
 * support fetching imported (referenced) proto files in protobuf schemas

Since presence of original proto files is a must for parsing, a new 
abstractions related to schemas will be introduced.
h2. SchemaDefinition

A raw schema definition. Contains a {_}SchemaIdentifier{_}, schema text and a 
collection of referenced {_}SchemaDefinitions{_}.
h2. MessageNameResolver

In the case of Protobuf a single proto file may contain multiple message 
entries. To understand what message to use for decoding, a message index is 
usually encoded in the payload. And as with a schema reference, different 
systems encode this index differently.

_MessageNameResolver_ will be a new _ControllerService_ that is going to 
translate an encoded Protobuf message index into a Protobuf Message name that’s 
later going to be used to find a correct Protobuf message for deserialization.

The interface will be put into the _nifi-schema-registry-service-api_ module. 
Schema Registry dependent implementations will land into their respective 
modules. E.g. _nifi-confluent-schema-registry-service_ for Confluent.
h2. ProtobufReader

 

The existing *ProtobufReader* implementation has two properties ( Proto 
Directory, Message Type) that are always required. To keep the backward 
compatibility, we will make those two properties optional and add a custom 
validator to require them when 'Generate From Protofile' schema access strategy 
is selected. 

Attribute modifications:

 
|*Attribute Name*|*Description*|*Value*|*Comment*|
|Message Name Resolver|Service implementation responsible for reading FlowFile 
attributes or content to determine the name of a Protobuf message to use for 
deserialization.|<{_}MessageNameResolver{_}> impl|This is a new property, 
available only when _Schema Access Strategy_ is _SCHEMA_REFERENCE_READER_|
|Proto Directory, Message Type|Currently required. Will be marked as optional 
and custom validation will require them when the *Schema Access Strategy* is 
set to *Generate From Proto file*| | |

 


> Support for Confluent schema registry Protobuf wire format in ProtobufReader
> ----------------------------------------------------------------------------
>
>                 Key: NIFI-14424
>                 URL: https://issues.apache.org/jira/browse/NIFI-14424
>             Project: Apache NiFi
>          Issue Type: Improvement
>          Components: Extensions
>            Reporter: Alaksiej Ščarbaty
>            Assignee: Lukas Kucharski
>            Priority: Major
>         Attachments: image-2025-07-16-12-52-20-296.png, 
> image-2025-07-17-10-32-37-291.png
>
>          Time Spent: 40m
>  Remaining Estimate: 0h



--
This message was sent by Atlassian Jira
(v8.20.10#820010)
