[
https://issues.apache.org/jira/browse/BEAM-4038?focusedWorklogId=91838&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-91838
]
ASF GitHub Bot logged work on BEAM-4038:
----------------------------------------
Author: ASF GitHub Bot
Created on: 17/Apr/18 17:21
Start Date: 17/Apr/18 17:21
Worklog Time Spent: 10m
Work Description: rangadi commented on a change in pull request #5111:
[BEAM-4038] Support Kafka Headers in KafkaIO
URL: https://github.com/apache/beam/pull/5111#discussion_r182156839
##########
File path:
sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/ConsumerSpEL.java
##########
@@ -139,4 +158,14 @@ public long offsetForTime(Consumer<?, ?> consumer,
TopicPartition topicPartition
return offsetAndTimestamp.offset();
}
}
+
+ public Headers getHeaders(ConsumerRecord<byte[], byte[]> rawRecord) {
+ Headers recordHeaders = new RecordHeaders();
Review comment:
Need to avoid accessing anything related to `Headers` when 'hasHeaders' is
false so that it does not cause runtime exception.
I would implement this method something like (could move the method to
`KafkaRecord` as well) :
```
if (hasHeaders) {
return rawRecord.headers; // No need to covert to RecordHeader (note
that RecordHeader is an internal class).
} else {
throw new RuntimeException("The version kafka-clients does not support
record headers, please use version 0.11.0.0 or newer".).
}
```
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
[email protected]
Issue Time Tracking
-------------------
Worklog Id: (was: 91838)
Time Spent: 2h 50m (was: 2h 40m)
> Support Kafka Headers in KafkaIO
> --------------------------------
>
> Key: BEAM-4038
> URL: https://issues.apache.org/jira/browse/BEAM-4038
> Project: Beam
> Issue Type: New Feature
> Components: io-java-kafka
> Reporter: Geet Kumar
> Assignee: Raghu Angadi
> Priority: Minor
> Time Spent: 2h 50m
> Remaining Estimate: 0h
>
> Headers have been added to Kafka Consumer/Producer records (KAFKA-4208). The
> purpose of this JIRA is to support this feature in KafkaIO.
>
--
This message was sent by Atlassian JIRA
(v7.6.3#76005)