[ https://issues.apache.org/jira/browse/TWILL-199?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15797777#comment-15797777 ]
ASF GitHub Bot commented on TWILL-199: -------------------------------------- Github user chtyim commented on a diff in the pull request: https://github.com/apache/twill/pull/16#discussion_r94551779 --- Diff: twill-core/src/test/java/org/apache/twill/kafka/client/KafkaTest.java --- @@ -170,11 +174,128 @@ public void testKafkaClient() throws Exception { Cancellable cancel = kafkaClient.getConsumer().prepare().add(topic, 0, 0).consume(new KafkaConsumer .MessageCallback() { @Override - public void onReceived(Iterator<FetchedMessage> messages) { + public long onReceived(Iterator<FetchedMessage> messages) { + long nextOffset = Long.MIN_VALUE; + while (messages.hasNext()) { + FetchedMessage message = messages.next(); + LOG.info(Charsets.UTF_8.decode(message.getPayload()).toString()); + latch.countDown(); + } + return nextOffset; + } + + @Override + public void finished() { + stopLatch.countDown(); + } + }); + + Assert.assertTrue(latch.await(5, TimeUnit.SECONDS)); + cancel.cancel(); + Assert.assertTrue(stopLatch.await(1, TimeUnit.SECONDS)); + } + + @Test + public void testKafkaClientReadFromIdx() throws Exception { + String topic = "testClient"; + + // Publish 30 messages with indecies the same as offsets within the range 0 - 29 + Thread t1 = createPublishThread(kafkaClient, topic, Compression.GZIP, "GZIP Testing message", 10); + t1.start(); + t1.join(); + Thread t2 = createPublishThread(kafkaClient, topic, Compression.NONE, "Testing message", 10, 10); + t2.start(); + t2.join(); + Thread t3 = createPublishThread(kafkaClient, topic, Compression.SNAPPY, "Snappy Testing message", 10, 20); + t3.start(); + t3.join(); + + final int startIdx = 15; + final CountDownLatch latch = new CountDownLatch(30 - startIdx); + final CountDownLatch stopLatch = new CountDownLatch(1); + final BlockingQueue<Long> offsetQueue = new LinkedBlockingQueue<>(); + // Creater a consumer + final SimpleKafkaConsumer consumer = (SimpleKafkaConsumer) kafkaClient.getConsumer(); + Cancellable initCancel = 
kafkaClient.getConsumer().prepare().add(topic, 0, 0).consume(new KafkaConsumer + .MessageCallback() { + long minOffset = -2; // earliest msg + long maxOffset = -1; // latest msg + @Override + // Use binary search to find the offset of the message with the index matching startIdx. Returns the next offset + // to fetch message until the matching message is found. + public long onReceived(Iterator<FetchedMessage> messages) { + while (messages.hasNext()) { + FetchedMessage currentMsg = messages.next(); + long currentOffset = currentMsg.getNextOffset() - 1; + String decodedMsg = Charsets.UTF_8.decode(currentMsg.getPayload()).toString(); + LOG.info(decodedMsg); + int currentIdx = Integer.valueOf(decodedMsg.split(" ")[0]); + LOG.info("Current offset = {}, currentIdx = {}. minOffset = {}", currentOffset, currentIdx, minOffset); + if (currentIdx == startIdx) { + if (offsetQueue.size() == 0) { + offsetQueue.offer(currentOffset); + LOG.info("currentOffset = {} matches startIdx {}", currentOffset, startIdx); + } + return currentOffset; + } + // If minOffset and maxOffset still have their initial values, set the minOffset to currentOffset and return + // the offset of the last received message + if (minOffset == -2 && maxOffset == -1) { + minOffset = currentOffset; + LOG.info("minOffset = {}, return maxOffset = {}", minOffset, maxOffset); + // Returns the offset of the last received messages. Cannot return -1 because -1 will be translated as + // the next offset after the last received message + return consumer.getLastOffset(currentMsg.getTopicPartition(), -1) - 1; + } + if (maxOffset == -1) { + maxOffset = currentOffset; + } + LOG.info("minOffset = {}, maxOffset = {}", minOffset, maxOffset); + // If minOffset > maxOffset, the startIdx cannot be found in the current range of offset. 
+ // Restore the initial values of minOffset and maxOffset and read from the beginning again + if (minOffset > maxOffset) { + minOffset = -2; + maxOffset = -1; + LOG.info("minOffset > maxOffset, return minOffset = {}", minOffset); + return minOffset; + } + if (currentIdx > startIdx) { + maxOffset = currentOffset - 1; + long newOffset = minOffset + (maxOffset - minOffset)/2; + LOG.info("currentIdx > startIdx, return newOffset {}", newOffset); + return newOffset; + } + if (currentIdx < startIdx) { + minOffset = currentOffset + 1; + long newOffset = minOffset + (maxOffset - minOffset)/2; + LOG.info("currentIdx < startIdx, return newOffset {}", newOffset); + return newOffset; + } + } + return Long.MIN_VALUE; + } + + @Override + public void finished() { + //no-op + } + }); + + long startOffset = offsetQueue.poll(360, TimeUnit.SECONDS); + initCancel.cancel(); + + Cancellable cancel = kafkaClient.getConsumer().prepare().add(topic, 0, startOffset).consume(new KafkaConsumer + .MessageCallback() { --- End diff -- This is an awkward line break. > Get next offset and handle offset error in KafkaConsumer.MessageCallback > ------------------------------------------------------------------------ > > Key: TWILL-199 > URL: https://issues.apache.org/jira/browse/TWILL-199 > Project: Apache Twill > Issue Type: Improvement > Reporter: Chengfeng Mao > > The method {{void onReceived(Iterator<FetchedMessage> messages)}} in > {{KafkaConsumer.MessageCallback}} can be more flexible with the change to > {{Long onReceived(Iterator<FetchedMessage> messages)}} so that it can provide > additional functionalities: > 1. To return the next offset to be fetched > 2. To handle offset non-existence or offset mismatch error and take action on > the error > This method will return null for backward compatibility when it doesn't need > to provide the next offset. 
> In a concrete implementation, a class of a new interface > {{KafkaOffsetProvider}} can be added as a member in > {{KafkaConsumer.MessageCallback}} to perform the offset error handling and > provide the next offset. Besides, {{KafkaOffsetProvider}} also has methods to > provide the following functionalities: > 1. To fetch the earliest/latest offset in Kafka > 2. To find the offset of a message with a timestamp equal to the given > timestamp in Kafka > For backward compatibility, if a {{KafkaOffsetProvider}} instance is not > provided, its default value will be null and none of its methods will be > called. -- This message was sent by Atlassian JIRA (v6.3.4#6332)