Repository: kafka Updated Branches: refs/heads/0.11.0 f5aee9bd5 -> 4671a486c
KAFKA-5327: ConsoleConsumer should manually commit offsets for records that are returned in receive() KAFKA-5327: ConsoleConsumer should manually commit offsets for those records it really consumed. Currently it leaves this job to the automatic offset commit scheme, which can commit offsets for records that were fetched but never printed, so those unread messages are skipped when `--max-messages` is set. Author: amethystic <[email protected]> Author: huxi <[email protected]> Reviewers: Guozhang Wang <[email protected]> Closes #3148 from amethystic/KAFKA-5327_ConsoleConsumer_distable_autocommit (cherry picked from commit d7d1196a0b542adb46d22eeb5b6d12af950b64c9) Signed-off-by: Guozhang Wang <[email protected]> Project: http://git-wip-us.apache.org/repos/asf/kafka/repo Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/4671a486 Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/4671a486 Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/4671a486 Branch: refs/heads/0.11.0 Commit: 4671a486cfe2757cd04011f4f969a44e7d37b54a Parents: f5aee9b Author: amethystic <[email protected]> Authored: Fri Jun 2 12:08:26 2017 -0700 Committer: Guozhang Wang <[email protected]> Committed: Fri Jun 2 12:08:32 2017 -0700 ---------------------------------------------------------------------- .../scala/kafka/consumer/BaseConsumer.scala | 46 ++++++++++++++------ .../scala/kafka/tools/ConsoleConsumer.scala | 3 ++ 2 files changed, 36 insertions(+), 13 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/kafka/blob/4671a486/core/src/main/scala/kafka/consumer/BaseConsumer.scala ---------------------------------------------------------------------- diff --git a/core/src/main/scala/kafka/consumer/BaseConsumer.scala b/core/src/main/scala/kafka/consumer/BaseConsumer.scala index cec74d0..fd5aa41 100644 --- a/core/src/main/scala/kafka/consumer/BaseConsumer.scala +++ b/core/src/main/scala/kafka/consumer/BaseConsumer.scala @@ -17,18 +17,22 @@ package kafka.consumer +import java.util 
import java.util.{Collections, Properties} import java.util.regex.Pattern import kafka.api.OffsetRequest import kafka.common.StreamEndException import kafka.message.Message +import org.apache.kafka.clients.consumer.{ConsumerRecord, OffsetAndMetadata} import org.apache.kafka.clients.consumer.internals.NoOpConsumerRebalanceListener import org.apache.kafka.common.record.TimestampType import org.apache.kafka.common.TopicPartition import org.apache.kafka.common.header.Headers import org.apache.kafka.common.header.internals.RecordHeaders +import scala.collection.mutable.HashMap + /** * A base consumer used to abstract both old and new consumer * this class should be removed (along with BaseProducer) @@ -60,8 +64,13 @@ class NewShinyConsumer(topic: Option[String], partitionId: Option[Int], offset: import org.apache.kafka.clients.consumer.KafkaConsumer val consumer = new KafkaConsumer[Array[Byte], Array[Byte]](consumerProps) + val offsets = new HashMap[TopicPartition, Long]() + consumerInit() - var recordIter = consumer.poll(0).iterator + private var currentPartition: TopicPartition = null + private var polledRecords = consumer.poll(0) + private var partitionIter = polledRecords.partitions.iterator + private var recordIter: util.Iterator[ConsumerRecord[Array[Byte], Array[Byte]]] = null def consumerInit() { (topic, partitionId, offset, whitelist) match { @@ -93,21 +102,30 @@ class NewShinyConsumer(topic: Option[String], partitionId: Option[Int], offset: } override def receive(): BaseConsumerRecord = { - if (!recordIter.hasNext) { - recordIter = consumer.poll(timeoutMs).iterator - if (!recordIter.hasNext) - throw new ConsumerTimeoutException + if (recordIter == null || !recordIter.hasNext) { + if (!partitionIter.hasNext) { + polledRecords = consumer.poll(timeoutMs) + partitionIter = polledRecords.partitions.iterator + + if (!partitionIter.hasNext) + throw new ConsumerTimeoutException + } + + currentPartition = partitionIter.next + recordIter = 
polledRecords.records(currentPartition).iterator } val record = recordIter.next + offsets.put(currentPartition, record.offset + 1) + BaseConsumerRecord(record.topic, - record.partition, - record.offset, - record.timestamp, - record.timestampType, - record.key, - record.value, - record.headers) + record.partition, + record.offset, + record.timestamp, + record.timestampType, + record.key, + record.value, + record.headers) } override def stop() { @@ -119,7 +137,9 @@ class NewShinyConsumer(topic: Option[String], partitionId: Option[Int], offset: } override def commit() { - this.consumer.commitSync() + import scala.collection.JavaConverters._ + consumer.commitSync(offsets.map { case (tp, offset) => (tp, new OffsetAndMetadata(offset))}.asJava) + offsets.clear() } } http://git-wip-us.apache.org/repos/asf/kafka/blob/4671a486/core/src/main/scala/kafka/tools/ConsoleConsumer.scala ---------------------------------------------------------------------- diff --git a/core/src/main/scala/kafka/tools/ConsoleConsumer.scala b/core/src/main/scala/kafka/tools/ConsoleConsumer.scala index 335c724..a1e2ffa 100755 --- a/core/src/main/scala/kafka/tools/ConsoleConsumer.scala +++ b/core/src/main/scala/kafka/tools/ConsoleConsumer.scala @@ -77,6 +77,7 @@ object ConsoleConsumer extends Logging { try { process(conf.maxMessages, conf.formatter, consumer, System.out, conf.skipMessageOnError) } finally { + consumer.commit() consumer.cleanup() conf.formatter.close() reportRecordCount() @@ -200,7 +201,9 @@ object ConsoleConsumer extends Logging { props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, config.bootstrapServer) props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.ByteArrayDeserializer") props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.ByteArrayDeserializer") + props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false") props.put(ConsumerConfig.ISOLATION_LEVEL_CONFIG, config.isolationLevel) + props }
