[
https://issues.apache.org/jira/browse/NIFI-14424?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
]
Alaksiej Ščarbaty updated NIFI-14424:
-------------------------------------
Description:
Protobuf record parsing is currently handled by the
[ProtobufReader|https://github.com/apache/nifi/blob/main/nifi-extension-bundles/nifi-protobuf-bundle/nifi-protobuf-services/src/main/java/org/apache/nifi/services/protobuf/ProtobufReader.java#L58],
which supports proto files on the [local filesystem only|https://github.com/apache/nifi/blob/main/nifi-extension-bundles/nifi-protobuf-bundle/nifi-protobuf-services/src/main/java/org/apache/nifi/services/protobuf/ProtobufReader.java#L157].
This change adds support for retrieving proto definitions from schema registries.
Since the original proto files are required for parsing, new schema-related abstractions will be introduced.
h2. SchemaDefinition
A raw schema definition. It contains a {_}SchemaIdentifier{_}, the schema text, and a
collection of referenced {_}SchemaDefinitions{_}, keyed by the name under which the parent schema references them.
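For illustration, a minimal sketch of a definition with one reference, assuming an {{order.proto}} that imports {{common/money.proto}}. {{SchemaId}} is a hypothetical stand-in for NiFi's _SchemaIdentifier_ so the snippet compiles on its own, and the references map is keyed by the import path used in the parent file.

```java
import java.util.Map;

// Hypothetical stand-in for NiFi's SchemaIdentifier, reduced to a numeric id.
record SchemaId(long id) {}

// A raw schema: identifier, unparsed schema text and its referenced schemas,
// keyed by the name under which the parent references them (here: the import path).
record SchemaDefinition(SchemaId id, String text, Map<String, SchemaDefinition> references) {
    SchemaDefinition {
        // Defensive copy so the definition tree cannot be modified afterwards.
        references = Map.copyOf(references);
    }
}

final class SchemaDefinitionExample {
    static SchemaDefinition orderWithMoney() {
        SchemaDefinition money = new SchemaDefinition(
                new SchemaId(1),
                """
                syntax = "proto3";
                package common;
                message Money { string currency = 1; int64 units = 2; }
                """,
                Map.of());

        // The parent refers to the dependency by the same path it uses in its import statement.
        return new SchemaDefinition(
                new SchemaId(2),
                """
                syntax = "proto3";
                import "common/money.proto";
                message Order { string id = 1; common.Money total = 2; }
                """,
                Map.of("common/money.proto", money));
    }
}
```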
h2. SchemaDefinitionProvider
The new _SchemaDefinitionProviders_ provide a simple interface for retrieving
schemas as text, together with their references. Unlike their _SchemaRegistry_
counterparts, they do not parse anything; it is up to the caller to decide what to do with the returned texts.
The interface will be placed in the _nifi-schema-registry-service-api_ module,
while implementations should be created in their respective modules, such as
_nifi-confluent-schema-registry-service_ for Confluent.
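To show how thin this interface is intended to be, here is a sketch of a hypothetical in-memory provider, e.g. for unit tests of the reader. The method shape follows the appendix; {{SchemaId}} (a stand-in for NiFi's _SchemaIdentifier_), {{InMemorySchemaDefinitionProvider}} and the choice of {{NoSuchElementException}} for unknown ids are assumptions.

```java
import java.util.Map;
import java.util.NoSuchElementException;
import java.util.concurrent.ConcurrentHashMap;

// Hypothetical stand-in for NiFi's SchemaIdentifier.
record SchemaId(long id) {}

record SchemaDefinition(SchemaId id, String text, Map<String, SchemaDefinition> references) {}

interface SchemaDefinitionProvider {
    // Returns the raw schema text together with its references; no parsing happens here.
    SchemaDefinition getById(SchemaId id);
}

// Hypothetical provider backed by a map, e.g. for tests.
final class InMemorySchemaDefinitionProvider implements SchemaDefinitionProvider {
    private final Map<SchemaId, SchemaDefinition> schemas = new ConcurrentHashMap<>();

    void register(SchemaDefinition definition) {
        schemas.put(definition.id(), definition);
    }

    @Override
    public SchemaDefinition getById(SchemaId id) {
        SchemaDefinition definition = schemas.get(id);
        if (definition == null) {
            throw new NoSuchElementException("No schema registered for " + id);
        }
        return definition;
    }
}
```

A registry-backed implementation, such as the Confluent one, would replace the map lookup with calls to the registry and presumably build the {{references}} map from the registry's own reference metadata.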
h2. MessageNameResolver
In the case of Protobuf, a single proto file may contain multiple message
definitions. To determine which message to use for decoding, a message index is
usually encoded in the payload, and, as with a schema reference, different
systems encode this index differently.
_MessageNameResolver_ will be a new _ControllerService_ that translates an
encoded Protobuf message index into a Protobuf message name, which is then used
to find the correct Protobuf message for deserialization.
As with the _SchemaDefinitionProvider_, the interface will be placed in the
_nifi-schema-registry-service-api_ module. Schema-registry-specific
implementations will go into their respective modules, e.g.
_nifi-confluent-schema-registry-service_ for Confluent.
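As an example of one such encoding: Confluent's Protobuf wire format (worth double-checking against Confluent's wire-format documentation) writes, after the magic byte and the 4-byte schema ID, the message indexes as zig-zag varints, first the array length and then the indexes, with the common case [0] shortened to a single 0 byte. Each index selects a message in declaration order, first among top-level messages and then among nested ones. The sketch below resolves such an index path under these assumptions: the stream has already been advanced past the magic byte and schema ID, names are joined with '.' without a package prefix, and {{ConfluentMessageNameResolver}} is a hypothetical name. The {{Message}} record and interface mirror the appendix.

```java
import java.io.IOException;
import java.io.InputStream;
import java.io.UncheckedIOException;
import java.util.ArrayList;
import java.util.List;

record Message(String name, List<Message> nestedMessages) {}

interface MessageNameResolver {
    String getMessageName(InputStream contentStream, List<Message> messages);
}

// Hypothetical Confluent-style resolver. Assumes the stream is positioned right after
// the magic byte and the 4-byte schema id, i.e. at the start of the message indexes.
final class ConfluentMessageNameResolver implements MessageNameResolver {

    @Override
    public String getMessageName(InputStream contentStream, List<Message> messages) {
        try {
            return resolve(readMessageIndexes(contentStream), messages);
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    // Reads a zig-zag varint length followed by that many zig-zag varint indexes.
    // A length of 0 is shorthand for the index path [0].
    static List<Integer> readMessageIndexes(InputStream in) throws IOException {
        int length = readZigZagVarint(in);
        List<Integer> indexes = new ArrayList<>();
        if (length == 0) {
            indexes.add(0);
            return indexes;
        }
        for (int i = 0; i < length; i++) {
            indexes.add(readZigZagVarint(in));
        }
        return indexes;
    }

    // Walks top-level messages, then nested ones, joining the names with '.'.
    static String resolve(List<Integer> indexes, List<Message> messages) {
        StringBuilder name = new StringBuilder();
        List<Message> level = messages;
        for (int index : indexes) {
            if (index < 0 || index >= level.size()) {
                throw new IllegalArgumentException("Message index out of range: " + indexes);
            }
            Message message = level.get(index);
            if (name.length() > 0) {
                name.append('.');
            }
            name.append(message.name());
            level = message.nestedMessages();
        }
        return name.toString();
    }

    // Base-128 varint (7 bits per byte, high bit = continuation), then zig-zag decoding.
    private static int readZigZagVarint(InputStream in) throws IOException {
        int raw = 0;
        int shift = 0;
        int b;
        do {
            b = in.read();
            if (b == -1) {
                throw new IOException("Unexpected end of stream while reading varint");
            }
            if (shift > 28) {
                throw new IOException("Varint is too long");
            }
            raw |= (b & 0x7F) << shift;
            shift += 7;
        } while ((b & 0x80) != 0);
        return (raw >>> 1) ^ -(raw & 1);
    }
}
```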
h2. ProtobufReader
Since the existing ProtobufReader extends {_}SchemaRegistryService{_}, we’ll
follow its conventions and add a new _Schema Access Strategy_: {_}SCHEMA_DEFINITION_PROVIDER{_}.
The _createRecordReader_ method will be split into two branches: one for
_SCHEMA_DEFINITION_PROVIDER_ (new) and one for all other strategies (the existing logic).
The _getSupportedPropertyDescriptors_ method will be modified to define the new
properties and to make _SCHEMA_REFERENCE_READER_ depend on the
_SCHEMA_DEFINITION_PROVIDER_ value.
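A heavily simplified model of the two branches, to make the control flow concrete. Everything here is hypothetical: plain {{java.util.function}} types stand in for the configured controller services, {{PROTO_DIRECTORY}} stands for the existing strategies, and {{DecodingTarget}} / {{ProtobufReaderSketch}} are not NiFi types. It illustrates one ordering assumption for the new branch: the schema reference is read first and the message name second, from the same content stream.

```java
import java.io.InputStream;
import java.util.function.Function;
import java.util.function.LongFunction;

// PROTO_DIRECTORY is a hypothetical placeholder for the existing strategies.
enum SchemaAccessStrategy { SCHEMA_DEFINITION_PROVIDER, PROTO_DIRECTORY }

// What is needed to decode one FlowFile: the schema source and the message to use.
record DecodingTarget(String schemaSource, String messageName) {}

// Hypothetical model of the createRecordReader split; functions stand in for services.
record ProtobufReaderSketch(
        SchemaAccessStrategy strategy,
        Function<InputStream, Long> schemaReferenceReader,  // "Schema Reference Reader"
        LongFunction<String> schemaDefinitionProvider,      // "Schema Definition Provider" (schema text)
        Function<InputStream, String> messageNameResolver,  // "Message Name Resolver"
        String protoDirectory,                              // existing "Proto Directory"
        String messageType) {                               // existing "Message Type"

    DecodingTarget resolveTarget(InputStream content) {
        return switch (strategy) {
            case SCHEMA_DEFINITION_PROVIDER -> {
                // New branch. Assumes the schema reference and the message index are framed
                // at the start of the content, so both services consume it in this order.
                long schemaId = schemaReferenceReader.apply(content);
                String schemaText = schemaDefinitionProvider.apply(schemaId);
                String messageName = messageNameResolver.apply(content);
                yield new DecodingTarget(schemaText, messageName);
            }
            // Existing branch: static configuration, unchanged.
            case PROTO_DIRECTORY -> new DecodingTarget(protoDirectory, messageType);
        };
    }
}
```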
Property modifications:
|*Property Name*|*Description*|*Value*|*Comment*|
|Schema Reference Reader|Service implementation responsible for reading FlowFile attributes or content to determine the Schema Reference Identifier.|<{_}SchemaReferenceReader{_}> impl|The existing property in _SchemaRegistryService_.|
|Schema Definition Provider|A provider that retrieves Protobuf schema definitions.|<{_}SchemaDefinitionProvider{_}> impl|New property, available only when _Schema Access Strategy_ is _SCHEMA_DEFINITION_PROVIDER_.|
|Message Name Resolver|Service implementation responsible for reading FlowFile attributes or content to determine the name of the Protobuf message to use for deserialization.|<{_}MessageNameResolver{_}> impl|New property, available only when _Schema Access Strategy_ is _SCHEMA_DEFINITION_PROVIDER_.|
|Proto Directory, Message Type| | |*These properties will be hidden when* *_Schema Access Strategy_* *is* *_SCHEMA_DEFINITION_PROVIDER_.* *Currently they are mandatory.*|
|_Schema caching parameters, such as size and expiration_| | |New properties, available only when _Schema Access Strategy_ is _SCHEMA_DEFINITION_PROVIDER_.|
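The table leaves the caching parameters open. One possible reading of "size and expiration" is sketched here as a hypothetical {{SchemaCache}} with least-recently-used eviction and time-based reload, keyed by schema identifier so that FlowFiles sharing a schema do not trigger a registry call and a recompilation each time. The class and parameter names are assumptions, the injectable clock exists only to make the behavior testable, and a real implementation could equally rely on an existing caching library.

```java
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.function.Function;
import java.util.function.LongSupplier;

// Hypothetical bounded cache illustrating the proposed size and expiration properties.
// Least-recently-used entries are evicted once maxSize is exceeded; entries older than
// ttlMillis are reloaded on the next access.
final class SchemaCache<K, V> {
    private record Timed<T>(T value, long loadedAtMillis) {}

    private final long ttlMillis;
    private final LongSupplier clock;
    private final Map<K, Timed<V>> entries;

    SchemaCache(int maxSize, long ttlMillis, LongSupplier clock) {
        this.ttlMillis = ttlMillis;
        this.clock = clock;
        // accessOrder = true keeps the least recently accessed entry first.
        this.entries = new LinkedHashMap<K, Timed<V>>(16, 0.75f, true) {
            @Override
            protected boolean removeEldestEntry(Map.Entry<K, Timed<V>> eldest) {
                return size() > maxSize;
            }
        };
    }

    // Returns the cached value, invoking the loader (e.g. fetch and compile a schema)
    // on a miss or once the entry has expired.
    synchronized V get(K key, Function<K, V> loader) {
        long now = clock.getAsLong();
        Timed<V> entry = entries.get(key);
        if (entry == null || now - entry.loadedAtMillis() >= ttlMillis) {
            entry = new Timed<>(loader.apply(key), now);
            entries.put(key, entry);
        }
        return entry.value();
    }

    synchronized int size() {
        return entries.size();
    }
}
```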
h3. Custom Protobuf Loader
Wire’s
[SchemaLoader|https://github.com/square/wire/blob/master/wire-schema/src/jsMain/kotlin/com/squareup/wire/schema/SchemaLoader.kt]
can’t be used with {_}SchemaDefinitionProviders{_}, as it is tightly coupled to a
file system. Instead of implementing a file system adapter, custom glue code
will be written that either uses a _SchemaDefinitionProvider_ for loading or
accepts an already fetched _SchemaDefinition_. See
[CommonSchemaLoader|https://github.com/square/wire/blob/master/wire-schema/src/commonMain/kotlin/com/squareup/wire/schema/internal/CommonSchemaLoader.kt]
for inspiration.
This logic will also reside in _nifi-protobuf-services_.
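Part of what the glue code has to do is turn a _SchemaDefinition_ tree into the flat set of proto sources a parser needs. A minimal sketch of that step, assuming the references map is keyed by import path (as in the appendix) and that the root schema, which has no import name of its own, is stored under a caller-chosen path. {{ProtoSourceCollector}} and {{SchemaId}} are hypothetical, and the hand-off to Wire's parser is deliberately left out rather than guessing at its API.

```java
import java.util.ArrayDeque;
import java.util.Deque;
import java.util.LinkedHashMap;
import java.util.Map;

// Hypothetical stand-in for NiFi's SchemaIdentifier.
record SchemaId(long id) {}

record SchemaDefinition(SchemaId id, String text, Map<String, SchemaDefinition> references) {}

final class ProtoSourceCollector {

    // Flattens a definition and all of its transitive references into import path -> proto text.
    static Map<String, String> collect(String rootPath, SchemaDefinition root) {
        Map<String, String> sources = new LinkedHashMap<>();
        Deque<Map.Entry<String, SchemaDefinition>> pending = new ArrayDeque<>();
        pending.push(Map.entry(rootPath, root));
        while (!pending.isEmpty()) {
            Map.Entry<String, SchemaDefinition> next = pending.pop();
            String path = next.getKey();
            SchemaDefinition definition = next.getValue();
            String existing = sources.putIfAbsent(path, definition.text());
            if (existing != null) {
                // A shared dependency (e.g. a diamond import) is kept once;
                // two different texts under the same import path are an error.
                if (!existing.equals(definition.text())) {
                    throw new IllegalStateException("Conflicting definitions for " + path);
                }
                continue;
            }
            definition.references().forEach((importPath, ref) -> pending.push(Map.entry(importPath, ref)));
        }
        return sources;
    }
}
```

Visiting each import path only once also keeps the traversal finite if a registry ever returned a cyclic reference graph.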
h2. Appendix: Code examples
h3. SchemaDefinitionProvider
{code:java}
import java.util.Map;
import org.apache.nifi.serialization.record.SchemaIdentifier;

record SchemaDefinition(
    SchemaIdentifier id,
    String text,
    Map<String, SchemaDefinition> references // key: the name under which the parent schema references this schema
) {}

interface SchemaDefinitionProvider {
    SchemaDefinition getById(SchemaIdentifier id);
}

class ConfluentSchemaDefinitionProvider implements SchemaDefinitionProvider {
    // ...
}
{code}
h3. MessageNameResolver
{code:java}
import java.io.InputStream;
import java.util.List;

record Message(
    String name,
    List<Message> nestedMessages
) {}

interface MessageNameResolver {
    String getMessageName(InputStream contentStream, List<Message> messages);
}
{code}
> Support schema registries in ProtobufReader
> -------------------------------------------
>
> Key: NIFI-14424
> URL: https://issues.apache.org/jira/browse/NIFI-14424
> Project: Apache NiFi
> Issue Type: Improvement
> Components: Extensions
> Reporter: Alaksiej Ščarbaty
> Assignee: Alaksiej Ščarbaty
> Priority: Major
>
--
This message was sent by Atlassian Jira
(v8.20.10#820010)