Github user chtyim commented on a diff in the pull request: https://github.com/apache/twill/pull/16#discussion_r94551779 --- Diff: twill-core/src/test/java/org/apache/twill/kafka/client/KafkaTest.java --- @@ -170,11 +174,128 @@ public void testKafkaClient() throws Exception { Cancellable cancel = kafkaClient.getConsumer().prepare().add(topic, 0, 0).consume(new KafkaConsumer .MessageCallback() { @Override - public void onReceived(Iterator<FetchedMessage> messages) { + public long onReceived(Iterator<FetchedMessage> messages) { + long nextOffset = Long.MIN_VALUE; + while (messages.hasNext()) { + FetchedMessage message = messages.next(); + LOG.info(Charsets.UTF_8.decode(message.getPayload()).toString()); + latch.countDown(); + } + return nextOffset; + } + + @Override + public void finished() { + stopLatch.countDown(); + } + }); + + Assert.assertTrue(latch.await(5, TimeUnit.SECONDS)); + cancel.cancel(); + Assert.assertTrue(stopLatch.await(1, TimeUnit.SECONDS)); + } + + @Test + public void testKafkaClientReadFromIdx() throws Exception { + String topic = "testClient"; + + // Publish 30 messages with indecies the same as offsets within the range 0 - 29 + Thread t1 = createPublishThread(kafkaClient, topic, Compression.GZIP, "GZIP Testing message", 10); + t1.start(); + t1.join(); + Thread t2 = createPublishThread(kafkaClient, topic, Compression.NONE, "Testing message", 10, 10); + t2.start(); + t2.join(); + Thread t3 = createPublishThread(kafkaClient, topic, Compression.SNAPPY, "Snappy Testing message", 10, 20); + t3.start(); + t3.join(); + + final int startIdx = 15; + final CountDownLatch latch = new CountDownLatch(30 - startIdx); + final CountDownLatch stopLatch = new CountDownLatch(1); + final BlockingQueue<Long> offsetQueue = new LinkedBlockingQueue<>(); + // Creater a consumer + final SimpleKafkaConsumer consumer = (SimpleKafkaConsumer) kafkaClient.getConsumer(); + Cancellable initCancel = kafkaClient.getConsumer().prepare().add(topic, 0, 0).consume(new KafkaConsumer + 
.MessageCallback() { + long minOffset = -2; // earliest msg + long maxOffset = -1; // latest msg + @Override + // Use binary search to find the offset of the message with the index matching startIdx. Returns the next offset + // to fetch message until the matching message is found. + public long onReceived(Iterator<FetchedMessage> messages) { + while (messages.hasNext()) { + FetchedMessage currentMsg = messages.next(); + long currentOffset = currentMsg.getNextOffset() - 1; + String decodedMsg = Charsets.UTF_8.decode(currentMsg.getPayload()).toString(); + LOG.info(decodedMsg); + int currentIdx = Integer.valueOf(decodedMsg.split(" ")[0]); + LOG.info("Current offset = {}, currentIdx = {}. minOffset = {}", currentOffset, currentIdx, minOffset); + if (currentIdx == startIdx) { + if (offsetQueue.size() == 0) { + offsetQueue.offer(currentOffset); + LOG.info("currentOffset = {} matches startIdx {}", currentOffset, startIdx); + } + return currentOffset; + } + // If minOffset and maxOffset still have their initial values, set the minOffset to currentOffset and return + // the offset of the last received message + if (minOffset == -2 && maxOffset == -1) { + minOffset = currentOffset; + LOG.info("minOffset = {}, return maxOffset = {}", minOffset, maxOffset); + // Returns the offset of the last received messages. Cannot return -1 because -1 will be translated as + // the next offset after the last received message + return consumer.getLastOffset(currentMsg.getTopicPartition(), -1) - 1; + } + if (maxOffset == -1) { + maxOffset = currentOffset; + } + LOG.info("minOffset = {}, maxOffset = {}", minOffset, maxOffset); + // If minOffset > maxOffset, the startIdx cannot be found in the current range of offset. 
+ // Restore the initial values of minOffset and maxOffset and read from the beginning again + if (minOffset > maxOffset) { + minOffset = -2; + maxOffset = -1; + LOG.info("minOffset > maxOffset, return minOffset = {}", minOffset); + return minOffset; + } + if (currentIdx > startIdx) { + maxOffset = currentOffset - 1; + long newOffset = minOffset + (maxOffset - minOffset)/2; + LOG.info("currentIdx > startIdx, return newOffset {}", newOffset); + return newOffset; + } + if (currentIdx < startIdx) { + minOffset = currentOffset + 1; + long newOffset = minOffset + (maxOffset - minOffset)/2; + LOG.info("currentIdx < startIdx, return newOffset {}", newOffset); + return newOffset; + } + } + return Long.MIN_VALUE; + } + + @Override + public void finished() { + //no-op + } + }); + + long startOffset = offsetQueue.poll(360, TimeUnit.SECONDS); + initCancel.cancel(); + + Cancellable cancel = kafkaClient.getConsumer().prepare().add(topic, 0, startOffset).consume(new KafkaConsumer + .MessageCallback() { --- End diff -- This is an awkward line break.
--- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes to do so, or if the feature is enabled but not working, please contact infrastructure at infrastructure@apache.org or file a JIRA ticket with INFRA. ---