[ 
https://issues.apache.org/jira/browse/NIFI-14424?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Lukas Kucharski updated NIFI-14424:
-----------------------------------
    Summary: Support for Confluent schema registry Protobuf wire format in 
ProtobufReader  (was: Support for Confluent schema registry Protobuf format in 
ProtobufReader)

> Support for Confluent schema registry Protobuf wire format in ProtobufReader
> ----------------------------------------------------------------------------
>
>                 Key: NIFI-14424
>                 URL: https://issues.apache.org/jira/browse/NIFI-14424
>             Project: Apache NiFi
>          Issue Type: Improvement
>          Components: Extensions
>            Reporter: Alaksiej Ščarbaty
>            Assignee: Lukas Kucharski
>            Priority: Major
>          Time Spent: 20m
>  Remaining Estimate: 0h
>
> Currently, Protobuf records are parsed by the 
> [ProtobufReader|https://github.com/apache/nifi/blob/main/nifi-extension-bundles/nifi-protobuf-bundle/nifi-protobuf-services/src/main/java/org/apache/nifi/services/protobuf/ProtobufReader.java#L58],
>  which supports proto files on the [local filesystem 
> only|https://github.com/apache/nifi/blob/main/nifi-extension-bundles/nifi-protobuf-bundle/nifi-protobuf-services/src/main/java/org/apache/nifi/services/protobuf/ProtobufReader.java#L157].
>  
> The change aims to:
>  * support retrieving proto definitions from schema registries
>  * support decoding the Protobuf [wire 
> format|https://docs.confluent.io/platform/current/schema-registry/fundamentals/serdes-develop/index.html#wire-format]
>  * support fetching imported (referenced) proto files in Protobuf schemas
> Since the original proto files must be available for parsing, new 
> schema-related abstractions will be introduced.
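For illustration, the Confluent framing that precedes a Protobuf payload can be peeled off as below. This is a minimal sketch, assuming the layout documented by Confluent (magic byte 0, then a 4-byte big-endian schema ID); the class name ConfluentHeader is hypothetical and not part of NiFi.

```java
import java.nio.ByteBuffer;

// Hypothetical helper (not a NiFi class): splits a Confluent-framed value into
// the schema ID and the bytes that follow it.
final class ConfluentHeader {
    static final byte MAGIC_BYTE = 0x0;

    final int schemaId;
    // For Protobuf schemas: message-index array followed by the serialized message.
    final ByteBuffer rest;

    private ConfluentHeader(int schemaId, ByteBuffer rest) {
        this.schemaId = schemaId;
        this.rest = rest;
    }

    static ConfluentHeader parse(byte[] value) {
        if (value.length < 5) {
            throw new IllegalArgumentException("Too short for Confluent wire format: " + value.length + " bytes");
        }
        ByteBuffer buffer = ByteBuffer.wrap(value); // ByteBuffer is big-endian by default
        byte magic = buffer.get();
        if (magic != MAGIC_BYTE) {
            throw new IllegalArgumentException("Unknown magic byte: " + magic);
        }
        int schemaId = buffer.getInt(); // bytes 1-4
        return new ConfluentHeader(schemaId, buffer.slice()); // everything after byte 4
    }
}
```

The schema ID is what a schema registry lookup would use; the remaining bytes still carry the Protobuf message index before the actual message.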
> h2. SchemaDefinition
> A raw schema definition. Contains a {_}SchemaIdentifier{_}, schema text and a 
> collection of referenced {_}SchemaDefinitions{_}.
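A minimal sketch of the SchemaDefinition shape described above, assuming a simplified numeric identifier (SimpleSchemaId is hypothetical, standing in for NiFi's SchemaIdentifier) and assuming references are keyed by the import path used in the parent schema. The flatten helper, also hypothetical, shows how referenced proto files could be gathered before handing them to a parser.

```java
import java.util.LinkedHashMap;
import java.util.Map;

// Hypothetical stand-in for NiFi's SchemaIdentifier, reduced to a numeric ID.
record SimpleSchemaId(long id) { }

// Raw schema definition: identifier, schema text, and referenced definitions
// keyed by the name under which the parent imports them.
record SchemaDefinition(SimpleSchemaId id, String text, Map<String, SchemaDefinition> references) {

    // Collects this definition and all transitively referenced ones as
    // importName -> schema text, e.g. to feed an in-memory proto loader.
    Map<String, String> flatten(String ownName) {
        Map<String, String> files = new LinkedHashMap<>();
        collect(ownName, this, files);
        return files;
    }

    private static void collect(String name, SchemaDefinition def, Map<String, String> files) {
        if (files.putIfAbsent(name, def.text()) != null) {
            return; // already visited, e.g. a file imported by two parents
        }
        def.references().forEach((importName, ref) -> collect(importName, ref, files));
    }
}
```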
> h2. -SchemaDefinitionProvider-
> -New _SchemaDefinitionProviders_ provide a simple interface for retrieving 
> Schemas as text, together with their references. Unlike their 
> _SchemaRegistry_ counterparts, no parsing is going to take place on that 
> level, and it’s up to the caller what to do with returned texts.-
> -The interface will be put into the _nifi-schema-registry-service-api_ 
> module. Whereas the implementations should be created in their respective 
> modules, like _nifi-confluent-schema-registry-service_ for Confluent.-
> *Notes after the initial implementation:*
> *SchemaDefinitionProvider* was added in an initial implementation (not 
> merged), but was later removed in favour of moving its responsibility into 
> the SchemaRegistry implementation. The SchemaRegistry interface was extended 
> with one method that has a default implementation, so there was no need to 
> introduce a new abstraction (SchemaDefinitionProvider). This keeps the UX 
> consistent with the patterns established so far. Additionally, the ability 
> to fetch a raw schema from a SchemaRegistry may prove helpful when additional 
> context is needed that only the original schema (not the RecordSchema) can 
> supply.
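The note above describes extending SchemaRegistry with one default method. A hedged sketch of that pattern follows, using hypothetical, simplified types: SimpleSchemaRegistry, SchemaRef, RawSchema and the method name retrieveSchemaDefinition are illustrative only, not the actual NiFi signatures. Registries that do not override the new method keep compiling and fail only if it is called.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical, simplified stand-ins; these are NOT the real NiFi types or signatures.
record SchemaRef(long id) { }

record RawSchema(SchemaRef ref, String text) { }

interface SimpleSchemaRegistry {
    // Pre-existing style of lookup, simplified here to return a schema description.
    String retrieveRecordSchema(SchemaRef ref);

    // New method with a default body: existing implementations compile unchanged
    // and only fail if a caller actually asks them for a raw definition.
    default RawSchema retrieveSchemaDefinition(SchemaRef ref) {
        throw new UnsupportedOperationException(
                getClass().getName() + " does not support retrieving raw schema definitions");
    }
}

// A registry that can also return the original schema text.
final class InMemoryRegistry implements SimpleSchemaRegistry {
    private final Map<Long, String> texts = new HashMap<>();

    void register(long id, String text) {
        texts.put(id, text);
    }

    @Override
    public String retrieveRecordSchema(SchemaRef ref) {
        return "RecordSchema for " + ref.id(); // placeholder for a parsed schema
    }

    @Override
    public RawSchema retrieveSchemaDefinition(SchemaRef ref) {
        String text = texts.get(ref.id());
        if (text == null) {
            throw new IllegalArgumentException("Unknown schema id " + ref.id());
        }
        return new RawSchema(ref, text);
    }
}
```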
> h2. MessageNameResolver
> In the case of Protobuf, a single proto file may contain multiple message 
> definitions. To determine which message to use for decoding, a message index 
> is usually encoded in the payload. As with a schema reference, different 
> systems encode this index differently.
> _MessageNameResolver_ will be a new _ControllerService_ that translates an 
> encoded Protobuf message index into a Protobuf message name, which is then 
> used to find the correct Protobuf message for deserialization.
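In the Confluent wire format, the message index follows the schema ID as an array of zig-zag encoded varints: first the array length, then each index, with a single 0 byte used as shorthand for [0] (the first top-level message). The decoder below is a sketch under that assumption; MessageIndexDecoder is a hypothetical name, and other registries may encode the index differently.

```java
import java.io.EOFException;
import java.io.IOException;
import java.io.InputStream;
import java.util.ArrayList;
import java.util.List;

// Hypothetical helper: reads a Confluent-style message-index array from a stream
// positioned right after the 5-byte magic byte + schema ID header.
final class MessageIndexDecoder {

    static List<Integer> readMessageIndexes(InputStream in) throws IOException {
        int count = readZigZagVarint(in);
        if (count == 0) {
            return List.of(0); // shorthand for the first top-level message
        }
        if (count < 0) {
            throw new IOException("Negative message index count: " + count);
        }
        List<Integer> indexes = new ArrayList<>(count);
        for (int i = 0; i < count; i++) {
            indexes.add(readZigZagVarint(in));
        }
        return indexes;
    }

    // Reads a base-128 varint (at most 5 bytes for 32 bits) and zig-zag decodes it.
    private static int readZigZagVarint(InputStream in) throws IOException {
        int raw = 0;
        for (int shift = 0; shift < 35; shift += 7) {
            int b = in.read();
            if (b < 0) {
                throw new EOFException("Truncated varint");
            }
            raw |= (b & 0x7F) << shift;
            if ((b & 0x80) == 0) {
                return (raw >>> 1) ^ -(raw & 1);
            }
        }
        throw new IOException("Varint longer than 5 bytes");
    }
}
```

For example, the bytes {4, 2, 0} decode to [1, 0]: the first nested message of the second top-level message.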
> -Similar to the _SchemaDefinitionProvider,_- the interface will be put into 
> the _nifi-schema-registry-service-api_ module. Schema-registry-specific 
> implementations will live in their respective modules, e.g. 
> _nifi-confluent-schema-registry-service_ for Confluent.
> h2. ProtobufReader
> -Since the existing ProtobufReader extends {_}SchemaRegistryService{_}, we’ll 
> conform to that interface and add a new {_}Schema Access Strategy - 
> SCHEMA_DEFINITION_PROVIDER{_}.-
> -The _createRecordReader_ method will be split in 2 branches. One for 
> _SCHEMA_DEFINITION_PROVIDER_ (new) and one for the rest (the existing one).- 
> -The _getSupportedPropertyDescriptors_ will be modified to define the new 
> properties, as well as to make the _SCHEMA_REFERENCE_READER_ depend on 
> _SCHEMA_DEFINITION_PROVIDER_ value.-
> The existing *ProtobufReader* implementation has two properties (Proto 
> Directory, Message Type) that are always required. To keep backward 
> compatibility, we will make those two properties optional and add a custom 
> validator that requires them when the 'Generate From Protofile' schema access 
> strategy is selected. 
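The conditional requirement can be sketched as a plain function over property values. This is a simplified illustration: the Map-based API and class name are hypothetical (a real NiFi component would express this in its own validation hook), and it assumes 'Generate From Protofile' is the default strategy when none is set, which is what backward compatibility suggests.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;

// Hypothetical sketch; property names and the strategy value come from the text,
// the Map-based signature does not.
final class ProtobufReaderValidation {
    static final String SCHEMA_ACCESS_STRATEGY = "Schema Access Strategy";
    static final String GENERATE_FROM_PROTO_FILE = "Generate From Protofile";
    static final String PROTO_DIRECTORY = "Proto Directory";
    static final String MESSAGE_TYPE = "Message Type";

    static List<String> validate(Map<String, String> properties) {
        List<String> problems = new ArrayList<>();
        // Assumption: an unset strategy behaves like the legacy, file-based mode.
        String strategy = properties.getOrDefault(SCHEMA_ACCESS_STRATEGY, GENERATE_FROM_PROTO_FILE);
        if (GENERATE_FROM_PROTO_FILE.equals(strategy)) {
            for (String required : List.of(PROTO_DIRECTORY, MESSAGE_TYPE)) {
                String value = properties.get(required);
                if (value == null || value.isBlank()) {
                    problems.add(required + " is required when " + SCHEMA_ACCESS_STRATEGY
                            + " is " + GENERATE_FROM_PROTO_FILE);
                }
            }
        }
        return problems; // empty list means valid
    }
}
```

With any other strategy, both properties can be left empty without producing a validation error.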
> Attribute modifications:
>  
> |*Attribute Name*|*Description*|*Value*|*Comment*|
> |-Schema Reference Reader- ({*}Note: this property already exists, no modifications needed){*}|-Service implementation responsible for reading FlowFile attributes or content to determine the Schema Reference Identifier.-|-<{_}SchemaReferenceReader{_}> impl-|-That’s going to be the existing property in SchemaRegistryService.-|
> |-Schema Definition Provider-|-A provider that will retrieve Protobuf schema definitions.-|-<{_}SchemaDefinitionProvider{_}> impl-|-This is a new property, available only when _Schema Access Strategy_ is _SCHEMA_DEFINITION_PROVIDER._-|
> |Message Name Resolver|Service implementation responsible for reading FlowFile attributes or content to determine the name of a Protobuf message to use for deserialization.|<{_}MessageNameResolver{_}> impl|This is a new property, available only when _Schema Access Strategy_ is _{-}SCHEMA_DEFINITION_PROVIDER{-}. SCHEMA_REFERENCE_READER_|
> |-Proto Directory, Message Type-| | |-*These properties will be hidden when* *_Schema Access Strategy_* *is* *_SCHEMA_DEFINITION_PROVIDER._*- -*Currently they’re mandatory.*- |
> |_Schema Caching parameters, like size and expiration_| | |These are new properties, available only when _Schema Access Strategy_ is _SCHEMA_DEFINITION_PROVIDER._|
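To make the two caching parameters concrete, here is a minimal sketch of a cache bounded by size (least-recently-used eviction) with expiry after write. SchemaCache and its constructor are hypothetical; a production implementation would more likely rely on an existing caching library, and the injectable clock exists only to make the behaviour easy to test.

```java
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.function.Function;
import java.util.function.LongSupplier;

// Hypothetical sketch of "size and expiration" schema caching.
final class SchemaCache<K, V> {
    private record Timed<T>(T value, long writtenAtNanos) { }

    private final long expireAfterNanos;
    private final LongSupplier clock; // e.g. System::nanoTime
    private final LinkedHashMap<K, Timed<V>> entries;

    SchemaCache(int maxSize, long expireAfterNanos, LongSupplier clock) {
        this.expireAfterNanos = expireAfterNanos;
        this.clock = clock;
        // accessOrder = true makes iteration order least-recently-used first
        this.entries = new LinkedHashMap<K, Timed<V>>(16, 0.75f, true) {
            @Override
            protected boolean removeEldestEntry(Map.Entry<K, Timed<V>> eldest) {
                return size() > maxSize; // evict the LRU entry once over capacity
            }
        };
    }

    // Returns a fresh cached value, or loads (e.g. fetches from the registry) and caches it.
    synchronized V get(K key, Function<K, V> loader) {
        long now = clock.getAsLong();
        Timed<V> cached = entries.get(key);
        if (cached != null && now - cached.writtenAtNanos() < expireAfterNanos) {
            return cached.value();
        }
        V value = loader.apply(key);
        entries.put(key, new Timed<>(value, now));
        return value;
    }
}
```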
> h3. -Custom Protobuf Loader-
> -The wire’s 
> [SchemaLoader|https://github.com/square/wire/blob/master/wire-schema/src/jsMain/kotlin/com/squareup/wire/schema/SchemaLoader.kt]
>  can’t be used with {_}SchemaDefinitionProviders{_}, as it’s tightly tied to 
> a file system. Instead of implementing an adapter for a file system, a custom 
> glue-code will be implemented, which either uses a _SchemaDefinitionProvider_ 
> for loading, or accepts an already fetched SchemaDefinition. See 
> [CommonSchemaLoader|https://github.com/square/wire/blob/master/wire-schema/src/commonMain/kotlin/com/squareup/wire/schema/internal/CommonSchemaLoader.kt]
>  for inspiration.-
> -This logic will reside in _nifi-protobuf-services_ too.-
> h2. Appendix: Code examples
> h3. -SchemaDefinitionProvider-
> -{{record SchemaDefinition(}}-
>   -{{SchemaIdentifier id,}}-
>   -{{String text,}}-      
>   -{{Map<String, SchemaDefinition> references // key - by what name the 
> schema is referenced from its parent}}-
> -{{)}}-
> -{{interface SchemaDefinitionProvider {}}-
>   -{{SchemaDefinition getById(SchemaIdentifier id);}}-
> -{{}}}-
> -{{class ConfluentSchemaDefinitionProvider implements 
> SchemaDefinitionProvider {}}-
>   -{{// ...}}-
> -{{}}}-
> h3. MessageNameResolver
> {{record Message(}}
> {{  String name,}}
> {{  List<Message> nestedMessages}}
> {{) {} }}
> {{interface MessageNameResolver {}}
> {{  // Returns the name of the Protobuf message to use for decoding}}
> {{  String getMessageName(InputStream contentStream, List<Message> messages);}}
> {{}}}



--
This message was sent by Atlassian Jira
(v8.20.10#820010)
