nbali commented on issue #21610:
URL: https://github.com/apache/beam/issues/21610#issuecomment-1208754414

   I'm not sure I will have time in the near future to implement this, but 
given how slowly the discussion has been going, I created a working solution 
for reading the whole Kafka stream in a batch pipeline. For anyone who needs a 
quicker, more customizable workaround:
   ```java
   import java.time.Duration;
   import java.util.List;
   import java.util.Map;
   import java.util.stream.Collectors;

   import lombok.extern.slf4j.Slf4j;
   import org.apache.kafka.clients.consumer.KafkaConsumer;
   import org.apache.kafka.clients.consumer.OffsetAndTimestamp;
   import org.apache.kafka.common.TopicPartition;

   /**
    * Using {@link KafkaIO.Read#withStopReadTime(org.joda.time.Instant)} will try to acquire an offset for the given timestamp.<br>
    * If there are only offsets older than the provided timestamp, the default implementation fails with an exception.<br>
    * This implementation falls back to the newest available offset instead - essentially reading till the newest available message.
    */
   @Slf4j
   public class MyKafkaConsumer<K, V> extends KafkaConsumer<K, V> {

       public MyKafkaConsumer(Map<String, Object> configs) {
           super(configs);
       }

       @Override
       public Map<TopicPartition, OffsetAndTimestamp> offsetsForTimes(
               Map<TopicPartition, Long> timestampsToSearch, Duration timeout) {
           final Map<TopicPartition, OffsetAndTimestamp> result =
                   super.offsetsForTimes(timestampsToSearch, timeout);

           // Partitions for which no offset at or after the searched timestamp exists.
           final List<TopicPartition> topicPartitionsWithoutProperOffset =
                   result.keySet().stream()
                           .filter(topicPartition -> result.get(topicPartition) == null)
                           .collect(Collectors.toList());

           endOffsets(topicPartitionsWithoutProperOffset).forEach((topicPartition, endOffset) -> {
               final Long timestampToSearch = timestampsToSearch.get(topicPartition);
               log.warn("Offset for topicPartition: {}, timestamp: {} was not found, replaced by endOffset: {}",
                       topicPartition, timestampToSearch, endOffset);
               result.put(topicPartition, new OffsetAndTimestamp(endOffset, timestampToSearch));
           });

           return result;
       }
   }
   ```
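
   The fallback above can be exercised in isolation with plain maps standing 
in for the Kafka types. The partition names, offsets, and the 
`withEndOffsetFallback` helper below are made up purely for illustration; the 
real class delegates to `offsetsForTimes` and `endOffsets` on the live 
consumer instead:
   ```java
   import java.util.ArrayList;
   import java.util.HashMap;
   import java.util.List;
   import java.util.Map;

   // Standalone sketch of the fallback logic, without any Kafka dependency.
   public class OffsetFallbackDemo {

       // Replaces every null lookup result (no offset found for the searched
       // timestamp) with that partition's end offset.
       static Map<String, Long> withEndOffsetFallback(
               Map<String, Long> offsetsForTimes, Map<String, Long> endOffsets) {
           Map<String, Long> result = new HashMap<>(offsetsForTimes);
           List<String> missing = new ArrayList<>();
           for (Map.Entry<String, Long> e : result.entrySet()) {
               if (e.getValue() == null) {
                   missing.add(e.getKey());
               }
           }
           for (String partition : missing) {
               result.put(partition, endOffsets.get(partition));
           }
           return result;
       }

       public static void main(String[] args) {
           Map<String, Long> looked = new HashMap<>();
           looked.put("topic-0", 42L);   // offset found for the timestamp
           looked.put("topic-1", null);  // only older offsets exist
           Map<String, Long> ends = new HashMap<>();
           ends.put("topic-1", 100L);

           Map<String, Long> fixed = withEndOffsetFallback(looked, ends);
           System.out.println(fixed.get("topic-0")); // 42
           System.out.println(fixed.get("topic-1")); // 100
       }
   }
   ```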
   
   ```java
   KafkaIO.readBytes()
        .withStopReadTime(Instant.now())
        .withConsumerFactoryFn(MyKafkaConsumer::new) // required for stopReadTime
   ```

