[
https://issues.apache.org/jira/browse/NIFI-14424?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
]
Lukas Kucharski updated NIFI-14424:
-----------------------------------
Description:
h2. Summary
Add Confluent Protobuf Wire Format Support - Foundation Services and APIs
h2. Description
This enhancement introduces foundational support for deserializing Protobuf
messages encoded using the Confluent protobuf wire format in NiFi. The existing
Confluent Schema Registry integration only supported Avro data decoding.
Confluent's Protobuf serialization adds a variable-length byte section to the
message header containing message indexes that must be parsed and interpreted.
This ticket represents Phase 1 of the implementation, providing the core
services and API extensions. Phase 2 will introduce the new
StandardProtobufReader component.
h2. Key Components Implemented
h3. API Extensions (nifi-schema-registry-service-api)
- {*}SchemaDefinition{*}: New abstraction representing raw schema forms with
schema text and referenced schema collections
- {*}MessageNameResolver{*}: Controller service interface for translating
encoded Protobuf message indexes into message names
- {*}MessageName{*}: Abstraction for values returned by MessageNameResolver
h3. Confluent Platform Bundle Enhancements
- {*}ConfluentProtobufMessageNameResolver{*}: Service that decodes
Confluent-standardized message name indexes from wire format using zigzag
encoded varints
- Enhanced {*}RestSchemaRegistryClient{*}: Extended to support recursive
fetching of referenced/imported Protobuf schemas
- {*}ProtobufMessageSchema{*}: Schema representation supporting nested
message parsing
- ANTLR Parser Integration: Added Proto3 grammar parser for schema structure
analysis without requiring full compilation
h3. Wire Format Processing
- Message Index Decoding: Parses variable-length message indexes (e.g.,
[0][2][0] or [1]) from Confluent wire format headers
- Nested Message Support: Resolves complex message paths like
MessageA.MessageE.MessageG
- Schema Structure Analysis: Parses Protobuf schemas to locate correct
message definitions for deserialization
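The header layout described above can be sketched in Java as follows. This is a minimal illustration, not the code from the PR: the class and method names are hypothetical. It assumes the layout documented by Confluent: a magic byte of 0, a 4-byte big-endian schema id, then a zigzag varint count followed by that many zigzag varint indexes, with a single 0 byte standing in for the index list [0].

```java
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.List;

// Hypothetical helper, for illustration only.
final class ConfluentHeaderSketch {

    // Reads the magic byte and the 4-byte big-endian schema id.
    static int readSchemaId(ByteBuffer buf) {
        byte magic = buf.get();
        if (magic != 0) {
            throw new IllegalArgumentException("Unexpected magic byte: " + magic);
        }
        return buf.getInt(); // ByteBuffer is big-endian by default
    }

    // Reads the message index list that follows the schema id.
    static List<Integer> readMessageIndexes(ByteBuffer buf) {
        int count = readZigZagVarint(buf);
        if (count < 0) {
            throw new IllegalArgumentException("Negative index count: " + count);
        }
        if (count == 0) {
            return List.of(0); // a single 0 byte is shorthand for [0]
        }
        List<Integer> indexes = new ArrayList<>();
        for (int i = 0; i < count; i++) {
            indexes.add(readZigZagVarint(buf));
        }
        return indexes;
    }

    // Decodes one 32-bit varint (7 payload bits per byte) and undoes zigzag.
    static int readZigZagVarint(ByteBuffer buf) {
        int raw = 0;
        for (int shift = 0; shift <= 28; shift += 7) {
            int b = buf.get() & 0xFF;
            raw |= (b & 0x7F) << shift;
            if ((b & 0x80) == 0) {
                return (raw >>> 1) ^ -(raw & 1);
            }
        }
        throw new IllegalArgumentException("Varint longer than 5 bytes");
    }
}
```

After both calls return, the buffer position sits at the first byte of the Protobuf payload, so the remaining bytes can be handed to the deserializer.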
h2. Next Phase
Phase 2 (linked issue) will implement the *StandardProtobufReader* component,
which uses these foundational services to provide full Protobuf record
processing capabilities with Schema Registry integration.
was:
h1. Introduction
The ticket is split into two PRs.
The first PR: [https://github.com/apache/nifi/pull/10105]
The second PR: [https://github.com/apache/nifi/pull/10214]
This change enables NiFi to deserialize Protobuf messages encoded using the
Confluent [protobuf wire
format|https://docs.confluent.io/platform/current/schema-registry/fundamentals/serdes-develop/index.html#wire-format].
Confluent Kafka Protobuf serde components use a custom header for the message
payload. The header needs to be parsed and interpreted. The existing NiFi
Confluent integration can decode Confluent headers only for payloads
containing Avro data. For Protobuf data, the Kafka Protobuf serializer adds an
additional variable-length byte section to the header.
In addition, a new implementation for *ProtobufReader - StandardProtobufReader*
will be added. The existing ProtobufReader component can't be easily extended
without breaking backward compatibility because two component properties *Proto
Directory* and *Message Type* are always required. So, even when the component
is configured to use a schema registry and schema reference reader, the user
needs to provide those two parameters. It looks like the component was never
intended to be used with any other *Schema Access Strategy* than {*}Generate
From Proto File{*}. Additionally, the current implementation can't work in a
cluster without copying protobuf schemas manually onto every node.
*StandardProtobufReader* will not support the *Generate From Proto File* schema
access strategy. It will support the remaining ones from the list below:
!image-2025-07-17-10-32-37-291.png!
h1. History
Initially, the changes were pushed for review in [this
PR|https://github.com/apache/nifi/pull/10094]. However, after an initial look
by [~exceptionfactory], we decided that it is reasonable to create a new
implementation of ProtobufReader instead of maintaining the old one. In
addition, the PR will be divided into two smaller ones. The first will cover
changes to the confluent-bundle and common APIs, and the second will cover the
new ProtobufReader implementation.
With this context, the use case below uses:
{color:#de350b}*NEW*{color} - to indicate a feature or a component that will be
added as a result of this ticket.
{color:#de350b}*EXISTING*{color} - to indicate a feature or a component that
already exists in the current NiFi codebase.
h1. The use case
# The user has a schema registry and a Kafka cluster with a topic containing
messages encoded according to the Confluent [protobuf wire
format|https://docs.confluent.io/platform/current/schema-registry/fundamentals/serdes-develop/index.html#wire-format]
(magic byte + 4-byte schema_id + varint message indexes + payload)
# The user configures the *StandardProtobufReader* service
# In *StandardProtobufReader*, the user sets the Schema Access Strategy property
to *Schema Reference Reader (EXISTING)*
# In *StandardProtobufReader*, the user sets the Schema Registry property to
*ConfluentSchemaRegistry (EXISTING)*
## *ConfluentSchemaRegistry* was extended to support the fetching of
referenced/included schemas. If a protobuf schema imports other schemas, they
are fetched from the registry recursively. *(NEW)*
## *SchemaRegistry (EXISTING)* interface was extended to support returning a
raw schema form ({*}NEW{*}), in addition to the currently returned
RecordSchema. With this change, a new abstraction was introduced to
nifi-schema-registry-service-api - *SchemaDefinition (NEW).* The intention of
this is to represent a raw form of various schemas.
# In *StandardProtobufReader*, the user sets the Schema Reference Reader
property to *ConfluentEncodedSchemaReferenceReader (EXISTING)*
# In *StandardProtobufReader*, the user sets the *Message Name Resolver
Strategy* property {*}(NEW){*}. Two options are available: *Message Name
Property (NEW)* and *Message Name Resolver Service (NEW)*
## Option one exists for simple cases where the user knows the name of the
protobuf message upfront, and the message will not change dynamically during
normal operation.
## Option two allows the user to choose the controller service responsible for
resolving the message name. In this case, the controller service looks at the
header of the message payload to get the information needed to locate the
Message within the protobuf schema (the information is a message index array,
see protobuf wire format). This is done by
*ConfluentProtobufMessageNameResolver(NEW)* service.
### *Explanation:* Why do we need message name resolving? Protobuf schemas can
contain many message definitions at the root level (see example below). We need
to explicitly give the message name to the deserializer for it to be able to do
its job. A schema file or schema text alone may not be enough because it may be
ambiguous. The reader always needs the name of the Protobuf message to be able
to decode the binary payload. The new strategy defines a new interface
*MessageNameResolver* *(NEW)* in nifi-schema-registry-service-api. Currently,
one implementation exists: *ConfluentProtobufMessageNameResolver* *(NEW)* in
nifi-confluent-platform-bundle.
### The ConfluentProtobufMessageNameResolver looks at the second byte in the
input stream to decode Confluent-standardized *message name indexes*. More
about it here: [protobuf wire
format|https://docs.confluent.io/platform/current/schema-registry/fundamentals/serdes-develop/index.html#wire-format].
The indexes are variable length and can be [0][2][0] or [1] etc. Indexes are
zigzag-encoded varints. *(NEW)*
### *Parsing of the protobuf message schemas:* After
*ConfluentProtobufMessageNameResolver* decodes the message indexes, it looks for
the protobuf message name in the *SchemaDefinition (NEW).* Messages can be
nested, so the message index decoded from the content can point to a nested
message like {{MessageA.MessageE.MessageG}}.
!image-2025-07-16-12-52-20-296.png!
### The *ConfluentProtobufMessageNameResolver* needs to parse the schema in
order to know its structure and locate the right message name *(NEW).* An
*ANTLR* parser is used for this {*}(NEW){*}. The Proto3 grammar is downloaded
from the ANTLR repository at build time, and the Maven plugin then generates
the parser for the proto3 schema format.
#### *Why not use the Wire library to parse it?* In order to get the schema
message, we need to parse the schema, not compile it. The difference is that we
can parse an incomplete schema, whereas compilation would require the schema
plus its imported schemas. Confluent, in the implementation of its
schema-registry, uses a parser from an internal Wire package:
com.squareup.wire.schema.internal.parser.*. The decision was made not to use
internal classes from a third-party library. I think they are not public anymore
in the newer versions. Additionally, I did not want to add a Wire library
dependency and its dependencies to the confluent-bundle because I remembered
the maintainers favor keeping the memory footprint small. The ANTLR parser uses
a very small standalone antlr-runtime.jar.
# The user configures *ConsumeKafka* processor with the processing strategy
set to *RECORD (existing)* and sets the *Record Reader* property to the
instance configured in earlier steps
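The lookup from a decoded index path to a nested message name, as described in the message name resolver steps above, can be sketched as follows. This is an illustrative sketch only: {{MessageNode}} and {{resolve}} are hypothetical names, the tree is assumed to have already been produced by a schema parser, and the proto package prefix is left out.

```java
import java.util.List;

// Hypothetical model of the parsed schema structure, for illustration only.
final class MessagePathSketch {

    // A message definition with its nested message definitions, in declaration order.
    static final class MessageNode {
        final String name;
        final List<MessageNode> nested;

        MessageNode(String name, MessageNode... nested) {
            this.name = name;
            this.nested = List.of(nested);
        }
    }

    // Walks the index path: the first index selects a top-level message,
    // each following index selects a message nested in the previous one.
    static String resolve(List<MessageNode> topLevel, List<Integer> indexes) {
        if (indexes.isEmpty()) {
            throw new IllegalArgumentException("Index path must not be empty");
        }
        StringBuilder path = new StringBuilder();
        List<MessageNode> level = topLevel;
        for (int index : indexes) {
            if (index < 0 || index >= level.size()) {
                throw new IllegalArgumentException("No message at index " + index);
            }
            MessageNode node = level.get(index);
            if (path.length() > 0) {
                path.append('.');
            }
            path.append(node.name);
            level = node.nested;
        }
        return path.toString();
    }
}
```

With a schema whose first top-level message is MessageA, whose third nested message is MessageE, whose first nested message is MessageG, the path [0][2][0] yields MessageA.MessageE.MessageG, and [1] yields the second top-level message.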
h1. New components introduced to API in nifi-schema-registry-service-api
h2. SchemaDefinition
A raw schema definition. Contains a {_}SchemaIdentifier{_}, schema text, and a
collection of referenced {_}SchemaDefinitions{_}.
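As a rough sketch of how such a definition could be assembled when schemas import one another, the example below builds a tree of definitions by following imports recursively. None of these names come from the NiFi API: {{Definition}} is a simplified stand-in for SchemaDefinition that uses a plain name instead of a SchemaIdentifier, and {{RegistryEntry}} plus the lookup function are hypothetical stand-ins for the registry client.

```java
import java.util.HashMap;
import java.util.HashSet;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.function.Function;

// Hypothetical types, for illustration only.
final class SchemaDefinitionSketch {

    // What a registry lookup is assumed to return: schema text and imported names.
    static final class RegistryEntry {
        final String text;
        final List<String> imports;

        RegistryEntry(String text, String... imports) {
            this.text = text;
            this.imports = List.of(imports);
        }
    }

    // Simplified stand-in for SchemaDefinition: name, text, referenced definitions.
    static final class Definition {
        final String name;
        final String text;
        final Map<String, Definition> references;

        Definition(String name, String text, Map<String, Definition> references) {
            this.name = name;
            this.text = text;
            this.references = references;
        }
    }

    static Definition fetch(String name, Function<String, RegistryEntry> registry) {
        return fetch(name, registry, new HashMap<>(), new HashSet<>());
    }

    // Depth-first fetch; already fetched schemas are reused, import cycles are rejected.
    private static Definition fetch(String name, Function<String, RegistryEntry> registry,
                                    Map<String, Definition> done, Set<String> inProgress) {
        Definition cached = done.get(name);
        if (cached != null) {
            return cached;
        }
        if (!inProgress.add(name)) {
            throw new IllegalStateException("Import cycle at " + name);
        }
        RegistryEntry entry = registry.apply(name);
        if (entry == null) {
            throw new IllegalArgumentException("Unknown schema: " + name);
        }
        Map<String, Definition> references = new LinkedHashMap<>();
        for (String imported : entry.imports) {
            references.put(imported, fetch(imported, registry, done, inProgress));
        }
        inProgress.remove(name);
        Definition definition = new Definition(name, entry.text, references);
        done.put(name, definition);
        return definition;
    }
}
```

Caching by name means a schema imported from several places is fetched once and shared between the referencing definitions.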
h2. MessageNameResolver
In the case of Protobuf, a single proto file may contain multiple message
entries. To understand which message to use for decoding, a message index is
usually encoded in the payload. As with a schema reference, different
systems encode this index differently.
_MessageNameResolver_ is a new _ControllerService_ that translates
an encoded Protobuf message index into a Protobuf message name, which is later
used to find the correct Protobuf message for deserialization.
Schema Registry-dependent implementations will land in their respective
modules, e.g., _nifi-confluent-schema-registry-service_ for Confluent.
h2. MessageName
Abstracts the value returned by the MessageNameResolver.
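To make the relationship between the two abstractions concrete, here is a minimal sketch. The type names carry a Sketch suffix because they are hypothetical: the method names, the parameters, and the split into namespace and name are assumptions for illustration, not the signatures from nifi-schema-registry-service-api. The package name com.example used in the usage note is likewise made up.

```java
import java.nio.ByteBuffer;
import java.util.Optional;

// Hypothetical value type: an optional proto package plus the message name.
final class MessageNameSketch {
    private final String namespace; // may be null when the schema declares no package
    private final String name;

    MessageNameSketch(String namespace, String name) {
        this.namespace = namespace;
        this.name = name;
    }

    Optional<String> getNamespace() {
        return Optional.ofNullable(namespace).filter(s -> !s.isEmpty());
    }

    String getName() {
        return name;
    }

    String getFullyQualifiedName() {
        return getNamespace().map(ns -> ns + "." + name).orElse(name);
    }
}

// Hypothetical resolver contract: schema text and encoded payload in, message name out.
interface MessageNameResolverSketch {
    MessageNameSketch resolve(String schemaText, ByteBuffer payload);

    // Resolver that ignores the payload and always returns a preconfigured name.
    static MessageNameResolverSketch fixed(MessageNameSketch name) {
        return (schemaText, payload) -> name;
    }
}
```

A resolver created with {{fixed(new MessageNameSketch("com.example", "MessageA"))}} always reports com.example.MessageA, whereas a registry-specific resolver would inspect the payload header before answering.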
h1. New components in nifi-protobuf-bundle
h2. *StandardProtobufReader*
Working name. The component will extend an abstract *SchemaRegistryService*
like all Readers do.
It will support the following Schema Access Strategies:
!image-2025-07-17-14-42-31-326.png|width=616,height=356!!image-2025-07-17-14-43-58-023.png!!image-2025-07-17-14-45-47-577.png!
The *Message Name Resolver Strategy* property will have two allowable
values:
!image-2025-07-17-14-52-18-792.png!
When *Message Name Property* is selected, the component will show a new
property (not visible in the screenshots above) called *Message Name,* where the
user can provide the fully qualified name of the message.
When *Message Name Resolver Service* is selected, the component will show a
new property (not visible in the screenshots above) called *Message Name
Resolver Service,* where the user can provide a reference to the service.
> Support for Confluent schema registry Protobuf wire format in
> StandardProtobufReader
> ------------------------------------------------------------------------------------
>
> Key: NIFI-14424
> URL: https://issues.apache.org/jira/browse/NIFI-14424
> Project: Apache NiFi
> Issue Type: Improvement
> Components: Extensions
> Reporter: Alaksiej Ščarbaty
> Assignee: Lukas Kucharski
> Priority: Major
> Attachments: image-2025-07-16-12-52-20-296.png,
> image-2025-07-17-10-32-37-291.png, image-2025-07-17-14-40-58-260.png,
> image-2025-07-17-14-42-31-326.png, image-2025-07-17-14-43-58-023.png,
> image-2025-07-17-14-45-47-577.png, image-2025-07-17-14-51-24-943.png,
> image-2025-07-17-14-52-18-792.png
>
> Time Spent: 13h 20m
> Remaining Estimate: 0h
>