nbali commented on issue #21610:
URL: https://github.com/apache/beam/issues/21610#issuecomment-1208754414
I'm not sure I will have time in the near future to implement this, but given how slowly the discussion has been moving, I created a working solution for reading the whole Kafka stream in a batch pipeline. For anyone who needs a quicker and more customizable workaround:
```java
import java.time.Duration;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;

import lombok.extern.slf4j.Slf4j;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.consumer.OffsetAndTimestamp;
import org.apache.kafka.common.TopicPartition;

/**
 * Using {@link KafkaIO.Read#withStopReadTime(org.joda.time.Instant)} will try to
 * acquire an offset for the given timestamp.<br>
 * If there are only offsets older than the provided timestamp, the default
 * implementation fails with an exception.<br>
 * This implementation falls back to the newest available offset instead,
 * essentially reading up to the newest available message.
 */
@Slf4j
public class MyKafkaConsumer<K, V> extends KafkaConsumer<K, V> {

    public MyKafkaConsumer(Map<String, Object> configs) {
        super(configs);
    }

    @Override
    public Map<TopicPartition, OffsetAndTimestamp> offsetsForTimes(
            Map<TopicPartition, Long> timestampsToSearch, Duration timeout) {
        final Map<TopicPartition, OffsetAndTimestamp> result =
                super.offsetsForTimes(timestampsToSearch, timeout);
        // Partitions whose newest record is older than the searched timestamp
        // come back with a null offset.
        final List<TopicPartition> topicPartitionsWithoutProperOffset =
                result.keySet().stream()
                        .filter(topicPartition -> result.get(topicPartition) == null)
                        .collect(Collectors.toList());
        // Replace each missing offset with the partition's end offset.
        endOffsets(topicPartitionsWithoutProperOffset).forEach((topicPartition, endOffset) -> {
            final Long timestampToSearch = timestampsToSearch.get(topicPartition);
            log.warn("Offset for topicPartition: {}, timestamp: {} was not found, "
                    + "replaced by endOffset: {}", topicPartition, timestampToSearch, endOffset);
            result.put(topicPartition, new OffsetAndTimestamp(endOffset, timestampToSearch));
        });
        return result;
    }
}
```
```java
KafkaIO.readBytes()
    .withStopReadTime(Instant.now())
    .withConsumerFactoryFn(MyKafkaConsumer::new) // required for stopReadTime
```