[
https://issues.apache.org/jira/browse/NIFI-14424?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
]
Lukas Kucharski updated NIFI-14424:
-----------------------------------
Description:
h1. Introduction
The ticket is split into two PRs. The first PR is
[https://github.com/apache/nifi/pull/10105] and the second PR is
[https://github.com/apache/nifi/pull/10214].
This change enables NiFi to deserialize Protobuf messages encoded using the
Confluent [Protobuf wire
format|https://docs.confluent.io/platform/current/schema-registry/fundamentals/serdes-develop/index.html#wire-format].
Confluent Kafka Protobuf serde components prepend a custom header to the message
payload, and this header needs to be parsed and interpreted. The existing NiFi
Confluent integration can decode Confluent headers only for payloads
containing Avro data. For Protobuf data, the Kafka Protobuf serializer adds an
additional variable-length section (the message indexes) to the header.
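The header layout described above can be illustrated with a minimal Java sketch. It assumes the layout given in the linked Confluent documentation: magic byte 0, a 4-byte big-endian schema ID, then the message index array prefixed by its length, all as zigzag varints, with a single 0 byte as the shorthand for the array [0]. The class {{ConfluentProtobufWireHeader}} and its methods are hypothetical and are not the NiFi implementation.

```java
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.List;

// Hypothetical sketch of the Confluent Protobuf wire-format header:
// [magic byte 0][4-byte big-endian schema ID][message indexes][Protobuf payload]
final class ConfluentProtobufWireHeader {
    static final byte MAGIC_BYTE = 0;

    final int schemaId;
    final List<Integer> messageIndexes;
    final ByteBuffer payload;

    private ConfluentProtobufWireHeader(int schemaId, List<Integer> messageIndexes, ByteBuffer payload) {
        this.schemaId = schemaId;
        this.messageIndexes = messageIndexes;
        this.payload = payload;
    }

    static ConfluentProtobufWireHeader parse(ByteBuffer buffer) {
        byte magic = buffer.get();
        if (magic != MAGIC_BYTE) {
            throw new IllegalArgumentException("Unknown magic byte: " + magic);
        }
        int schemaId = buffer.getInt(); // ByteBuffer defaults to big-endian order

        // The index array is prefixed with its length; a lone 0 is the shorthand for [0].
        int count = readZigZagVarint(buffer);
        if (count < 0) {
            throw new IllegalArgumentException("Negative message index count: " + count);
        }
        List<Integer> indexes = new ArrayList<>();
        if (count == 0) {
            indexes.add(0);
        }
        for (int i = 0; i < count; i++) {
            indexes.add(readZigZagVarint(buffer));
        }
        // Everything after the header is the Protobuf-encoded message itself.
        return new ConfluentProtobufWireHeader(schemaId, List.copyOf(indexes), buffer.slice());
    }

    // Decodes one zigzag-encoded varint (at most 5 bytes for a 32-bit int), as in Avro.
    static int readZigZagVarint(ByteBuffer buffer) {
        int raw = 0;
        for (int shift = 0; shift < 35; shift += 7) {
            int b = buffer.get() & 0xFF;
            raw |= (b & 0x7F) << shift;
            if ((b & 0x80) == 0) {
                return (raw >>> 1) ^ -(raw & 1);
            }
        }
        throw new IllegalArgumentException("Varint is longer than 5 bytes");
    }
}
```

For example, the bytes {{00 00 00 00 2A 04 02 00}} followed by the payload decode to schema ID 42 and message indexes [1][0], because the zigzag values 4, 2 and 0 stand for the count 2 and the indexes 1 and 0.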
In addition, a new *ProtobufReader* implementation, *StandardProtobufReader*,
will be added. The existing ProtobufReader component can't be easily extended
without breaking backward compatibility because two of its properties, *Proto
Directory* and *Message Type*, are always required. So, even when the component
is configured to use a schema registry and a schema reference reader, the user
must provide those two parameters. It appears the component was never
intended to be used with any *Schema Access Strategy* other than {*}Generate
From Proto File{*}. Additionally, the current implementation can't work in a
cluster unless the Protobuf schemas are copied manually onto every node.
*StandardProtobufReader* will not support the *Generate From Proto File* schema
access strategy. It will support the remaining ones from the list below:
!image-2025-07-17-10-32-37-291.png!
h1. History
Initially, the changes were pushed for review in [this
PR|https://github.com/apache/nifi/pull/10094]. However, after an initial look
by [~exceptionfactory], we decided that it's reasonable to create a new
implementation of ProtobufReader instead of maintaining the old one. In
addition, the PR will be divided into two smaller ones: the first will cover
changes to the confluent-bundle and common APIs, and the second will cover the
new ProtobufReader implementation.
With this context, the use case below uses the following markers:
{color:#de350b}*NEW*{color} - indicates a feature or component that will be
added as a result of this ticket.
{color:#de350b}*EXISTING*{color} - indicates a feature or component that
already exists in the current NiFi codebase.
h1. The use case
# The user has a schema registry and a Kafka cluster with a topic containing
messages encoded according to the Confluent [Protobuf wire
format|https://docs.confluent.io/platform/current/schema-registry/fundamentals/serdes-develop/index.html#wire-format]
(magic byte + 4-byte schema ID + varint message indexes + payload).
# The user configures a *StandardProtobufReader* service.
# In *StandardProtobufReader*, the user sets the *Schema Access Strategy*
property to *Schema Reference Reader (EXISTING)*.
# In *StandardProtobufReader*, the user sets the *Schema Registry* property to
*ConfluentSchemaRegistry (EXISTING)*.
## *ConfluentSchemaRegistry* was extended to support fetching referenced
(imported) schemas. If a Protobuf schema imports other schemas, they are
fetched from the registry recursively. *(NEW)*
## The *SchemaRegistry (EXISTING)* interface was extended to support returning
the raw schema form ({*}NEW{*}) in addition to the currently returned
RecordSchema. With this change, a new abstraction, *SchemaDefinition (NEW)*,
was introduced to nifi-schema-registry-service-api. It is intended to represent
the raw form of various schema types.
# In *StandardProtobufReader*, the user sets the *Schema Reference Reader*
property to *ConfluentEncodedSchemaReferenceReader (EXISTING)*.
# In *StandardProtobufReader*, the user sets the *Message Name Resolver Strategy*
property {*}(NEW){*}. Two options are available: *Message Name Property (NEW)*
and *Message Name Resolver Service (NEW)*.
## Option one exists for simple cases where the user knows the Protobuf message
name upfront and the message does not change dynamically during normal
operation.
## Option two allows the user to choose a controller service responsible for
resolving the message name. In this case, the controller service reads the
header of the message payload to get the information needed to locate the
message within the Protobuf schema (a message index array; see the Protobuf
wire format). This is done by the *ConfluentProtobufMessageNameResolver (NEW)*
service.
### *Explanation:* Why do we need message name resolution? A Protobuf schema
can contain many message definitions at the root level (see the example below).
The deserializer must be given the message name explicitly to do its job,
because a schema file or schema text alone may be ambiguous. A ProtobufReader
always needs the name of the proto schema message to decode the binary
payload. The new strategy defines a new interface, *MessageNameResolver (NEW)*,
in nifi-schema-registry-service-api. Currently, one implementation exists:
*ConfluentProtobufMessageNameResolver (NEW)* in nifi-confluent-platform-bundle.
### *ConfluentProtobufMessageNameResolver* reads the Confluent-standardized
*message indexes* that follow the magic byte and the 4-byte schema ID in the
input stream. More about it here: [Protobuf wire
format|https://docs.confluent.io/platform/current/schema-registry/fundamentals/serdes-develop/index.html#wire-format].
The index array is variable length, for example [0][2][0] or [1], and each
index is encoded as a zigzag varint. *(NEW)*
### *Parsing the Protobuf message schemas:* After
*ConfluentProtobufMessageNameResolver* decodes the message indexes, it looks up
the Protobuf message name in the *SchemaDefinition (NEW)*. Messages can be
nested, so the decoded message index can point to a nested message such as
{{MessageA.MessageE.MessageG}}:
!image-2025-07-16-12-52-20-296.png!
### *ConfluentProtobufMessageNameResolver* needs to parse the schema to learn
its structure and locate the right message name *(NEW)*. An *ANTLR* parser is
used for this {*}(NEW){*}: the proto3 grammar is downloaded from the ANTLR
repository at build time, and a Maven plugin then generates the parser for the
proto3 schema format.
#### *Why not use the Wire library to parse it?* To get the schema message, we
need to parse the schema, not compile it. The difference is that an incomplete
schema can be parsed, whereas compilation requires the schema plus all of its
imported schemas. In its Schema Registry implementation, Confluent uses a
parser from an internal Wire package:
com.squareup.wire.schema.internal.parser.*. The decision was made not to use
internal classes from a third-party library; I believe they are no longer
public in newer versions. Additionally, I did not want to add the Wire library
and its dependencies to the confluent-bundle, because the maintainers favor
keeping the footprint small. The ANTLR parser needs only the very small
standalone ANTLR runtime JAR.
# The user configures the *ConsumeKafka* processor with the processing strategy
set to *RECORD (EXISTING)* and sets the *Record Reader* property to the
*StandardProtobufReader* instance configured in the earlier steps.
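The nested-message lookup from the use case can be sketched as a walk over the parsed message declarations, consuming one index per nesting level. This is a minimal sketch, not the NiFi code: the {{ProtoMessage}} tree and {{MessageIndexResolver}} are hypothetical, and the sketch assumes the schema has already been parsed (the ticket uses an ANTLR-generated parser for that step) and that each index counts only message declarations in declaration order.

```java
import java.util.List;

// Hypothetical model of the message declarations found in a parsed .proto file.
record ProtoMessage(String name, List<ProtoMessage> nested) {
    ProtoMessage(String name, ProtoMessage... nested) {
        this(name, List.of(nested));
    }
}

final class MessageIndexResolver {
    private MessageIndexResolver() {
    }

    // Walks the declaration tree: the first index selects a top-level message,
    // every following index selects a message nested inside the previous one.
    static String resolve(String packageName, List<ProtoMessage> topLevel, List<Integer> indexes) {
        if (indexes.isEmpty()) {
            throw new IllegalArgumentException("Message index array is empty");
        }
        StringBuilder name = new StringBuilder();
        if (packageName != null && !packageName.isEmpty()) {
            name.append(packageName).append('.');
        }
        List<ProtoMessage> candidates = topLevel;
        for (int depth = 0; depth < indexes.size(); depth++) {
            int index = indexes.get(depth);
            if (index < 0 || index >= candidates.size()) {
                throw new IllegalArgumentException("Message index " + index + " out of range at depth " + depth);
            }
            ProtoMessage message = candidates.get(index);
            if (depth > 0) {
                name.append('.');
            }
            name.append(message.name());
            candidates = message.nested();
        }
        return name.toString();
    }
}
```

With a hypothetical schema that declares {{MessageA}} (containing {{MessageB}}, {{MessageD}} and {{MessageE}}, where {{MessageE}} contains {{MessageF}} and {{MessageG}}) followed by {{MessageH}}, the index array [0][2][1] resolves to {{MessageA.MessageE.MessageG}}.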
h1. New API components in nifi-schema-registry-service-api
h2. SchemaDefinition
A raw schema definition. Contains a {_}SchemaIdentifier{_}, schema text, and a
collection of referenced {_}SchemaDefinitions{_}.
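The shape of a raw schema definition, together with the recursive fetching of referenced schemas described in the use case, can be sketched as follows. All names here ({{SchemaRef}}, {{RawSchemaDefinition}}, {{RegistryEntry}}, {{SchemaDefinitionLoader}}) are hypothetical stand-ins rather than the NiFi API, and the registry lookup is abstracted as a plain function. Already loaded schemas are reused so that a schema imported by several others is fetched only once.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.function.Function;

// Hypothetical identifier (stand-in for SchemaIdentifier): subject and version.
record SchemaRef(String subject, int version) {}

// Hypothetical raw schema: identifier, schema text, and referenced definitions.
record RawSchemaDefinition(SchemaRef identifier, String text, List<RawSchemaDefinition> references) {}

// What a registry lookup returns for one schema: its text and the references it declares.
record RegistryEntry(String text, List<SchemaRef> references) {}

final class SchemaDefinitionLoader {
    private final Function<SchemaRef, RegistryEntry> fetch;
    private final Map<SchemaRef, RawSchemaDefinition> loaded = new HashMap<>();
    private final Set<SchemaRef> inProgress = new HashSet<>();

    SchemaDefinitionLoader(Function<SchemaRef, RegistryEntry> fetch) {
        this.fetch = fetch;
    }

    // Loads a schema and, recursively, every schema it references.
    RawSchemaDefinition load(SchemaRef ref) {
        RawSchemaDefinition existing = loaded.get(ref);
        if (existing != null) {
            return existing; // shared imports are fetched once and reused
        }
        if (!inProgress.add(ref)) {
            throw new IllegalStateException("Circular schema reference: " + ref);
        }
        try {
            RegistryEntry entry = fetch.apply(ref);
            if (entry == null) {
                throw new IllegalArgumentException("Schema not found: " + ref);
            }
            List<RawSchemaDefinition> references = new ArrayList<>();
            for (SchemaRef child : entry.references()) {
                references.add(load(child));
            }
            RawSchemaDefinition definition = new RawSchemaDefinition(ref, entry.text(), List.copyOf(references));
            loaded.put(ref, definition);
            return definition;
        } finally {
            inProgress.remove(ref);
        }
    }
}
```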
h2. MessageNameResolver
In the case of Protobuf, a single proto file may contain multiple message
definitions. To determine which message to use for decoding, a message index is
usually encoded in the payload, and, as with a schema reference, different
systems encode this index differently.
_MessageNameResolver_ is a new _ControllerService_ that translates an encoded
Protobuf message index into a Protobuf message name, which is later used to
find the correct Protobuf message for deserialization.
Schema-registry-specific implementations will live in their respective
modules, e.g., _nifi-confluent-schema-registry-service_ for Confluent.
h2. MessageName
Abstracts the value returned by MessageNameResolver.
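The relationship between a resolver and the name it returns can be sketched as below. The names {{MessageNameSketch}}, {{MessageNameResolverSketch}} and {{FixedMessageNameResolver}}, and the {{resolve(schemaText, content)}} signature, are assumptions for illustration only; the actual NiFi interfaces may differ. The trivial implementation simply returns a preconfigured name, similar in spirit to the Message Name Property option.

```java
import java.nio.ByteBuffer;
import java.util.Objects;

// Hypothetical value type standing in for MessageName: a namespace
// (package plus any enclosing messages, possibly empty) and a simple name.
record MessageNameSketch(String namespace, String name) {
    MessageNameSketch {
        Objects.requireNonNull(namespace, "namespace");
        Objects.requireNonNull(name, "name");
    }

    // Splits a fully qualified name at its last dot.
    static MessageNameSketch parse(String fullyQualifiedName) {
        int dot = fullyQualifiedName.lastIndexOf('.');
        return dot < 0
                ? new MessageNameSketch("", fullyQualifiedName)
                : new MessageNameSketch(fullyQualifiedName.substring(0, dot), fullyQualifiedName.substring(dot + 1));
    }

    String fullyQualifiedName() {
        return namespace.isEmpty() ? name : namespace + "." + name;
    }
}

// Hypothetical resolver contract: given the raw schema text and the message
// content, return the name of the message to use for deserialization.
interface MessageNameResolverSketch {
    MessageNameSketch resolve(String schemaText, ByteBuffer content);
}

// Trivial implementation that ignores the content and returns a preconfigured name.
final class FixedMessageNameResolver implements MessageNameResolverSketch {
    private final MessageNameSketch messageName;

    FixedMessageNameResolver(String fullyQualifiedName) {
        this.messageName = MessageNameSketch.parse(fullyQualifiedName);
    }

    @Override
    public MessageNameSketch resolve(String schemaText, ByteBuffer content) {
        return messageName;
    }
}
```

A Confluent-style implementation would instead decode the message indexes from the content and walk the parsed schema to produce the name.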
h1. New components in nifi-protobuf-bundle
h2. *StandardProtobufReader*
Working name. The component will extend the abstract *SchemaRegistryService*
class, as all readers do.
It will support the following Schema Access Strategies:
!image-2025-07-17-14-42-31-326.png|width=616,height=356!
!image-2025-07-17-14-43-58-023.png!
!image-2025-07-17-14-45-47-577.png!
The *Message Name Resolver Strategy* property will have two allowable values:
!image-2025-07-17-14-52-18-792.png!
When the *Message Name Property* value is selected, the component shows an
additional property (not visible in the screenshots above) called *Message
Name*, where the user can provide the fully qualified name of the message.
When *Message Name Resolver Service* is selected, the component shows an
additional property (not visible in the screenshots above) called *Message Name
Resolver Service*, where the user can provide a reference to the service.
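The conditional behavior of the two allowable values can be summarized with a short sketch: each strategy requires exactly one companion setting, and only that setting is consulted. The enum {{MessageNameResolverStrategy}} and the helper {{MessageNameSelection.select}} are hypothetical, and the resolver service is modeled as a plain function rather than a NiFi controller service.

```java
import java.nio.ByteBuffer;
import java.util.function.Function;

// Hypothetical enum mirroring the two allowable values of Message Name Resolver Strategy.
enum MessageNameResolverStrategy { MESSAGE_NAME_PROPERTY, MESSAGE_NAME_RESOLVER_SERVICE }

final class MessageNameSelection {
    private MessageNameSelection() {
    }

    // Returns the fully qualified message name, using only the setting
    // that belongs to the selected strategy.
    static String select(MessageNameResolverStrategy strategy,
                         String messageName,
                         Function<ByteBuffer, String> resolverService,
                         ByteBuffer content) {
        return switch (strategy) {
            case MESSAGE_NAME_PROPERTY -> {
                if (messageName == null || messageName.isBlank()) {
                    throw new IllegalStateException("Message Name is required for this strategy");
                }
                yield messageName; // fixed name, content is not inspected
            }
            case MESSAGE_NAME_RESOLVER_SERVICE -> {
                if (resolverService == null) {
                    throw new IllegalStateException("Message Name Resolver Service is required for this strategy");
                }
                yield resolverService.apply(content); // name derived from the message content
            }
        };
    }
}
```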
> Support for Confluent schema registry Protobuf wire format in
> StandardProtobufReader
> ------------------------------------------------------------------------------------
>
> Key: NIFI-14424
> URL: https://issues.apache.org/jira/browse/NIFI-14424
> Project: Apache NiFi
> Issue Type: Improvement
> Components: Extensions
> Reporter: Alaksiej Ščarbaty
> Assignee: Lukas Kucharski
> Priority: Major
> Fix For: 2.6.0
>
> Attachments: image-2025-07-16-12-52-20-296.png,
> image-2025-07-17-10-32-37-291.png, image-2025-07-17-14-40-58-260.png,
> image-2025-07-17-14-42-31-326.png, image-2025-07-17-14-43-58-023.png,
> image-2025-07-17-14-45-47-577.png, image-2025-07-17-14-51-24-943.png,
> image-2025-07-17-14-52-18-792.png
>
> Time Spent: 12h 50m
> Remaining Estimate: 0h
>
--
This message was sent by Atlassian Jira
(v8.20.10#820010)