[ 
https://issues.apache.org/jira/browse/TWILL-199?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15797777#comment-15797777
 ] 

ASF GitHub Bot commented on TWILL-199:
--------------------------------------

Github user chtyim commented on a diff in the pull request:

    https://github.com/apache/twill/pull/16#discussion_r94551779
  
    --- Diff: 
twill-core/src/test/java/org/apache/twill/kafka/client/KafkaTest.java ---
    @@ -170,11 +174,128 @@ public void testKafkaClient() throws Exception {
         Cancellable cancel = kafkaClient.getConsumer().prepare().add(topic, 0, 
0).consume(new KafkaConsumer
           .MessageCallback() {
           @Override
    -      public void onReceived(Iterator<FetchedMessage> messages) {
    +      public long onReceived(Iterator<FetchedMessage> messages) {
    +        long nextOffset = Long.MIN_VALUE;
    +        while (messages.hasNext()) {
    +          FetchedMessage message = messages.next();
    +          LOG.info(Charsets.UTF_8.decode(message.getPayload()).toString());
    +          latch.countDown();
    +        }
    +        return nextOffset;
    +      }
    +
    +      @Override
    +      public void finished() {
    +        stopLatch.countDown();
    +      }
    +    });
    +
    +    Assert.assertTrue(latch.await(5, TimeUnit.SECONDS));
    +    cancel.cancel();
    +    Assert.assertTrue(stopLatch.await(1, TimeUnit.SECONDS));
    +  }
    +
    +  @Test
    +  public void testKafkaClientReadFromIdx() throws Exception {
    +    String topic = "testClient";
    +
    +    // Publish 30 messages with indices the same as offsets within the 
range 0 - 29
    +    Thread t1 = createPublishThread(kafkaClient, topic, Compression.GZIP, 
"GZIP Testing message", 10);
    +    t1.start();
    +    t1.join();
    +    Thread t2 = createPublishThread(kafkaClient, topic, Compression.NONE, 
"Testing message", 10, 10);
    +    t2.start();
    +    t2.join();
    +    Thread t3 = createPublishThread(kafkaClient, topic, 
Compression.SNAPPY, "Snappy Testing message", 10, 20);
    +    t3.start();
    +    t3.join();
    +
    +    final int startIdx = 15;
    +    final CountDownLatch latch = new CountDownLatch(30 - startIdx);
    +    final CountDownLatch stopLatch = new CountDownLatch(1);
    +    final BlockingQueue<Long> offsetQueue = new LinkedBlockingQueue<>();
    +    // Create a consumer
    +    final SimpleKafkaConsumer consumer = (SimpleKafkaConsumer) 
kafkaClient.getConsumer();
    +    Cancellable initCancel = 
kafkaClient.getConsumer().prepare().add(topic, 0, 0).consume(new KafkaConsumer
    +      .MessageCallback() {
    +      long minOffset = -2; // earliest msg
    +      long maxOffset = -1; // latest msg
    +      @Override
    +      // Use binary search to find the offset of the message with the 
index matching startIdx. Returns the next offset
    +      // to fetch message until the matching message is found.
    +      public long onReceived(Iterator<FetchedMessage> messages) {
    +        while (messages.hasNext()) {
    +          FetchedMessage currentMsg = messages.next();
    +          long currentOffset = currentMsg.getNextOffset() - 1;
    +          String decodedMsg = 
Charsets.UTF_8.decode(currentMsg.getPayload()).toString();
    +          LOG.info(decodedMsg);
    +          int currentIdx = Integer.valueOf(decodedMsg.split(" ")[0]);
    +          LOG.info("Current offset = {}, currentIdx = {}. minOffset = {}", 
currentOffset, currentIdx, minOffset);
    +          if (currentIdx == startIdx) {
    +            if (offsetQueue.size() == 0) {
    +              offsetQueue.offer(currentOffset);
    +              LOG.info("currentOffset = {} matches startIdx {}", 
currentOffset, startIdx);
    +            }
    +            return currentOffset;
    +          }
    +          // If minOffset and maxOffset still have their initial values, 
set the minOffset to currentOffset and return
    +          // the offset of the last received message
    +          if (minOffset == -2 && maxOffset == -1) {
    +            minOffset = currentOffset;
    +            LOG.info("minOffset = {}, return maxOffset = {}", minOffset, 
maxOffset);
    +            // Returns the offset of the last received message. Cannot 
return -1 because -1 will be translated as
    +            // the next offset after the last received message
    +            return consumer.getLastOffset(currentMsg.getTopicPartition(), 
-1) - 1;
    +          }
    +          if (maxOffset == -1) {
    +            maxOffset = currentOffset;
    +          }
    +          LOG.info("minOffset = {}, maxOffset = {}", minOffset, maxOffset);
    +          // If minOffset > maxOffset, the startIdx cannot be found in the 
current range of offset.
    +          // Restore the initial values of minOffset and maxOffset and 
read from the beginning again
    +          if (minOffset > maxOffset) {
    +            minOffset = -2;
    +            maxOffset = -1;
    +            LOG.info("minOffset > maxOffset, return minOffset = {}", 
minOffset);
    +            return minOffset;
    +          }
    +          if (currentIdx > startIdx) {
    +            maxOffset = currentOffset - 1;
    +            long newOffset = minOffset + (maxOffset - minOffset)/2;
    +            LOG.info("currentIdx > startIdx, return newOffset {}", 
newOffset);
    +            return newOffset;
    +          }
    +          if (currentIdx < startIdx) {
    +            minOffset = currentOffset + 1;
    +            long newOffset = minOffset + (maxOffset - minOffset)/2;
    +            LOG.info("currentIdx < startIdx, return newOffset {}", 
newOffset);
    +            return newOffset;
    +          }
    +        }
    +        return Long.MIN_VALUE;
    +      }
    +
    +      @Override
    +      public void finished() {
    +        //no-op
    +      }
    +    });
    +
    +    long startOffset = offsetQueue.poll(360, TimeUnit.SECONDS);
    +    initCancel.cancel();
    +
    +    Cancellable cancel = kafkaClient.getConsumer().prepare().add(topic, 0, 
startOffset).consume(new KafkaConsumer
    +      .MessageCallback() {
    --- End diff --
    
    This is an awkward line break.


> Get next offset and handle offset error in KafkaConsumer.MessageCallback
> ------------------------------------------------------------------------
>
>                 Key: TWILL-199
>                 URL: https://issues.apache.org/jira/browse/TWILL-199
>             Project: Apache Twill
>          Issue Type: Improvement
>            Reporter: Chengfeng Mao
>
> The method {{void onReceived(Iterator<FetchedMessage> messages)}} in 
> {{KafkaConsumer.MessageCallback}} can be more flexible with the change to 
> {{Long onReceived(Iterator<FetchedMessage> messages)}} so that it can provide 
> additional functionalities:
> 1. To return the next offset to be fetched
> 2. To handle offset non-existence or offset mismatch error and take action on 
> the error
> This method will return null for backward compatibility when it doesn't need 
> to provide the next offset.
> In a concrete implementation, a class of a new interface 
> {{KafkaOffsetProvider}} can be added as a member in 
> {{KafkaConsumer.MessageCallback}} to perform the offset error handling and 
> provide the next offset. Besides, {{KafkaOffsetProvider}} also has methods to 
> provide the following functionalities:
> 1. To fetch earliest/latest offset in Kafka
> 2. To find the offset of a message with timestamp equal to the given 
> timestamp in Kafka
> For backward compatibility, if {{KafkaOffsetProvider}} instance is not 
> provided, its default value will be null and none of its methods will be 
> called.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)

Reply via email to