## What is the purpose of the change
This change allows access to Kafka headers in KeyedDeserializationSchema, and
allows Kafka headers to be provided via the KeyedSerializationSchema.headers method.

## Brief change log
- Add a default _headers_ method to the KeyedSerializationSchema interface
- Add a default deserialize method with a _headers_ argument to
KeyedDeserializationSchema
- Add Kafka011Fetcher to get headers from the Kafka ConsumerRecord and pass them
as a parameter to KeyedDeserializationSchema
- Modify FlinkKafkaProducer011 to request _headers_ from
KeyedSerializationSchema and add them to the ProducerRecord
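
To illustrate the idea behind the change log above, here is a minimal, broker-free sketch of how default methods can add header support without breaking existing schemas. The interfaces `SerSchema` and `DeserSchema`, the `Traced` type, the `trace-id` header name, and the `Iterable<Map.Entry<String, byte[]>>` header representation are all hypothetical stand-ins chosen for this example; the actual signatures are the ones in the pull request and may differ.

```java
import java.nio.charset.StandardCharsets;
import java.util.AbstractMap.SimpleImmutableEntry;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Map;

class HeadersSketch {

    /** Hypothetical stand-in for the producer-side schema; not the real Flink interface. */
    interface SerSchema<T> {
        byte[] serializeValue(T element);

        // Assumed shape: the default returns no headers, so existing schemas are unaffected.
        default Iterable<Map.Entry<String, byte[]>> headers(T element) {
            return Collections.emptyList();
        }
    }

    /** Hypothetical stand-in for the consumer-side schema; not the real Flink interface. */
    interface DeserSchema<T> {
        T deserialize(byte[] key, byte[] value, String topic, int partition, long offset);

        // Assumed shape: the headers-aware overload delegates to the old method by default.
        default T deserialize(byte[] key, byte[] value, String topic, int partition, long offset,
                              Iterable<Map.Entry<String, byte[]>> headers) {
            return deserialize(key, value, topic, partition, offset);
        }
    }

    /** Illustrative element type: a payload plus a trace id carried in a header. */
    static final class Traced {
        final String value;
        final String traceId;

        Traced(String value, String traceId) {
            this.value = value;
            this.traceId = traceId;
        }
    }

    /** Example schema that writes and reads a header named "trace-id". */
    static class TracedSchema implements SerSchema<Traced>, DeserSchema<Traced> {
        @Override
        public byte[] serializeValue(Traced e) {
            return e.value.getBytes(StandardCharsets.UTF_8);
        }

        @Override
        public Iterable<Map.Entry<String, byte[]>> headers(Traced e) {
            List<Map.Entry<String, byte[]>> h = new ArrayList<>();
            h.add(new SimpleImmutableEntry<>("trace-id", e.traceId.getBytes(StandardCharsets.UTF_8)));
            return h;
        }

        @Override
        public Traced deserialize(byte[] key, byte[] value, String topic, int partition, long offset) {
            // Header-less path: the trace id is simply unavailable.
            return new Traced(new String(value, StandardCharsets.UTF_8), null);
        }

        @Override
        public Traced deserialize(byte[] key, byte[] value, String topic, int partition, long offset,
                                  Iterable<Map.Entry<String, byte[]>> headers) {
            String traceId = null;
            for (Map.Entry<String, byte[]> h : headers) {
                if ("trace-id".equals(h.getKey())) {
                    traceId = new String(h.getValue(), StandardCharsets.UTF_8);
                }
            }
            return new Traced(new String(value, StandardCharsets.UTF_8), traceId);
        }
    }

    /** Simulates the producer-to-fetcher hand-off in memory, without Kafka. */
    static Traced roundTrip(Traced in) {
        TracedSchema schema = new TracedSchema();
        byte[] value = schema.serializeValue(in);
        Iterable<Map.Entry<String, byte[]>> headers = schema.headers(in);
        return schema.deserialize(null, value, "demo-topic", 0, 0L, headers);
    }

    public static void main(String[] args) {
        Traced out = roundTrip(new Traced("hello", "abc-123"));
        System.out.println(out.value + " / " + out.traceId); // prints "hello / abc-123"
    }
}
```

Because both new methods are defaults in this sketch, a schema that overrides neither keeps its previous behavior: it emits no headers and ignores any that arrive.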

## Verifying this change

This change added tests and can be verified as follows:
  - Added integration test *Kafka011ITCase.testHeaders* to verify that headers
can be produced and consumed

## Does this pull request potentially affect one of the following parts:

  - Dependencies (does it add or upgrade a dependency): (no)
  - The public API, i.e., is any changed class annotated with 
`@Public(Evolving)`: (yes)
  - The serializers: (no)
  - The runtime per-record code paths (performance sensitive): (yes)
  - Anything that affects deployment or recovery: JobManager (and its 
components), Checkpointing, Yarn/Mesos, ZooKeeper: (no)
  - The S3 file system connector: (no)

## Documentation

  - Does this pull request introduce a new feature? (yes)
  - If yes, how is the feature documented? (JavaDocs)

[ Full content available at: https://github.com/apache/flink/pull/6615 ]