[
https://issues.apache.org/jira/browse/NIFI-14424?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
]
Lukas Kucharski updated NIFI-14424:
-----------------------------------
Description:
This change enables NiFi to deserialize protobuf messages encoded using the
Confluent [protobuf wire
format.|https://docs.confluent.io/platform/current/schema-registry/fundamentals/serdes-develop/index.html#wire-format]
The use case:
# The user has a schema registry and a Kafka cluster whose topics contain
messages encoded according to the Confluent protobuf wire format (magic byte +
4-byte schema ID + varint message indexes + payload)
# The user configures *ProtobufReader* service ({*}existing{*})
## User sets Schema Access Strategy property to *Schema Reference Reader
(existing)*
### *ConfluentSchemaRegistry* was extended to support the fetching of
referenced schemas. If a protobuf schema imports other schemas, they are
fetched from the registry recursively. *({color:#ff0000}new{color})*
### *SchemaRegistry (existing)* interface was extended to support returning a
raw schema form *({color:#ff0000}new{color})* in addition to the current
RecordSchema. The first design proposed by [~awelless] called for introducing a
{-}*SchemaDefinitionProvider*{-}, but during the implementation, it seemed that
fetching and returning raw schema definition better suited the existing
abstraction of SchemaRegistry, and no new abstraction was introduced to the
API. With this change, a new abstraction was introduced to
nifi-schema-registry-service-api - *SchemaDefinition.* The intention of this is
to represent a raw form of various schemas.
## Sets Schema Registry property *(existing)*
## Sets Schema Reference reader ({*}existing{*})
## Sets *Message Name Resolver Strategy.* Two options are available. One -
*Message Type Property* *({color:#ff0000}NEW{color}),* Two - *Message Name
Resolver Service ({color:#ff0000}NEW{color})*
### Option one exists for backward compatibility and allows users to use
ProtobufReader with Proto Directory and Message Type properties
### Option two allows the user to choose the controller service responsible
for resolving the message name for decoding.
*Explanation:*
The ProtobufReader always needs the name of the proto schema message to be able
to decode the binary payload. In its current form, the message name is set via
the property *Message Type (existing)*.
The new strategy defines the new interface *MessageNameResolver*
*({color:#ff0000}new{color})* in nifi-schema-registry-service-api. Currently,
one implementation exists: *ConfluentProtobufMessageNameResolver*
*({color:#ff0000}new{color})* in nifi-confluent-platform-bundle.
The ConfluentProtobufMessageNameResolver looks at the second byte in the input
stream to decode Confluent standardized *message name indexes.* More about it
here: [protobuf wire
format.|https://docs.confluent.io/platform/current/schema-registry/fundamentals/serdes-develop/index.html#wire-format]
The index list is variable-length, for example [0][2][0] or [1]; see the
Confluent wire format document for details. The indexes are zigzag-encoded
varints. *({color:#ff0000}new{color})*
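To make the index encoding concrete, here is a minimal sketch of decoding the message index list. It is not the code from the pull request: the class and method names are hypothetical, and it assumes the stream is already positioned at the first byte of the index list. It follows the Confluent wire format document, where the list is written as a zigzag varint count followed by one zigzag varint per index, and a single 0 byte is shorthand for the list [0].

```java
import java.io.IOException;
import java.io.InputStream;
import java.util.ArrayList;
import java.util.List;

// Hypothetical helper for illustration; not the actual NiFi resolver class.
final class MessageIndexDecoder {

    // Reads one zigzag-encoded, variable-length 32-bit integer (at most 5 bytes).
    static int readZigZagVarint(InputStream in) throws IOException {
        int raw = 0;
        for (int shift = 0; shift <= 28; shift += 7) {
            int b = in.read();
            if (b < 0) {
                throw new IOException("Unexpected end of stream while reading varint");
            }
            raw |= (b & 0x7F) << shift;
            if ((b & 0x80) == 0) {
                // Undo zigzag: 0 -> 0, 1 -> -1, 2 -> 1, 3 -> -2, ...
                return (raw >>> 1) ^ -(raw & 1);
            }
        }
        throw new IOException("Varint is longer than 5 bytes");
    }

    // Reads the index list: a count followed by that many indexes.
    static List<Integer> readMessageIndexes(InputStream in) throws IOException {
        int count = readZigZagVarint(in);
        if (count < 0) {
            throw new IOException("Negative index count: " + count);
        }
        if (count == 0) {
            // A single 0 byte is the shorthand for "first top-level message".
            return List.of(0);
        }
        List<Integer> indexes = new ArrayList<>(count);
        for (int i = 0; i < count; i++) {
            indexes.add(readZigZagVarint(in));
        }
        return indexes;
    }
}
```

With this sketch, the bytes 0x06 0x00 0x04 0x00 decode to [0, 2, 0], and the single byte 0x00 decodes to [0].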
*Parsing of the protobuf message schemas.*
After *ConfluentProtobufMessageNameResolver* decodes the message indexes, it
looks up the protobuf message name in the *SchemaDefinition
({color:#ff0000}new{color}).* Messages can be nested, so the message indexes
decoded from the content can point to a nested message such as
`MessageA.MessageE.MessageG`
{{ !image-2025-07-16-12-52-20-296.png! }}
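The lookup of a nested message name from decoded indexes can be sketched as a walk down a tree of message declarations. This is only an illustration: MessageNode and MessageNameLookup are hypothetical types standing in for whatever structure the parsed schema produces, the tree in the usage note is made up rather than taken from the attached image, and the package prefix of the proto file is left out.

```java
import java.util.List;

// Hypothetical model of one message declaration and the messages nested in it,
// in declaration order.
record MessageNode(String name, List<MessageNode> nested) {}

final class MessageNameLookup {

    // Follows the index path level by level and joins the visited names with dots.
    static String resolve(List<MessageNode> topLevel, List<Integer> indexes) {
        if (indexes.isEmpty()) {
            throw new IllegalArgumentException("At least one message index is required");
        }
        StringBuilder name = new StringBuilder();
        List<MessageNode> level = topLevel;
        for (int index : indexes) {
            if (index < 0 || index >= level.size()) {
                throw new IllegalArgumentException("No message at index " + index);
            }
            MessageNode node = level.get(index);
            if (name.length() > 0) {
                name.append('.');
            }
            name.append(node.name());
            level = node.nested();
        }
        return name.toString();
    }
}
```

For a schema whose first top-level message is MessageA, with MessageE as its third nested message and MessageG as the first message nested in MessageE, the indexes [0, 2, 0] resolve to MessageA.MessageE.MessageG.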
### The *ConfluentProtobufMessageNameResolver* needs to parse the schema in order to
know its structure and locate the right message name
*({color:#ff0000}new{color}).* An *ANTLR* parser is used for this. The Proto3 grammar
was downloaded from the ANTLR repository, and a Maven plugin generates the
parser for the proto3 schema format from it.
*Why not use the wire library to parse it?*
To locate the message in the schema, we need to parse the schema, not compile
it. The difference is that an incomplete schema can be parsed, whereas
compilation requires the schema plus all imported schemas. Confluent, in the
implementation of its schema registry, uses a parser from an internal wire
package:
com.squareup.wire.schema.internal.parser.*
The decision was made not to use internal classes from a third-party library. I
think they are no longer public in newer versions. Additionally, I did
not want to add the wire library and its dependencies to the
confluent-bundle because I remembered that the maintainers favor keeping the memory
footprint small. The ANTLR parser needs only a very small standalone antlr-runtime jar.
# The user configures *ConsumeKafka* processor with the processing strategy
set to *RECORD (existing)*
h2. ProtobufReader backward compatibility
.... in progress
Currently for Protobuf record parsing there is a
[ProtobufReader|https://github.com/apache/nifi/blob/main/nifi-extension-bundles/nifi-protobuf-bundle/nifi-protobuf-services/src/main/java/org/apache/nifi/services/protobuf/ProtobufReader.java#L58],
which supports proto files on a [local filesystem
only|https://github.com/apache/nifi/blob/main/nifi-extension-bundles/nifi-protobuf-bundle/nifi-protobuf-services/src/main/java/org/apache/nifi/services/protobuf/ProtobufReader.java#L157].
The change aims at:
* supporting schema registries for retrieving proto definitions
* supporting decoding of the protobuf [wire
format|https://docs.confluent.io/platform/current/schema-registry/fundamentals/serdes-develop/index.html#wire-format]
* supporting fetching of imported (referenced) proto files in protobuf schemas
Since the presence of the original proto files is a must for parsing, new
abstractions related to schemas will be introduced.
h2. SchemaDefinition
A raw schema definition. Contains a {_}SchemaIdentifier{_}, schema text and a
collection of referenced {_}SchemaDefinitions{_}.
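As an illustration of this shape, and of fetching imported schemas recursively, here is a minimal sketch. The record and class names are hypothetical, a plain String stands in for the SchemaIdentifier, and the registry is modeled as a simple lookup function rather than the real ConfluentSchemaRegistry client. The cycle check is an addition of this sketch, not something the description above states.

```java
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
import java.util.function.Function;

// What one registry lookup returns in this sketch: schema text plus the ids it imports.
record RegistryEntry(String text, List<String> imports) {}

// Stand-in for SchemaDefinition: identifier, raw text, and resolved references.
record SchemaDefinitionSketch(String id, String text, List<SchemaDefinitionSketch> references) {}

final class ReferenceResolver {

    static SchemaDefinitionSketch resolve(String id, Function<String, RegistryEntry> registry) {
        return resolve(id, registry, new HashSet<>());
    }

    private static SchemaDefinitionSketch resolve(String id,
                                                  Function<String, RegistryEntry> registry,
                                                  Set<String> inProgress) {
        // Guard against schemas that import each other in a loop.
        if (!inProgress.add(id)) {
            throw new IllegalStateException("Cyclic import involving " + id);
        }
        RegistryEntry entry = registry.apply(id);
        if (entry == null) {
            throw new IllegalArgumentException("Schema not found: " + id);
        }
        List<SchemaDefinitionSketch> references = new ArrayList<>();
        for (String imported : entry.imports()) {
            references.add(resolve(imported, registry, inProgress));
        }
        inProgress.remove(id);
        return new SchemaDefinitionSketch(id, entry.text(), references);
    }
}
```

No parsing happens here; the schema text is carried as-is, which matches the intent of keeping the definition in its raw form.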
h2. -SchemaDefinitionProvider-
-New _SchemaDefinitionProviders_ provide a simple interface for retrieving
Schemas as text, together with their references. Unlike their _SchemaRegistry_
counterparts, no parsing is going to take place on that level, and it’s up to
the caller what to do with returned texts.-
-The interface will be put into the _nifi-schema-registry-service-api_ module.
Whereas the implementations should be created in their respective modules, like
_nifi-confluent-schema-registry-service_ for Confluent.-
*Notes after the initial implementation:*
*SchemaDefinitionProvider* was added with an initial implementation (not
merged), but was later removed in favour of having its responsibility added to
the SchemaRegistry implementation. The SchemaRegistry interface was extended
with one method that has a default implementation, so there was no need to
introduce a new abstraction (SchemaDefinitionProvider). This allows us to keep
the UX consistent with patterns established so far. Additionally, having the
ability to fetch a raw schema from SchemaRegistry might prove helpful when
additional context, which can be supplied by the original schema (not
RecordSchema), is needed.
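The pattern of extending an interface with one default method can be sketched as follows. All names here are hypothetical and the schemas are plain Strings. What the real default implementation does is not stated above, so throwing UnsupportedOperationException is only an assumption of this sketch.

```java
// Hypothetical stand-in for the SchemaRegistry interface.
interface RegistrySketch {

    // Existing capability: return the parsed schema (a String here for brevity).
    String retrieveRecordSchema(String id);

    // New capability: return the raw schema text. The default keeps existing
    // implementations compiling without changes (assumed behavior: reject the call).
    default String retrieveRawSchema(String id) {
        throw new UnsupportedOperationException(
                getClass().getSimpleName() + " does not support raw schema retrieval");
    }
}

// An existing implementation that is left untouched.
final class LegacyRegistry implements RegistrySketch {
    @Override
    public String retrieveRecordSchema(String id) {
        return "record-schema:" + id;
    }
}

// An implementation that opts in to the new capability.
final class RawCapableRegistry implements RegistrySketch {
    @Override
    public String retrieveRecordSchema(String id) {
        return "record-schema:" + id;
    }

    @Override
    public String retrieveRawSchema(String id) {
        return "syntax = \"proto3\"; // raw text for " + id;
    }
}
```

The point of the design is that only registries able to serve raw definitions need to change, while the user still configures a single Schema Registry service.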
h2. MessageNameResolver
In the case of Protobuf, a single proto file may contain multiple message
entries. To determine which message to use for decoding, a message index is
usually encoded in the payload. As with a schema reference, different
systems encode this index differently.
_MessageNameResolver_ will be a new _ControllerService_ that translates
an encoded Protobuf message index into a Protobuf message name, which is
later used to find the correct Protobuf message for deserialization.
-Similar to the _SchemaDefinitionProvider,_- the interface will be put into the
_nifi-schema-registry-service-api_ module. Schema Registry dependent
implementations will land in their respective modules, e.g.
_nifi-confluent-schema-registry-service_ for Confluent.
h2. ProtobufReader
-Since the existing ProtobufReader extends {_}SchemaRegistryService{_}, we’ll
conform to that interface and add a new {_}Schema Access Strategy -
SCHEMA_DEFINITION_PROVIDER{_}.-
-The _createRecordReader_ method will be split in 2 branches. One for
_SCHEMA_DEFINITION_PROVIDER_ (new) and one for the rest (the existing one).-
-The _getSupportedPropertyDescriptors_ will be modified to define the new
properties, as well as to make the _SCHEMA_REFERENCE_READER_ depend on
_SCHEMA_DEFINITION_PROVIDER_ value.-
The existing *ProtobufReader* implementation has two properties (Proto
Directory, Message Type) that are always required. To keep backward
compatibility, we will make those two properties optional and add a custom
validator that requires them when the 'Generate From Protofile' schema access strategy
is selected.
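The conditional requirement can be sketched without any NiFi classes. The class and method names are hypothetical, and the real validator would work with NiFi's validation context and results rather than plain strings. The strategy label is taken from the sentence above.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical, framework-free version of the custom validation rule.
final class ReaderSettingsValidator {

    static final String GENERATE_FROM_PROTO_FILE = "Generate From Protofile";

    // Returns a list of problems; an empty list means the configuration is valid.
    static List<String> validate(String schemaAccessStrategy, String protoDirectory, String messageType) {
        List<String> problems = new ArrayList<>();
        // Both properties are optional unless the proto-file strategy is selected.
        if (GENERATE_FROM_PROTO_FILE.equals(schemaAccessStrategy)) {
            if (isBlank(protoDirectory)) {
                problems.add("Proto Directory is required for '" + GENERATE_FROM_PROTO_FILE + "'");
            }
            if (isBlank(messageType)) {
                problems.add("Message Type is required for '" + GENERATE_FROM_PROTO_FILE + "'");
            }
        }
        return problems;
    }

    private static boolean isBlank(String value) {
        return value == null || value.trim().isEmpty();
    }
}
```

Existing flows that use the proto-file strategy keep failing validation when either property is missing, while flows using a schema registry are not forced to set them.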
Attribute modifications:
|*Attribute Name*|*Description*|*Value*|*Comment*|
|-Schema Reference Reader- ({*}Note: this property already exists, no
modifications needed){*}|-Service implementation responsible for reading
FlowFile attributes or content to determine the Schema Reference
Identifier.-|-<{_}SchemaReferenceReader{_}> impl-|-That’s going to be the
existing property in SchemaRegistryService.-|
|-Schema Definition Provider-|-A provider that will retrieve Protobuf schema
definitions.-|-<{_}SchemaDefinitionProvider{_}> impl-|-This is a new property,
available only when _Schema Access Strategy_ is _SCHEMA_DEFINITION_PROVIDER._-|
|Message Name Resolver|Service implementation responsible for reading FlowFile
attributes or content to determine the name of a Protobuf message to use for
deserialization.|<{_}MessageNameResolver{_}> impl|This is a new property,
available only when _Schema Access Strategy_ is
_{-}SCHEMA_DEFINITION_PROVIDER{-}. SCHEMA_REFERENCE_READER_|
|-Proto Directory, Message Type-| | |-*These properties will be hidden when*
*_Schema Access Strategy_* *is* *_SCHEMA_DEFINITION_PROVIDER._*-
-*Currently they’re mandatory.*- |
|_Schema Caching parameters, like size and expiration_| | |These are new
properties, available only when _Schema Access Strategy_ is
_SCHEMA_DEFINITION_PROVIDER._|
h3. -Custom Protobuf Loader-
-The wire’s
[SchemaLoader|https://github.com/square/wire/blob/master/wire-schema/src/jsMain/kotlin/com/squareup/wire/schema/SchemaLoader.kt]
can’t be used with {_}SchemaDefinitionProviders{_}, as it’s tightly tied to a
file system. Instead of implementing an adapter for a file system, a custom
glue-code will be implemented, which either uses a _SchemaDefinitionProvider_
for loading, or accepts an already fetched SchemaDefinition. See
[CommonSchemaLoader|https://github.com/square/wire/blob/master/wire-schema/src/commonMain/kotlin/com/squareup/wire/schema/internal/CommonSchemaLoader.kt]
for inspiration.-
-This logic will reside in _nifi-protobuf-services_ too.-
> Support for Confluent schema registry Protobuf wire format in ProtobufReader
> ----------------------------------------------------------------------------
>
> Key: NIFI-14424
> URL: https://issues.apache.org/jira/browse/NIFI-14424
> Project: Apache NiFi
> Issue Type: Improvement
> Components: Extensions
> Reporter: Alaksiej Ščarbaty
> Assignee: Lukas Kucharski
> Priority: Major
> Attachments: image-2025-07-16-12-52-20-296.png
>
> Time Spent: 20m
> Remaining Estimate: 0h