[ 
https://issues.apache.org/jira/browse/NIFI-4380?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16843375#comment-16843375
 ] 

Michael commented on NIFI-4380:
-------------------------------

Hello,

Working with the Confluent REST Proxy and providing the key there, the key is 
then written with a "Confluent Content-Encoded Schema Reference", so to support 
this integration a key reader/writer with schema registry access is needed...

Also, the key MUST be in flow file attributes so the FF can be split and routed 
consistently with the expected partitioning strategy (the same key goes to the 
same partition, in order of insertion); with no key in the attributes there is 
no way to do this (not even later with EnforceOrder).

 

Proposing the following approach:
ConsumeKafkaRecord_2_0:
add property - Key Reader (RecordReader) optional
add property - Key Writer (RecordWriter) optional
if set, parse the key with the reader and write it with the writer; otherwise 
set the kafka.key FF attribute to a Hex/Base64-encoded byte array
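The fallback branch of that proposal could look like the following Python
sketch (hypothetical function name; the real processor is Java, this only
illustrates the no-Key-Reader case with Base64 chosen as the encoding):

```python
import base64

def key_to_attribute(key_bytes):
    # Hypothetical fallback: no Key Reader is configured, so expose the
    # raw Kafka key bytes as a Base64-encoded flow file attribute.
    return {"kafka.key": base64.b64encode(key_bytes).decode("ascii")}
```

The attribute stays text-safe even for binary (e.g. Avro-encoded) keys.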
 
FlowFileAttributeWriter (RecordWriter)
property - Record Type - Scalar (for simple type keys) / Hex/Base64 encoded 
byte array / Map
property - Flow File Attributes Prefix (for my case kafka.key)
will take the logical record and write it as a map to flow file attributes
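A minimal sketch of that writer behavior for the Map record type (hypothetical
function and prefix default, matching the kafka.key case above):

```python
def record_to_attributes(record, prefix="kafka.key"):
    # Hypothetical sketch of the proposed FlowFileAttributeWriter:
    # write each field of the logical record map as a prefixed
    # flow file attribute, string-valued as NiFi attributes require.
    attrs = {}
    for name, value in record.items():
        attrs[f"{prefix}.{name}"] = str(value)
    return attrs
```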
 
PublishKafkaRecord_2_0
add property - Key Reader (RecordReader) optional
add property - Key Writer (RecordWriter) optional
if set, use Key Reader and Key Writer
otherwise use the kafka.key FF attribute as a Hex/Base64-encoded byte array; 
if that is not set either, use the Message Key Field simple (string) key
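The publish-side precedence described above can be sketched like this
(hypothetical names; Base64 assumed for the attribute encoding):

```python
import base64

def resolve_message_key(key_reader, attributes, message_key_field, record):
    # Hypothetical precedence sketch for PublishKafkaRecord_2_0:
    # 1. a configured Key Reader wins,
    # 2. else a Base64-encoded kafka.key attribute,
    # 3. else the simple string Message Key Field,
    # 4. else publish with no key.
    if key_reader is not None:
        return key_reader(attributes)
    if "kafka.key" in attributes:
        return base64.b64decode(attributes["kafka.key"])
    if message_key_field is not None:
        return str(record.get(message_key_field)).encode("utf-8")
    return None
```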
 
FlowFileAttributeReader (RecordReader)
property - Record Type - Scalar (for simple type keys) / Hex/Base64 encoded 
byte array / Map
property - Flow File Attributes regex (for my case kafka\.key\..*)
property - Flow File Attribute Level Demarcation (default to .)
will take flow file attributes and parse them as a logical record map
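The reader side, combining the regex and demarcation properties above, could
be sketched as (hypothetical function; defaults assume the kafka.key prefix
and "." demarcation):

```python
import re

def attributes_to_record(attributes, pattern=r"kafka\.key\..*", demarcation="."):
    # Hypothetical sketch of the proposed FlowFileAttributeReader:
    # select attributes by regex and rebuild a (possibly nested) logical
    # record map, splitting attribute names on the demarcation character.
    record = {}
    for name, value in attributes.items():
        if not re.fullmatch(pattern, name):
            continue
        parts = name.split(demarcation)[2:]  # drop the "kafka" and "key" parts
        node = record
        for part in parts[:-1]:
            node = node.setdefault(part, {})
        node[parts[-1]] = value
    return record
```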

> Add Kafka Key attribute to outgoing FF for ConsumeKafkaRecord
> -------------------------------------------------------------
>
>                 Key: NIFI-4380
>                 URL: https://issues.apache.org/jira/browse/NIFI-4380
>             Project: Apache NiFi
>          Issue Type: Improvement
>            Reporter: Andrew Psaltis
>            Assignee: Andrew Psaltis
>            Priority: Major
>
> Often times during processing of data, having access to the key used for the 
> message in Kafka is important. For example, some CDC tools that use Kafka as 
> the destination for changes will use the primary key of the table as the key 
> when writing data to Kafka. The value of this key becomes important after the 
> message is consumed from Kafka, as many times the final destination is 
> another data store, for which you need to know which column(s) of data 
> represent the key.
> When you introduce a Schema Registry into the picture, the key written into 
> Kafka is often encoded as well; for example, if Avro is being used for data 
> serialization into Kafka, then the key may have its own schema that it is 
> serialized with. Thus, this change would need to ensure that the key is 
> deserialized before being written as an attribute. 
> Not sure if it gets overly complex, or if it is required to have 
> Reader/Writer configuration for the key as well as the value. It is 
> certainly to be expected that if the key and value are stored in Kafka in 
> Avro, the schema for each will be unique. 



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)
