kafka-2168; New consumer poll() can block other calls like position(), commit(), and close() indefinitely; patched by Jason Gustafson; reviewed by Jay Kreps, Ewen Cheslack-Postava, Guozhang Wang and Jun Rao
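The gist of the change: the new consumer's public methods were coarsely synchronized, so a poll() blocking on the network held the lock and a second thread could not even close() the consumer. This patch drops that synchronization, makes the consumer explicitly single-threaded (concurrent use now fails fast with ConcurrentModificationException), and adds a thread-safe wakeup() that aborts a blocking operation by raising ConsumerWakeupException. A minimal usage sketch, adapted from the javadoc added in the patch below (the class name and topic are illustrative only):

import java.util.concurrent.atomic.AtomicBoolean;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.ConsumerWakeupException;
import org.apache.kafka.clients.consumer.KafkaConsumer;

// Illustrative sketch (not part of the patch): one thread runs the poll loop,
// another thread calls shutdown() to break it out of a blocking poll().
public class ConsumerLoop implements Runnable {
    private final AtomicBoolean closed = new AtomicBoolean(false);
    private final KafkaConsumer<String, String> consumer;

    public ConsumerLoop(KafkaConsumer<String, String> consumer) {
        this.consumer = consumer;
    }

    @Override
    public void run() {
        try {
            consumer.subscribe("my-topic");   // hypothetical topic name
            while (!closed.get()) {
                ConsumerRecords<String, String> records = consumer.poll(1000);
                // process records here
            }
        } catch (ConsumerWakeupException e) {
            if (!closed.get())
                throw e;                      // only swallow the wakeup if we are shutting down
        } finally {
            consumer.close();
        }
    }

    // Safe to call from any thread: wakeup() is the one thread-safe consumer method.
    public void shutdown() {
        closed.set(true);
        consumer.wakeup();
    }
}
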
Project: http://git-wip-us.apache.org/repos/asf/kafka/repo Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/b6d326b0 Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/b6d326b0 Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/b6d326b0 Branch: refs/heads/trunk Commit: b6d326b0893e60b350608260fd1bd2542337cb5a Parents: 2270a75 Author: Jason Gustafson <a...@confluent.io> Authored: Tue Jun 23 00:07:19 2015 -0400 Committer: Jun Rao <jun...@gmail.com> Committed: Tue Jun 23 00:09:06 2015 -0400 ---------------------------------------------------------------------- .../apache/kafka/clients/consumer/Consumer.java | 5 + .../kafka/clients/consumer/ConsumerRecords.java | 7 + .../consumer/ConsumerWakeupException.java | 20 + .../kafka/clients/consumer/KafkaConsumer.java | 715 +++++++++++++++---- .../kafka/clients/consumer/MockConsumer.java | 9 +- .../clients/consumer/OffsetResetStrategy.java | 17 + .../clients/consumer/internals/Coordinator.java | 447 ++++++------ .../clients/consumer/internals/Fetcher.java | 159 ++--- .../clients/consumer/internals/Heartbeat.java | 10 + .../consumer/internals/RequestFuture.java | 209 ++++++ .../consumer/internals/SubscriptionState.java | 41 +- .../org/apache/kafka/common/utils/Utils.java | 15 + .../clients/consumer/MockConsumerTest.java | 2 +- .../consumer/internals/CoordinatorTest.java | 148 +++- .../clients/consumer/internals/FetcherTest.java | 32 +- .../consumer/internals/HeartbeatTest.java | 9 + .../internals/SubscriptionStateTest.java | 19 +- .../apache/kafka/common/utils/UtilsTest.java | 8 + 18 files changed, 1330 insertions(+), 542 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/kafka/blob/b6d326b0/clients/src/main/java/org/apache/kafka/clients/consumer/Consumer.java ---------------------------------------------------------------------- diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/Consumer.java b/clients/src/main/java/org/apache/kafka/clients/consumer/Consumer.java index 8f587bc..fd98740 100644 --- a/clients/src/main/java/org/apache/kafka/clients/consumer/Consumer.java +++ b/clients/src/main/java/org/apache/kafka/clients/consumer/Consumer.java @@ -108,4 +108,9 @@ public interface Consumer<K, V> extends Closeable { */ public void close(); + /** + * @see KafkaConsumer#wakeup() + */ + public void wakeup(); + } http://git-wip-us.apache.org/repos/asf/kafka/blob/b6d326b0/clients/src/main/java/org/apache/kafka/clients/consumer/ConsumerRecords.java ---------------------------------------------------------------------- diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/ConsumerRecords.java b/clients/src/main/java/org/apache/kafka/clients/consumer/ConsumerRecords.java index 1ca75f8..eb75d2e 100644 --- a/clients/src/main/java/org/apache/kafka/clients/consumer/ConsumerRecords.java +++ b/clients/src/main/java/org/apache/kafka/clients/consumer/ConsumerRecords.java @@ -27,6 +27,8 @@ import java.util.Map; * {@link Consumer#poll(long)} operation. 
*/ public class ConsumerRecords<K, V> implements Iterable<ConsumerRecord<K, V>> { + public static final ConsumerRecords<Object, Object> EMPTY = + new ConsumerRecords<Object, Object>(Collections.EMPTY_MAP); private final Map<TopicPartition, List<ConsumerRecord<K, V>>> records; @@ -103,4 +105,9 @@ public class ConsumerRecords<K, V> implements Iterable<ConsumerRecord<K, V>> { } } + @SuppressWarnings("unchecked") + public static <K, V> ConsumerRecords<K, V> empty() { + return (ConsumerRecords<K, V>) EMPTY; + } + } http://git-wip-us.apache.org/repos/asf/kafka/blob/b6d326b0/clients/src/main/java/org/apache/kafka/clients/consumer/ConsumerWakeupException.java ---------------------------------------------------------------------- diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/ConsumerWakeupException.java b/clients/src/main/java/org/apache/kafka/clients/consumer/ConsumerWakeupException.java new file mode 100644 index 0000000..35f1ec9 --- /dev/null +++ b/clients/src/main/java/org/apache/kafka/clients/consumer/ConsumerWakeupException.java @@ -0,0 +1,20 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE + * file distributed with this work for additional information regarding copyright ownership. The ASF licenses this file + * to You under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the + * License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on + * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the + * specific language governing permissions and limitations under the License. 
+ */ +package org.apache.kafka.clients.consumer; + +import org.apache.kafka.common.KafkaException; + +public class ConsumerWakeupException extends KafkaException { + private static final long serialVersionUID = 1L; + +} http://git-wip-us.apache.org/repos/asf/kafka/blob/b6d326b0/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java ---------------------------------------------------------------------- diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java b/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java index 951c34c..9be8fbc 100644 --- a/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java +++ b/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java @@ -12,44 +12,48 @@ */ package org.apache.kafka.clients.consumer; -import java.net.InetSocketAddress; -import java.util.ArrayList; -import java.util.Arrays; -import java.util.Collection; -import java.util.Collections; -import java.util.LinkedHashMap; -import java.util.List; -import java.util.Map; -import java.util.Properties; -import java.util.Set; -import java.util.concurrent.TimeUnit; -import java.util.concurrent.atomic.AtomicInteger; -import java.util.concurrent.atomic.AtomicReference; - import org.apache.kafka.clients.ClientUtils; import org.apache.kafka.clients.Metadata; import org.apache.kafka.clients.NetworkClient; import org.apache.kafka.clients.consumer.internals.Coordinator; import org.apache.kafka.clients.consumer.internals.Fetcher; +import org.apache.kafka.clients.consumer.internals.RequestFuture; import org.apache.kafka.clients.consumer.internals.SubscriptionState; import org.apache.kafka.common.Cluster; import org.apache.kafka.common.KafkaException; import org.apache.kafka.common.Metric; +import org.apache.kafka.common.MetricName; import org.apache.kafka.common.PartitionInfo; import org.apache.kafka.common.TopicPartition; import org.apache.kafka.common.metrics.JmxReporter; import org.apache.kafka.common.metrics.MetricConfig; -import org.apache.kafka.common.MetricName; import org.apache.kafka.common.metrics.Metrics; import org.apache.kafka.common.metrics.MetricsReporter; -import org.apache.kafka.common.serialization.Deserializer; import org.apache.kafka.common.network.Selector; +import org.apache.kafka.common.serialization.Deserializer; import org.apache.kafka.common.utils.SystemTime; import org.apache.kafka.common.utils.Time; import org.apache.kafka.common.utils.Utils; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import java.net.InetSocketAddress; +import java.util.Arrays; +import java.util.Collection; +import java.util.Collections; +import java.util.ConcurrentModificationException; +import java.util.LinkedHashMap; +import java.util.List; +import java.util.Map; +import java.util.Properties; +import java.util.Set; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.atomic.AtomicInteger; +import java.util.concurrent.atomic.AtomicReference; + +import static org.apache.kafka.common.utils.Utils.min; + /** * A Kafka client that consumes records from a Kafka cluster. * <p> @@ -298,10 +302,54 @@ import org.slf4j.LoggerFactory; * * <h3>Multithreaded Processing</h3> * - * The Kafka consumer is threadsafe but coarsely synchronized. All network I/O happens in the thread of the application - * making the call. We have intentionally avoided implementing a particular threading model for processing. + * The Kafka consumer is NOT thread-safe. 
All network I/O happens in the thread of the application + * making the call. It is the responsibility of the user to ensure that multi-threaded access + * is properly synchronized. Un-synchronized access will result in {@link ConcurrentModificationException}. + * + * <p> + * The only exception to this rule is {@link #wakeup()}, which can safely be used from an external thread to + * interrupt an active operation. In this case, a {@link ConsumerWakeupException} will be thrown from the thread + * blocking on the operation. This can be used to shutdown the consumer from another thread. The following + * snippet shows the typical pattern: + * + * <pre> + * public class KafkaConsumerRunner implements Runnable { + * private final AtomicBoolean closed = new AtomicBoolean(false); + * private final KafkaConsumer consumer; + * + * public void run() { + * try { + * consumer.subscribe("topic"); + * while (!closed.get()) { + * ConsumerRecords records = consumer.poll(10000); + * // Handle new records + * } + * } catch (ConsumerWakeupException e) { + * // Ignore exception if closing + * if (!closed.get()) throw e; + * } finally { + * consumer.close(); + * } + * } + * + * public void shutdown() { + * closed.set(true); + * consumer.wakeup(); + * } + * } + * </pre> + * + * Then in a separate thread, the consumer can be shutdown by setting the closed flag and waking up the consumer. + * + * <pre> + * closed.set(true); + * consumer.wakeup(); + * </pre> + * * <p> - * This leaves several options for implementing multi-threaded processing of records. + * We have intentionally avoided implementing a particular threading model for processing. This leaves several + * options for implementing multi-threaded processing of records. + * * * <h4>1. One Consumer Per Thread</h4> * @@ -363,6 +411,17 @@ public class KafkaConsumer<K, V> implements Consumer<K, V> { private final ConsumerRebalanceCallback rebalanceCallback; private long lastCommitAttemptMs; private boolean closed = false; + private final AtomicBoolean wakeup = new AtomicBoolean(false); + + // currentThread holds the threadId of the current thread accessing KafkaConsumer + // and is used to prevent multi-threaded access + private final AtomicReference<Long> currentThread = new AtomicReference<Long>(); + // refcount is used to allow reentrant access by the thread who has acquired currentThread + private int refcount = 0; // reference count for reentrant access + + // TODO: This timeout controls how long we should wait before retrying a request. We should be able + // to leverage the work of KAFKA-2120 to get this value from configuration. + private long requestTimeoutMs = 5000L; /** * A consumer is instantiated by providing a set of key-value pairs as configuration. 
Valid configuration strings @@ -480,13 +539,12 @@ public class KafkaConsumer<K, V> implements Consumer<K, V> { config.getLong(ConsumerConfig.RECONNECT_BACKOFF_MS_CONFIG), config.getInt(ConsumerConfig.SEND_BUFFER_CONFIG), config.getInt(ConsumerConfig.RECEIVE_BUFFER_CONFIG)); - this.subscriptions = new SubscriptionState(); + OffsetResetStrategy offsetResetStrategy = OffsetResetStrategy.valueOf(config.getString(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG).toUpperCase()); + this.subscriptions = new SubscriptionState(offsetResetStrategy); this.coordinator = new Coordinator(this.client, config.getString(ConsumerConfig.GROUP_ID_CONFIG), - this.retryBackoffMs, config.getInt(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG), config.getString(ConsumerConfig.PARTITION_ASSIGNMENT_STRATEGY_CONFIG), - this.metadata, this.subscriptions, metrics, metricGrpPrefix, @@ -508,12 +566,10 @@ public class KafkaConsumer<K, V> implements Consumer<K, V> { this.valueDeserializer = valueDeserializer; } this.fetcher = new Fetcher<K, V>(this.client, - this.retryBackoffMs, config.getInt(ConsumerConfig.FETCH_MIN_BYTES_CONFIG), config.getInt(ConsumerConfig.FETCH_MAX_WAIT_MS_CONFIG), config.getInt(ConsumerConfig.MAX_PARTITION_FETCH_BYTES_CONFIG), config.getBoolean(ConsumerConfig.CHECK_CRCS_CONFIG), - config.getString(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG).toUpperCase(), this.keyDeserializer, this.valueDeserializer, this.metadata, @@ -542,8 +598,13 @@ public class KafkaConsumer<K, V> implements Consumer<K, V> { * then this will give the set of topics currently assigned to the consumer (which may be none if the assignment * hasn't happened yet, or the partitions are in the process of getting reassigned). */ - public synchronized Set<TopicPartition> subscriptions() { - return Collections.unmodifiableSet(this.subscriptions.assignedPartitions()); + public Set<TopicPartition> subscriptions() { + acquire(); + try { + return Collections.unmodifiableSet(this.subscriptions.assignedPartitions()); + } finally { + release(); + } } /** @@ -561,12 +622,16 @@ public class KafkaConsumer<K, V> implements Consumer<K, V> { * @param topics A variable list of topics that the consumer wants to subscribe to */ @Override - public synchronized void subscribe(String... topics) { - ensureNotClosed(); - log.debug("Subscribed to topic(s): {}", Utils.join(topics, ", ")); - for (String topic : topics) - this.subscriptions.subscribe(topic); - metadata.addTopics(topics); + public void subscribe(String... topics) { + acquire(); + try { + log.debug("Subscribed to topic(s): {}", Utils.join(topics, ", ")); + for (String topic : topics) + this.subscriptions.subscribe(topic); + metadata.addTopics(topics); + } finally { + release(); + } } /** @@ -574,16 +639,20 @@ public class KafkaConsumer<K, V> implements Consumer<K, V> { * functionality. As such, there will be no rebalance operation triggered when group membership or cluster and topic * metadata change. * <p> - * + * * @param partitions Partitions to incrementally subscribe to */ @Override - public synchronized void subscribe(TopicPartition... partitions) { - ensureNotClosed(); - log.debug("Subscribed to partitions(s): {}", Utils.join(partitions, ", ")); - for (TopicPartition tp : partitions) { - this.subscriptions.subscribe(tp); - metadata.addTopics(tp.topic()); + public void subscribe(TopicPartition... 
partitions) { + acquire(); + try { + log.debug("Subscribed to partitions(s): {}", Utils.join(partitions, ", ")); + for (TopicPartition tp : partitions) { + this.subscriptions.subscribe(tp); + metadata.addTopics(tp.topic()); + } + } finally { + release(); } } @@ -593,12 +662,16 @@ public class KafkaConsumer<K, V> implements Consumer<K, V> { * * @param topics Topics to unsubscribe from */ - public synchronized void unsubscribe(String... topics) { - ensureNotClosed(); - log.debug("Unsubscribed from topic(s): {}", Utils.join(topics, ", ")); - // throw an exception if the topic was never subscribed to - for (String topic : topics) - this.subscriptions.unsubscribe(topic); + public void unsubscribe(String... topics) { + acquire(); + try { + log.debug("Unsubscribed from topic(s): {}", Utils.join(topics, ", ")); + // throw an exception if the topic was never subscribed to + for (String topic : topics) + this.subscriptions.unsubscribe(topic); + } finally { + release(); + } } /** @@ -607,12 +680,16 @@ public class KafkaConsumer<K, V> implements Consumer<K, V> { * * @param partitions Partitions to unsubscribe from */ - public synchronized void unsubscribe(TopicPartition... partitions) { - ensureNotClosed(); - log.debug("Unsubscribed from partitions(s): {}", Utils.join(partitions, ", ")); - // throw an exception if the partition was never subscribed to - for (TopicPartition partition : partitions) - this.subscriptions.unsubscribe(partition); + public void unsubscribe(TopicPartition... partitions) { + acquire(); + try { + log.debug("Unsubscribed from partitions(s): {}", Utils.join(partitions, ", ")); + // throw an exception if the partition was never subscribed to + for (TopicPartition partition : partitions) + this.subscriptions.unsubscribe(partition); + } finally { + release(); + } } /** @@ -624,17 +701,65 @@ public class KafkaConsumer<K, V> implements Consumer<K, V> { * rebalance, to consume data from that offset sequentially on every poll. If not, it will use the last checkpointed * offset using {@link #commit(Map, CommitType) commit(offsets, sync)} for the subscribed list of partitions. * - * @param timeout The time, in milliseconds, spent waiting in poll if data is not available. If 0, waits - * indefinitely. Must not be negative + * @param timeout The time, in milliseconds, spent waiting in poll if data is not available. If 0, returns + * immediately with any records available now. Must not be negative. * @return map of topic to records since the last fetch for the subscribed list of topics and partitions * * @throws NoOffsetForPartitionException If there is no stored offset for a subscribed partition and no automatic * offset reset policy has been configured. 
*/ @Override - public synchronized ConsumerRecords<K, V> poll(long timeout) { - ensureNotClosed(); - long now = time.milliseconds(); + public ConsumerRecords<K, V> poll(long timeout) { + acquire(); + try { + if (timeout < 0) + throw new IllegalArgumentException("Timeout must not be negative"); + + // Poll for new data until the timeout expires + long remaining = timeout; + while (remaining >= 0) { + long start = time.milliseconds(); + long pollTimeout = min(remaining, timeToNextCommit(start), coordinator.timeToNextHeartbeat(start)); + + Map<TopicPartition, List<ConsumerRecord<K, V>>> records = pollOnce(pollTimeout, start); + long end = time.milliseconds(); + + if (!records.isEmpty()) { + // If data is available, then return it, but first send off the + // next round of fetches to enable pipelining while the user is + // handling the fetched records. + fetcher.initFetches(metadata.fetch(), end); + pollClient(0, end); + return new ConsumerRecords<K, V>(records); + } + + remaining -= end - start; + + // Nothing was available, so we should backoff before retrying + if (remaining > 0) { + Utils.sleep(min(remaining, retryBackoffMs)); + remaining -= time.milliseconds() - end; + } + } + + return ConsumerRecords.empty(); + } finally { + release(); + } + } + + + /** + * Do one round of polling. In addition to checking for new data, this does any needed + * heart-beating, auto-commits, and offset updates. + * @param timeout The maximum time to block in the underlying poll + * @param now Current time in millis + * @return The fetched records (may be empty) + */ + private Map<TopicPartition, List<ConsumerRecord<K, V>>> pollOnce(long timeout, long now) { + Cluster cluster = this.metadata.fetch(); + + // TODO: Sub-requests should take into account the poll timeout (KAFKA-1894) if (subscriptions.partitionsAutoAssigned()) { if (subscriptions.partitionAssignmentNeeded()) { @@ -649,26 +774,18 @@ public class KafkaConsumer<K, V> implements Consumer<K, V> { // fetch positions if we have partitions we're subscribed to that we // don't know the offset for if (!subscriptions.hasAllFetchPositions()) - updateFetchPositions(this.subscriptions.missingFetchPositions(), now); + updateFetchPositions(this.subscriptions.missingFetchPositions()); // maybe autocommit position if (shouldAutoCommit(now)) commit(CommitType.ASYNC); - /* - * initiate any needed fetches, then block for the timeout the user specified - */ - Cluster cluster = this.metadata.fetch(); + // Init any new fetches (won't resend pending fetches) fetcher.initFetches(cluster, now); - client.poll(timeout, now); - /* - * initiate a fetch request for any nodes that we just got a response from without blocking - */ - fetcher.initFetches(cluster, now); - client.poll(0, now); + pollClient(timeout, now); - return new ConsumerRecords<K, V>(fetcher.fetchedRecords()); + return fetcher.fetchedRecords(); } /** @@ -686,18 +803,20 @@ public class KafkaConsumer<K, V> implements Consumer<K, V> { * @param commitType Control whether the commit is blocking */ @Override - public synchronized void commit(final Map<TopicPartition, Long> offsets, CommitType commitType) { - ensureNotClosed(); - log.debug("Committing offsets ({}): {} ", commitType.toString().toLowerCase(), offsets); + public void commit(final Map<TopicPartition, Long> offsets, CommitType commitType) { + acquire(); + try { + log.debug("Committing offsets ({}): {} ", commitType.toString().toLowerCase(), offsets); - long now = time.milliseconds(); - this.lastCommitAttemptMs = now; + this.lastCommitAttemptMs = 
time.milliseconds(); - // commit the offsets with the coordinator - boolean syncCommit = commitType.equals(CommitType.SYNC); - if (!syncCommit) - this.subscriptions.needRefreshCommits(); - coordinator.commitOffsets(offsets, syncCommit, now); + // commit the offsets with the coordinator + if (commitType == CommitType.ASYNC) + this.subscriptions.needRefreshCommits(); + commitOffsets(offsets, commitType); + } finally { + release(); + } } /** @@ -710,9 +829,13 @@ public class KafkaConsumer<K, V> implements Consumer<K, V> { * @param commitType Whether or not the commit should block until it is acknowledged. */ @Override - public synchronized void commit(CommitType commitType) { - ensureNotClosed(); - commit(this.subscriptions.allConsumed(), commitType); + public void commit(CommitType commitType) { + acquire(); + try { + commit(this.subscriptions.allConsumed(), commitType); + } finally { + release(); + } } /** @@ -721,35 +844,43 @@ public class KafkaConsumer<K, V> implements Consumer<K, V> { * you may lose data if this API is arbitrarily used in the middle of consumption, to reset the fetch offsets */ @Override - public synchronized void seek(TopicPartition partition, long offset) { - ensureNotClosed(); - log.debug("Seeking to offset {} for partition {}", offset, partition); - this.subscriptions.seek(partition, offset); + public void seek(TopicPartition partition, long offset) { + acquire(); + try { + log.debug("Seeking to offset {} for partition {}", offset, partition); + this.subscriptions.seek(partition, offset); + } finally { + release(); + } } /** * Seek to the first offset for each of the given partitions */ - public synchronized void seekToBeginning(TopicPartition... partitions) { - ensureNotClosed(); - Collection<TopicPartition> parts = partitions.length == 0 ? this.subscriptions.assignedPartitions() - : Arrays.asList(partitions); - for (TopicPartition tp : parts) { - // TODO: list offset call could be optimized by grouping by node - seek(tp, fetcher.offsetBefore(tp, EARLIEST_OFFSET_TIMESTAMP)); + public void seekToBeginning(TopicPartition... partitions) { + acquire(); + try { + Collection<TopicPartition> parts = partitions.length == 0 ? this.subscriptions.assignedPartitions() + : Arrays.asList(partitions); + for (TopicPartition tp : parts) + subscriptions.needOffsetReset(tp, OffsetResetStrategy.EARLIEST); + } finally { + release(); } } /** * Seek to the last offset for each of the given partitions */ - public synchronized void seekToEnd(TopicPartition... partitions) { - ensureNotClosed(); - Collection<TopicPartition> parts = partitions.length == 0 ? this.subscriptions.assignedPartitions() - : Arrays.asList(partitions); - for (TopicPartition tp : parts) { - // TODO: list offset call could be optimized by grouping by node - seek(tp, fetcher.offsetBefore(tp, LATEST_OFFSET_TIMESTAMP)); + public void seekToEnd(TopicPartition... partitions) { + acquire(); + try { + Collection<TopicPartition> parts = partitions.length == 0 ? this.subscriptions.assignedPartitions() + : Arrays.asList(partitions); + for (TopicPartition tp : parts) + subscriptions.needOffsetReset(tp, OffsetResetStrategy.LATEST); + } finally { + release(); } } @@ -761,16 +892,20 @@ public class KafkaConsumer<K, V> implements Consumer<K, V> { * @throws NoOffsetForPartitionException If a position hasn't been set for a given partition, and no reset policy is * available. 
*/ - public synchronized long position(TopicPartition partition) { - ensureNotClosed(); - if (!this.subscriptions.assignedPartitions().contains(partition)) - throw new IllegalArgumentException("You can only check the position for partitions assigned to this consumer."); - Long offset = this.subscriptions.consumed(partition); - if (offset == null) { - updateFetchPositions(Collections.singleton(partition), time.milliseconds()); - return this.subscriptions.consumed(partition); - } else { - return offset; + public long position(TopicPartition partition) { + acquire(); + try { + if (!this.subscriptions.assignedPartitions().contains(partition)) + throw new IllegalArgumentException("You can only check the position for partitions assigned to this consumer."); + Long offset = this.subscriptions.consumed(partition); + if (offset == null) { + updateFetchPositions(Collections.singleton(partition)); + return this.subscriptions.consumed(partition); + } else { + return offset; + } + } finally { + release(); } } @@ -787,22 +922,26 @@ public class KafkaConsumer<K, V> implements Consumer<K, V> { * partition. */ @Override - public synchronized long committed(TopicPartition partition) { - ensureNotClosed(); - Set<TopicPartition> partitionsToFetch; - if (subscriptions.assignedPartitions().contains(partition)) { + public long committed(TopicPartition partition) { + acquire(); + try { + Set<TopicPartition> partitionsToFetch; + if (subscriptions.assignedPartitions().contains(partition)) { + Long committed = this.subscriptions.committed(partition); + if (committed != null) + return committed; + partitionsToFetch = subscriptions.assignedPartitions(); + } else { + partitionsToFetch = Collections.singleton(partition); + } + refreshCommittedOffsets(partitionsToFetch); Long committed = this.subscriptions.committed(partition); - if (committed != null) - return committed; - partitionsToFetch = subscriptions.assignedPartitions(); - } else { - partitionsToFetch = Collections.singleton(partition); + if (committed == null) + throw new NoOffsetForPartitionException("No offset has been committed for partition " + partition); + return committed; + } finally { + release(); } - refreshCommittedOffsets(partitionsToFetch, time.milliseconds()); - Long committed = this.subscriptions.committed(partition); - if (committed == null) - throw new NoOffsetForPartitionException("No offset has been committed for partition " + partition); - return committed; } /** @@ -822,19 +961,41 @@ public class KafkaConsumer<K, V> implements Consumer<K, V> { */ @Override public List<PartitionInfo> partitionsFor(String topic) { - Cluster cluster = this.metadata.fetch(); - List<PartitionInfo> parts = cluster.partitionsForTopic(topic); - if (parts == null) { - metadata.add(topic); - awaitMetadataUpdate(); - parts = metadata.fetch().partitionsForTopic(topic); + acquire(); + try { + Cluster cluster = this.metadata.fetch(); + List<PartitionInfo> parts = cluster.partitionsForTopic(topic); + if (parts == null) { + metadata.add(topic); + awaitMetadataUpdate(); + parts = metadata.fetch().partitionsForTopic(topic); + } + return parts; + } finally { + release(); } - return parts; } @Override - public synchronized void close() { - close(false); + public void close() { + if (closed) return; + + acquire(); + try { + close(false); + } finally { + release(); + } + } + + /** + * Wakeup the consumer. This method is thread-safe and is useful in particular to abort a long poll. + * The thread which is blocking in an operation will throw {@link ConsumerWakeupException}. 
+ */ + @Override + public void wakeup() { + this.wakeup.set(true); + this.client.wakeup(); } private void close(boolean swallowException) { @@ -856,6 +1017,15 @@ public class KafkaConsumer<K, V> implements Consumer<K, V> { return this.autoCommit && this.lastCommitAttemptMs <= now - this.autoCommitIntervalMs; } + private long timeToNextCommit(long now) { + if (!this.autoCommit) + return Long.MAX_VALUE; + long timeSinceLastCommit = now - this.lastCommitAttemptMs; + if (timeSinceLastCommit > this.autoCommitIntervalMs) + return 0; + return this.autoCommitIntervalMs - timeSinceLastCommit; + } + /** * Request a metadata update and wait until it has occurred */ @@ -863,7 +1033,7 @@ public class KafkaConsumer<K, V> implements Consumer<K, V> { int version = this.metadata.requestUpdate(); do { long now = time.milliseconds(); - this.client.poll(this.retryBackoffMs, now); + this.pollClient(this.retryBackoffMs, now); } while (this.metadata.version() == version); } @@ -881,8 +1051,7 @@ public class KafkaConsumer<K, V> implements Consumer<K, V> { } // get new assigned partitions from the coordinator - this.subscriptions.changePartitionAssignment(coordinator.assignPartitions( - new ArrayList<String>(this.subscriptions.subscribedTopics()), now)); + assignPartitions(); // execute the user's callback after rebalance log.debug("Setting newly assigned partitions {}", this.subscriptions.assignedPartitions()); @@ -899,25 +1068,73 @@ public class KafkaConsumer<K, V> implements Consumer<K, V> { * or reset it using the offset reset policy the user has configured. * * @param partitions The partitions that needs updating fetch positions - * @param now The current time * @throws org.apache.kafka.clients.consumer.NoOffsetForPartitionException If no offset is stored for a given partition and no offset reset policy is * defined */ - private void updateFetchPositions(Set<TopicPartition> partitions, long now) { + private void updateFetchPositions(Set<TopicPartition> partitions) { // first refresh the committed positions in case they are not up-to-date - refreshCommittedOffsets(partitions, now); + refreshCommittedOffsets(partitions); // reset the fetch position to the committed position for (TopicPartition tp : partitions) { - if (subscriptions.fetched(tp) == null) { - if (subscriptions.committed(tp) == null) { - // if the committed position is unknown reset the position - fetcher.resetOffset(tp); - } else { - log.debug("Resetting offset for partition {} to the committed offset {}", - tp, subscriptions.committed(tp)); - subscriptions.seek(tp, subscriptions.committed(tp)); - } + // Skip if we already have a fetch position + if (subscriptions.fetched(tp) != null) + continue; + + // TODO: If there are several offsets to reset, we could submit offset requests in parallel + if (subscriptions.isOffsetResetNeeded(tp)) { + resetOffset(tp); + } else if (subscriptions.committed(tp) == null) { + // There's no committed position, so we need to reset with the default strategy + subscriptions.needOffsetReset(tp); + resetOffset(tp); + } else { + log.debug("Resetting offset for partition {} to the committed offset {}", + tp, subscriptions.committed(tp)); + subscriptions.seek(tp, subscriptions.committed(tp)); + } + } + } + + /** + * Reset offsets for the given partition using the offset reset strategy. 
+ * + * @param partition The given partition that needs reset offset + * @throws org.apache.kafka.clients.consumer.NoOffsetForPartitionException If no offset reset strategy is defined + */ + private void resetOffset(TopicPartition partition) { + OffsetResetStrategy strategy = subscriptions.resetStrategy(partition); + final long timestamp; + if (strategy == OffsetResetStrategy.EARLIEST) + timestamp = EARLIEST_OFFSET_TIMESTAMP; + else if (strategy == OffsetResetStrategy.LATEST) + timestamp = LATEST_OFFSET_TIMESTAMP; + else + throw new NoOffsetForPartitionException("No offset is set and no reset policy is defined"); + + log.debug("Resetting offset for partition {} to {} offset.", partition, strategy.name().toLowerCase()); + long offset = listOffset(partition, timestamp); + this.subscriptions.seek(partition, offset); + } + + /** + * Fetch a single offset before the given timestamp for the partition. + * + * @param partition The partition that needs fetching offset. + * @param timestamp The timestamp for fetching offset. + * @return The offset of the message that is published before the given timestamp + */ + private long listOffset(TopicPartition partition, long timestamp) { + while (true) { + RequestFuture<Long> future = fetcher.listOffset(partition, timestamp); + + if (!future.isDone()) + pollFuture(future, requestTimeoutMs); + + if (future.isDone()) { + if (future.succeeded()) + return future.value(); + handleRequestFailure(future); } } } @@ -925,13 +1142,13 @@ public class KafkaConsumer<K, V> implements Consumer<K, V> { /** * Refresh the committed offsets for given set of partitions and update the cache */ - private void refreshCommittedOffsets(Set<TopicPartition> partitions, long now) { + private void refreshCommittedOffsets(Set<TopicPartition> partitions) { // we only need to fetch latest committed offset from coordinator if there // is some commit process in progress, otherwise our current // committed cache is up-to-date if (subscriptions.refreshCommitsNeeded()) { // contact coordinator to fetch committed offsets - Map<TopicPartition, Long> offsets = coordinator.fetchOffsets(partitions, now); + Map<TopicPartition, Long> offsets = fetchCommittedOffsets(partitions); // update the position with the offsets for (Map.Entry<TopicPartition, Long> entry : offsets.entrySet()) { @@ -941,6 +1158,183 @@ public class KafkaConsumer<K, V> implements Consumer<K, V> { } } + /** + * Block until we have received a partition assignment from the coordinator. + */ + private void assignPartitions() { + // Ensure that there are no pending requests to the coordinator. This is important + // in particular to avoid resending a pending JoinGroup request. + awaitCoordinatorInFlightRequests(); + + while (subscriptions.partitionAssignmentNeeded()) { + RequestFuture<Void> future = coordinator.assignPartitions(time.milliseconds()); + + // Block indefinitely for the join group request (which can take as long as a session timeout) + if (!future.isDone()) + pollFuture(future); + + if (future.failed()) + handleRequestFailure(future); + } + } + + /** + * Block until the coordinator for this group is known. + */ + private void ensureCoordinatorKnown() { + while (coordinator.coordinatorUnknown()) { + RequestFuture<Void> future = coordinator.discoverConsumerCoordinator(); + + if (!future.isDone()) + pollFuture(future, requestTimeoutMs); + + if (future.failed()) + handleRequestFailure(future); + } + } + + /** + * Block until any pending requests to the coordinator have been handled. 
+ */ + public void awaitCoordinatorInFlightRequests() { + while (coordinator.hasInFlightRequests()) { + long now = time.milliseconds(); + pollClient(-1, now); + } + } + + /** + * Lookup the committed offsets for a set of partitions. This will block until the coordinator has + * responded to the offset fetch request. + * @param partitions List of partitions to get offsets for + * @return Map from partition to its respective offset + */ + private Map<TopicPartition, Long> fetchCommittedOffsets(Set<TopicPartition> partitions) { + while (true) { + long now = time.milliseconds(); + RequestFuture<Map<TopicPartition, Long>> future = coordinator.fetchOffsets(partitions, now); + + if (!future.isDone()) + pollFuture(future, requestTimeoutMs); + + if (future.isDone()) { + if (future.succeeded()) + return future.value(); + handleRequestFailure(future); + } + } + } + + /** + * Commit offsets. This call blocks (regardless of commitType) until the coordinator + * can receive the commit request. Once the request has been made, however, only the + * synchronous commits will wait for a successful response from the coordinator. + * @param offsets Offsets to commit. + * @param commitType Commit policy + */ + private void commitOffsets(Map<TopicPartition, Long> offsets, CommitType commitType) { + if (commitType == CommitType.ASYNC) { + commitOffsetsAsync(offsets); + } else { + commitOffsetsSync(offsets); + } + } + + private void commitOffsetsAsync(Map<TopicPartition, Long> offsets) { + while (true) { + long now = time.milliseconds(); + RequestFuture<Void> future = coordinator.commitOffsets(offsets, now); + + if (!future.isDone() || future.succeeded()) + return; + + handleRequestFailure(future); + } + } + + private void commitOffsetsSync(Map<TopicPartition, Long> offsets) { + while (true) { + long now = time.milliseconds(); + RequestFuture<Void> future = coordinator.commitOffsets(offsets, now); + + if (!future.isDone()) + pollFuture(future, requestTimeoutMs); + + if (future.isDone()) { + if (future.succeeded()) + return; + else + handleRequestFailure(future); + } + } + } + + private void handleRequestFailure(RequestFuture<?> future) { + if (future.hasException()) + throw future.exception(); + + switch (future.retryAction()) { + case BACKOFF: + Utils.sleep(retryBackoffMs); + break; + case POLL: + pollClient(retryBackoffMs, time.milliseconds()); + break; + case FIND_COORDINATOR: + ensureCoordinatorKnown(); + break; + case REFRESH_METADATA: + awaitMetadataUpdate(); + break; + case NOOP: + // Do nothing (retry now) + } + } + + /** + * Poll until a result is ready or timeout expires + * @param future The future to poll for + * @param timeout The time in milliseconds to wait for the result + */ + private void pollFuture(RequestFuture<?> future, long timeout) { + // TODO: Update this code for KAFKA-2120, which adds request timeout to NetworkClient + // In particular, we must ensure that "timed out" requests will not have their callbacks + // invoked at a later time. + long remaining = timeout; + while (!future.isDone() && remaining >= 0) { + long start = time.milliseconds(); + pollClient(remaining, start); + if (future.isDone()) return; + remaining -= time.milliseconds() - start; + } + } + + /** + * Poll indefinitely until the result is ready. + * @param future The future to poll for. + */ + private void pollFuture(RequestFuture<?> future) { + while (!future.isDone()) { + long now = time.milliseconds(); + pollClient(-1, now); + } + } + + /** + * Poll for IO. 
+ * @param timeout The maximum time to wait for IO to become available + * @param now The current time in milliseconds + * @throws ConsumerWakeupException if {@link #wakeup()} is invoked while the poll is active + */ + private void pollClient(long timeout, long now) { + this.client.poll(timeout, now); + + if (wakeup.get()) { + wakeup.set(false); + throw new ConsumerWakeupException(); + } + } + /* * Check that the consumer hasn't been closed. */ @@ -948,4 +1342,27 @@ public class KafkaConsumer<K, V> implements Consumer<K, V> { if (this.closed) throw new IllegalStateException("This consumer has already been closed."); } + + /** + * Acquire the light lock protecting this consumer from multi-threaded access. Instead of blocking + * when the lock is not available, however, we just throw an exception (since multi-threaded usage is not + * supported). + * @throws IllegalStateException if the consumer has been closed + * @throws ConcurrentModificationException if another thread already has the lock + */ + private void acquire() { + ensureNotClosed(); + Long threadId = Thread.currentThread().getId(); + if (!threadId.equals(currentThread.get()) && !currentThread.compareAndSet(null, threadId)) + throw new ConcurrentModificationException("KafkaConsumer is not safe for multi-threaded access"); + refcount++; + } + + /** + * Release the light lock protecting the consumer from multi-threaded access. + */ + private void release() { + if (--refcount == 0) + currentThread.set(null); + } } http://git-wip-us.apache.org/repos/asf/kafka/blob/b6d326b0/clients/src/main/java/org/apache/kafka/clients/consumer/MockConsumer.java ---------------------------------------------------------------------- diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/MockConsumer.java b/clients/src/main/java/org/apache/kafka/clients/consumer/MockConsumer.java index f50da82..46e26a6 100644 --- a/clients/src/main/java/org/apache/kafka/clients/consumer/MockConsumer.java +++ b/clients/src/main/java/org/apache/kafka/clients/consumer/MockConsumer.java @@ -40,8 +40,8 @@ public class MockConsumer<K, V> implements Consumer<K, V> { private Map<TopicPartition, List<ConsumerRecord<K, V>>> records; private boolean closed; - public MockConsumer() { - this.subscriptions = new SubscriptionState(); + public MockConsumer(OffsetResetStrategy offsetResetStrategy) { + this.subscriptions = new SubscriptionState(offsetResetStrategy); this.partitions = new HashMap<String, List<PartitionInfo>>(); this.records = new HashMap<TopicPartition, List<ConsumerRecord<K, V>>>(); this.closed = false; @@ -175,6 +175,11 @@ public class MockConsumer<K, V> implements Consumer<K, V> { this.closed = true; } + @Override + public void wakeup() { + + } + private void ensureNotClosed() { if (this.closed) throw new IllegalStateException("This consumer has already been closed."); http://git-wip-us.apache.org/repos/asf/kafka/blob/b6d326b0/clients/src/main/java/org/apache/kafka/clients/consumer/OffsetResetStrategy.java ---------------------------------------------------------------------- diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/OffsetResetStrategy.java b/clients/src/main/java/org/apache/kafka/clients/consumer/OffsetResetStrategy.java new file mode 100644 index 0000000..542da7f --- /dev/null +++ b/clients/src/main/java/org/apache/kafka/clients/consumer/OffsetResetStrategy.java @@ -0,0 +1,17 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. 
See the NOTICE + * file distributed with this work for additional information regarding copyright ownership. The ASF licenses this file + * to You under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the + * License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on + * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the + * specific language governing permissions and limitations under the License. + */ +package org.apache.kafka.clients.consumer; + +public enum OffsetResetStrategy { + LATEST, EARLIEST, NONE +} http://git-wip-us.apache.org/repos/asf/kafka/blob/b6d326b0/clients/src/main/java/org/apache/kafka/clients/consumer/internals/Coordinator.java ---------------------------------------------------------------------- diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/Coordinator.java b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/Coordinator.java index 41cb945..6c26667 100644 --- a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/Coordinator.java +++ b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/Coordinator.java @@ -15,7 +15,6 @@ package org.apache.kafka.clients.consumer.internals; import org.apache.kafka.clients.ClientRequest; import org.apache.kafka.clients.ClientResponse; import org.apache.kafka.clients.KafkaClient; -import org.apache.kafka.clients.Metadata; import org.apache.kafka.clients.RequestCompletionHandler; import org.apache.kafka.common.KafkaException; import org.apache.kafka.common.MetricName; @@ -57,7 +56,7 @@ import java.util.Set; import java.util.concurrent.TimeUnit; /** - * This class manage the coordination process with the consumer coordinator. + * This class manages the coordination process with the consumer coordinator. */ public final class Coordinator { @@ -67,13 +66,11 @@ public final class Coordinator { private final Time time; private final String groupId; - private final Metadata metadata; private final Heartbeat heartbeat; private final int sessionTimeoutMs; private final String assignmentStrategy; private final SubscriptionState subscriptions; private final CoordinatorMetrics sensors; - private final long retryBackoffMs; private Node consumerCoordinator; private String consumerId; private int generation; @@ -83,10 +80,8 @@ public final class Coordinator { */ public Coordinator(KafkaClient client, String groupId, - long retryBackoffMs, int sessionTimeoutMs, String assignmentStrategy, - Metadata metadata, SubscriptionState subscriptions, Metrics metrics, String metricGrpPrefix, @@ -98,10 +93,8 @@ public final class Coordinator { this.generation = -1; this.consumerId = JoinGroupRequest.UNKNOWN_CONSUMER_ID; this.groupId = groupId; - this.metadata = metadata; this.consumerCoordinator = null; this.subscriptions = subscriptions; - this.retryBackoffMs = retryBackoffMs; this.sessionTimeoutMs = sessionTimeoutMs; this.assignmentStrategy = assignmentStrategy; this.heartbeat = new Heartbeat(this.sessionTimeoutMs, time.milliseconds()); @@ -109,84 +102,110 @@ public final class Coordinator { } /** - * Assign partitions for the subscribed topics. 
- * - * @param subscribedTopics The subscribed topics list - * @param now The current time - * @return The assigned partition info + * Send a request to get a new partition assignment. This is a non-blocking call which sends + * a JoinGroup request to the coordinator (if it is available). The returned future must + * be polled to see if the request completed successfully. + * @param now The current time in milliseconds + * @return A request future whose completion indicates the result of the JoinGroup request. */ - public List<TopicPartition> assignPartitions(List<String> subscribedTopics, long now) { + public RequestFuture<Void> assignPartitions(final long now) { + final RequestFuture<Void> future = newCoordinatorRequestFuture(now); + if (future.isDone()) return future; // send a join group request to the coordinator + List<String> subscribedTopics = new ArrayList<String>(subscriptions.subscribedTopics()); log.debug("(Re-)joining group {} with subscribed topics {}", groupId, subscribedTopics); - // repeat processing the response until succeed or fatal error - do { - JoinGroupRequest request = new JoinGroupRequest(groupId, + JoinGroupRequest request = new JoinGroupRequest(groupId, this.sessionTimeoutMs, subscribedTopics, this.consumerId, this.assignmentStrategy); - ClientResponse resp = this.blockingCoordinatorRequest(ApiKeys.JOIN_GROUP, request.toStruct(), null, now); - JoinGroupResponse response = new JoinGroupResponse(resp.responseBody()); - short errorCode = response.errorCode(); + // create the request for the coordinator + log.debug("Issuing request ({}: {}) to coordinator {}", ApiKeys.JOIN_GROUP, request, this.consumerCoordinator.id()); + + RequestCompletionHandler completionHandler = new RequestCompletionHandler() { + @Override + public void onComplete(ClientResponse resp) { + handleJoinResponse(resp, future); + } + }; + + sendCoordinator(ApiKeys.JOIN_GROUP, request.toStruct(), completionHandler, now); + return future; + } + + private void handleJoinResponse(ClientResponse response, RequestFuture<Void> future) { + if (response.wasDisconnected()) { + handleCoordinatorDisconnect(response); + future.retryWithNewCoordinator(); + } else { + // process the response + JoinGroupResponse joinResponse = new JoinGroupResponse(response.responseBody()); + short errorCode = joinResponse.errorCode(); if (errorCode == Errors.NONE.code()) { - this.consumerId = response.consumerId(); - this.generation = response.generationId(); + Coordinator.this.consumerId = joinResponse.consumerId(); + Coordinator.this.generation = joinResponse.generationId(); // set the flag to refresh last committed offsets - this.subscriptions.needRefreshCommits(); + subscriptions.needRefreshCommits(); log.debug("Joined group: {}", response); // record re-assignment time - this.sensors.partitionReassignments.record(time.milliseconds() - now); + this.sensors.partitionReassignments.record(response.requestLatencyMs()); - // return assigned partitions - return response.assignedPartitions(); + // update partition assignment + subscriptions.changePartitionAssignment(joinResponse.assignedPartitions()); + future.complete(null); } else if (errorCode == Errors.UNKNOWN_CONSUMER_ID.code()) { // reset the consumer id and retry immediately - this.consumerId = JoinGroupRequest.UNKNOWN_CONSUMER_ID; + Coordinator.this.consumerId = JoinGroupRequest.UNKNOWN_CONSUMER_ID; log.info("Attempt to join group {} failed due to unknown consumer id, resetting and retrying.", - groupId); + groupId); + + future.retryNow(); } else if (errorCode == 
Errors.CONSUMER_COORDINATOR_NOT_AVAILABLE.code() || errorCode == Errors.NOT_COORDINATOR_FOR_CONSUMER.code()) { // re-discover the coordinator and retry with backoff coordinatorDead(); - Utils.sleep(this.retryBackoffMs); - log.info("Attempt to join group {} failed due to obsolete coordinator information, retrying.", - groupId); + groupId); + future.retryWithNewCoordinator(); } else if (errorCode == Errors.UNKNOWN_PARTITION_ASSIGNMENT_STRATEGY.code() || errorCode == Errors.INCONSISTENT_PARTITION_ASSIGNMENT_STRATEGY.code() || errorCode == Errors.INVALID_SESSION_TIMEOUT.code()) { // log the error and re-throw the exception + KafkaException e = Errors.forCode(errorCode).exception(); log.error("Attempt to join group {} failed due to: {}", - groupId, Errors.forCode(errorCode).exception().getMessage()); - Errors.forCode(errorCode).maybeThrow(); + groupId, e.getMessage()); + future.raise(e); } else { // unexpected error, throw the exception - throw new KafkaException("Unexpected error in join group response: " - + Errors.forCode(response.errorCode()).exception().getMessage()); + future.raise(new KafkaException("Unexpected error in join group response: " + + Errors.forCode(joinResponse.errorCode()).exception().getMessage())); } - } while (true); + } } /** - * Commit offsets for the specified list of topics and partitions. - * - * A non-blocking commit will attempt to commit offsets asychronously. No error will be thrown if the commit fails. - * A blocking commit will wait for a response acknowledging the commit. In the event of an error it will retry until - * the commit succeeds. + * Commit offsets for the specified list of topics and partitions. This is a non-blocking call + * which returns a request future that can be polled in the case of a synchronous commit or ignored in the + * asynchronous case. * * @param offsets The list of offsets per partition that should be committed. 
- * @param blocking Control whether the commit is blocking * @param now The current time + * @return A request future whose value indicates whether the commit was successful or not */ - public void commitOffsets(final Map<TopicPartition, Long> offsets, boolean blocking, long now) { - if (!offsets.isEmpty()) { + public RequestFuture<Void> commitOffsets(final Map<TopicPartition, Long> offsets, long now) { + final RequestFuture<Void> future = newCoordinatorRequestFuture(now); + if (future.isDone()) return future; + + if (offsets.isEmpty()) { + future.complete(null); + } else { // create the offset commit request Map<TopicPartition, OffsetCommitRequest.PartitionData> offsetData; offsetData = new HashMap<TopicPartition, OffsetCommitRequest.PartitionData>(offsets.size()); @@ -198,52 +217,63 @@ public final class Coordinator { OffsetCommitRequest.DEFAULT_RETENTION_TIME, offsetData); - // send request and possibly wait for response if it is blocking - RequestCompletionHandler handler = new CommitOffsetCompletionHandler(offsets); + RequestCompletionHandler handler = new CommitOffsetCompletionHandler(offsets, future); + sendCoordinator(ApiKeys.OFFSET_COMMIT, req.toStruct(), handler, now); + } - if (blocking) { - boolean done; - do { - ClientResponse response = blockingCoordinatorRequest(ApiKeys.OFFSET_COMMIT, req.toStruct(), handler, now); + return future; + } - // check for errors - done = true; - OffsetCommitResponse commitResponse = new OffsetCommitResponse(response.responseBody()); - for (short errorCode : commitResponse.responseData().values()) { - if (errorCode != Errors.NONE.code()) - done = false; - } - if (!done) { - log.debug("Error in offset commit, backing off for {} ms before retrying again.", - this.retryBackoffMs); - Utils.sleep(this.retryBackoffMs); - } - } while (!done); - } else { - this.client.send(initiateCoordinatorRequest(ApiKeys.OFFSET_COMMIT, req.toStruct(), handler, now)); - } + private <T> RequestFuture<T> newCoordinatorRequestFuture(long now) { + if (coordinatorUnknown()) + return RequestFuture.newCoordinatorNeeded(); + + if (client.ready(this.consumerCoordinator, now)) + // We have an open connection and we're ready to send + return new RequestFuture<T>(); + + if (this.client.connectionFailed(this.consumerCoordinator)) { + coordinatorDead(); + return RequestFuture.newCoordinatorNeeded(); } + + // The connection has been initiated, so we need to poll to finish it + return RequestFuture.pollNeeded(); } /** - * Fetch the committed offsets of the given set of partitions. + * Fetch the committed offsets for a set of partitions. This is a non-blocking call. The + * returned future can be polled to get the actual offsets returned from the broker. * - * @param partitions The list of partitions which need to ask for committed offsets - * @param now The current time - * @return The fetched offset values + * @param partitions The set of partitions to get offsets for. + * @param now The current time in milliseconds + * @return A request future containing the committed offsets. 
*/ - public Map<TopicPartition, Long> fetchOffsets(Set<TopicPartition> partitions, long now) { - log.debug("Fetching committed offsets for partitions: " + Utils.join(partitions, ", ")); - - while (true) { - // construct the request - OffsetFetchRequest request = new OffsetFetchRequest(this.groupId, new ArrayList<TopicPartition>(partitions)); + public RequestFuture<Map<TopicPartition, Long>> fetchOffsets(Set<TopicPartition> partitions, long now) { + final RequestFuture<Map<TopicPartition, Long>> future = newCoordinatorRequestFuture(now); + if (future.isDone()) return future; - // send the request and block on waiting for response - ClientResponse resp = this.blockingCoordinatorRequest(ApiKeys.OFFSET_FETCH, request.toStruct(), null, now); + log.debug("Fetching committed offsets for partitions: " + Utils.join(partitions, ", ")); + // construct the request + OffsetFetchRequest request = new OffsetFetchRequest(this.groupId, new ArrayList<TopicPartition>(partitions)); + + // send the request with a callback + RequestCompletionHandler completionHandler = new RequestCompletionHandler() { + @Override + public void onComplete(ClientResponse resp) { + handleOffsetResponse(resp, future); + } + }; + sendCoordinator(ApiKeys.OFFSET_FETCH, request.toStruct(), completionHandler, now); + return future; + } + private void handleOffsetResponse(ClientResponse resp, RequestFuture<Map<TopicPartition, Long>> future) { + if (resp.wasDisconnected()) { + handleCoordinatorDisconnect(resp); + future.retryWithNewCoordinator(); + } else { // parse the response to get the offsets - boolean offsetsReady = true; OffsetFetchResponse response = new OffsetFetchResponse(resp.responseBody()); Map<TopicPartition, Long> offsets = new HashMap<TopicPartition, Long>(response.responseData().size()); for (Map.Entry<TopicPartition, OffsetFetchResponse.PartitionData> entry : response.responseData().entrySet()) { @@ -251,23 +281,21 @@ public final class Coordinator { OffsetFetchResponse.PartitionData data = entry.getValue(); if (data.hasError()) { log.debug("Error fetching offset for topic-partition {}: {}", tp, Errors.forCode(data.errorCode) - .exception() - .getMessage()); + .exception() + .getMessage()); if (data.errorCode == Errors.OFFSET_LOAD_IN_PROGRESS.code()) { // just retry - offsetsReady = false; - Utils.sleep(this.retryBackoffMs); + future.retryAfterBackoff(); } else if (data.errorCode == Errors.NOT_COORDINATOR_FOR_CONSUMER.code()) { // re-discover the coordinator and retry coordinatorDead(); - offsetsReady = false; - Utils.sleep(this.retryBackoffMs); + future.retryWithNewCoordinator(); } else if (data.errorCode == Errors.UNKNOWN_TOPIC_OR_PARTITION.code()) { // just ignore this partition log.debug("Unknown topic or partition for " + tp); } else { - throw new KafkaException("Unexpected error in fetch offset response: " - + Errors.forCode(data.errorCode).exception().getMessage()); + future.raise(new KafkaException("Unexpected error in fetch offset response: " + + Errors.forCode(data.errorCode).exception().getMessage())); } } else if (data.offset >= 0) { // record the position with the offset (-1 indicates no committed offset to fetch) @@ -277,8 +305,8 @@ public final class Coordinator { } } - if (offsetsReady) - return offsets; + if (!future.isDone()) + future.complete(offsets); } } @@ -288,124 +316,105 @@ public final class Coordinator { * @param now The current time */ public void maybeHeartbeat(long now) { - if (heartbeat.shouldHeartbeat(now)) { + if (heartbeat.shouldHeartbeat(now) && coordinatorReady(now)) { HeartbeatRequest 
@@ -288,124 +316,105 @@ public final class Coordinator {
      * @param now The current time
      */
     public void maybeHeartbeat(long now) {
-        if (heartbeat.shouldHeartbeat(now)) {
+        if (heartbeat.shouldHeartbeat(now) && coordinatorReady(now)) {
             HeartbeatRequest req = new HeartbeatRequest(this.groupId, this.generation, this.consumerId);
-            this.client.send(initiateCoordinatorRequest(ApiKeys.HEARTBEAT, req.toStruct(), new HeartbeatCompletionHandler(), now));
+            sendCoordinator(ApiKeys.HEARTBEAT, req.toStruct(), new HeartbeatCompletionHandler(), now);
             this.heartbeat.sentHeartbeat(now);
         }
     }
-    public boolean coordinatorUnknown() {
-        return this.consumerCoordinator == null;
-    }
-
     /**
-     * Repeatedly attempt to send a request to the coordinator until a response is received (retry if we are
-     * disconnected). Note that this means any requests sent this way must be idempotent.
-     *
-     * @return The response
+     * Get the time until the next heartbeat is needed.
+     * @param now The current time
+     * @return The duration in milliseconds before the next heartbeat will be needed.
      */
-    private ClientResponse blockingCoordinatorRequest(ApiKeys api,
-                                                      Struct request,
-                                                      RequestCompletionHandler handler,
-                                                      long now) {
-        while (true) {
-            ClientRequest coordinatorRequest = initiateCoordinatorRequest(api, request, handler, now);
-            ClientResponse coordinatorResponse = sendAndReceive(coordinatorRequest, now);
-            if (coordinatorResponse.wasDisconnected()) {
-                handleCoordinatorDisconnect(coordinatorResponse);
-                Utils.sleep(this.retryBackoffMs);
-            } else {
-                return coordinatorResponse;
-            }
-        }
+    public long timeToNextHeartbeat(long now) {
+        return heartbeat.timeToNextHeartbeat(now);
     }

     /**
-     * Ensure the consumer coordinator is known and we have a ready connection to it.
+     * Check whether the coordinator has any in-flight requests.
+     * @return true if the coordinator has pending requests.
      */
-    private void ensureCoordinatorReady() {
-        while (true) {
-            if (this.consumerCoordinator == null)
-                discoverCoordinator();
-
-            while (true) {
-                boolean ready = this.client.ready(this.consumerCoordinator, time.milliseconds());
-                if (ready) {
-                    return;
-                } else {
-                    log.debug("No connection to coordinator, attempting to connect.");
-                    this.client.poll(this.retryBackoffMs, time.milliseconds());
+    public boolean hasInFlightRequests() {
+        return !coordinatorUnknown() && client.inFlightRequestCount(consumerCoordinator.idString()) > 0;
+    }
-                    // if the coordinator connection has failed, we need to
-                    // break the inner loop to re-discover the coordinator
-                    if (this.client.connectionFailed(this.consumerCoordinator)) {
-                        log.debug("Coordinator connection failed. Attempting to re-discover.");
-                        coordinatorDead();
-                        break;
-                    }
-                }
-            }
-        }
+    public boolean coordinatorUnknown() {
+        return this.consumerCoordinator == null;
     }
-
-    /**
-     * Mark the current coordinator as dead.
-     */
-    private void coordinatorDead() {
-        if (this.consumerCoordinator != null) {
-            log.info("Marking the coordinator {} dead.", this.consumerCoordinator.id());
-            this.consumerCoordinator = null;
-        }
+    private boolean coordinatorReady(long now) {
+        return !coordinatorUnknown() && this.client.ready(this.consumerCoordinator, now);
     }

     /**
-     * Keep discovering the consumer coordinator until it is found.
+     * Discover the current coordinator for the consumer group. Sends a ConsumerMetadata request to
+     * one of the brokers. The returned future should be polled to get the result of the request.
+     * @return A request future which indicates the completion of the metadata request
      */
-    private void discoverCoordinator() {
-        while (this.consumerCoordinator == null) {
-            log.debug("No coordinator known, attempting to discover one.");
-            Node coordinator = fetchConsumerCoordinator();
-
-            if (coordinator == null) {
-                log.debug("No coordinator found, backing off.");
-                Utils.sleep(this.retryBackoffMs);
+    public RequestFuture<Void> discoverConsumerCoordinator() {
+        // initiate the consumer metadata request
+        // find a node to ask about the coordinator
+        long now = time.milliseconds();
+        Node node = this.client.leastLoadedNode(now);
+
+        if (node == null) {
+            return RequestFuture.metadataRefreshNeeded();
+        } else if (!this.client.ready(node, now)) {
+            if (this.client.connectionFailed(node)) {
+                return RequestFuture.metadataRefreshNeeded();
             } else {
-                log.debug("Found coordinator: " + coordinator);
-                this.consumerCoordinator = coordinator;
+                return RequestFuture.pollNeeded();
             }
+        } else {
+            final RequestFuture<Void> future = new RequestFuture<Void>();
+
+            // create a consumer metadata request
+            log.debug("Issuing consumer metadata request to broker {}", node.id());
+            ConsumerMetadataRequest metadataRequest = new ConsumerMetadataRequest(this.groupId);
+            RequestCompletionHandler completionHandler = new RequestCompletionHandler() {
+                @Override
+                public void onComplete(ClientResponse resp) {
+                    handleConsumerMetadataResponse(resp, future);
+                }
+            };
+            send(node, ApiKeys.CONSUMER_METADATA, metadataRequest.toStruct(), completionHandler, now);
+            return future;
         }
     }
-    /**
-     * Get the current consumer coordinator information via consumer metadata request.
-     *
-     * @return the consumer coordinator node
-     */
-    private Node fetchConsumerCoordinator() {
-
-        // initiate the consumer metadata request
-        ClientRequest request = initiateConsumerMetadataRequest();
-
-        // send the request and wait for its response
-        ClientResponse response = sendAndReceive(request, request.createdTime());
+    private void handleConsumerMetadataResponse(ClientResponse resp, RequestFuture<Void> future) {
+        log.debug("Consumer metadata response {}", resp);
         // parse the response to get the coordinator info if it is not disconnected,
         // otherwise we need to request metadata update
-        if (!response.wasDisconnected()) {
-            ConsumerMetadataResponse consumerMetadataResponse = new ConsumerMetadataResponse(response.responseBody());
+        if (resp.wasDisconnected()) {
+            future.retryAfterMetadataRefresh();
+        } else {
+            ConsumerMetadataResponse consumerMetadataResponse = new ConsumerMetadataResponse(resp.responseBody());
             // use MAX_VALUE - node.id as the coordinator id to mimic separate connections
             // for the coordinator in the underlying network client layer
             // TODO: this needs to be better handled in KAFKA-1935
-            if (consumerMetadataResponse.errorCode() == Errors.NONE.code())
-                return new Node(Integer.MAX_VALUE - consumerMetadataResponse.node().id(),
-                        consumerMetadataResponse.node().host(),
-                        consumerMetadataResponse.node().port());
-            } else {
-                this.metadata.requestUpdate();
+            if (consumerMetadataResponse.errorCode() == Errors.NONE.code()) {
+                this.consumerCoordinator = new Node(Integer.MAX_VALUE - consumerMetadataResponse.node().id(),
+                        consumerMetadataResponse.node().host(),
+                        consumerMetadataResponse.node().port());
+                future.complete(null);
+            } else {
+                future.retryAfterBackoff();
+            }
         }
+    }
-        return null;
+    /**
+     * Mark the current coordinator as dead.
+     */
+    private void coordinatorDead() {
+        if (this.consumerCoordinator != null) {
+            log.info("Marking the coordinator {} dead.", this.consumerCoordinator.id());
+            this.consumerCoordinator = null;
+        }
     }
     /**
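
Illustration (not part of the patch): with maybeHeartbeat() now sending only when the coordinator connection is ready, and timeToNextHeartbeat() exposing the next deadline, a polling loop can bound its timeout so a long network poll never delays the next heartbeat. A rough sketch; remainingMs (the caller's own timeout budget) is an assumed variable, the other calls appear in the patch.

    // Sketch only: bound the network poll by the heartbeat deadline.
    long now = time.milliseconds();
    coordinator.maybeHeartbeat(now); // sends a heartbeat if one is due and the coordinator is ready
    long pollTimeout = Math.min(remainingMs, coordinator.timeToNextHeartbeat(now));
    client.poll(pollTimeout, now); // wake up in time to send the next heartbeat
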
@@ -414,79 +423,23 @@ public final class Coordinator {
     private void handleCoordinatorDisconnect(ClientResponse response) {
         int correlation = response.request().request().header().correlationId();
         log.debug("Cancelled request {} with correlation id {} due to coordinator {} being disconnected",
-                  response.request(),
-                  correlation,
-                  response.request().request().destination());
+                response.request(),
+                correlation,
+                response.request().request().destination());
         // mark the coordinator as dead
         coordinatorDead();
     }
-    /**
-     * Initiate a consumer metadata request to the least loaded node.
-     *
-     * @return The created request
-     */
-    private ClientRequest initiateConsumerMetadataRequest() {
-        // find a node to ask about the coordinator
-        Node node = this.client.leastLoadedNode(time.milliseconds());
-        while (node == null || !this.client.ready(node, time.milliseconds())) {
-            long now = time.milliseconds();
-            this.client.poll(this.retryBackoffMs, now);
-            node = this.client.leastLoadedNode(now);
-
-            // if there is no ready node, backoff before retry
-            if (node == null)
-                Utils.sleep(this.retryBackoffMs);
-        }
-
-        // create a consumer metadata request
-        log.debug("Issuing consumer metadata request to broker {}", node.id());
-
-        ConsumerMetadataRequest request = new ConsumerMetadataRequest(this.groupId);
-        RequestSend send = new RequestSend(node.idString(),
-                                           this.client.nextRequestHeader(ApiKeys.CONSUMER_METADATA),
-                                           request.toStruct());
-        long now = time.milliseconds();
-        return new ClientRequest(now, true, send, null);
+    private void sendCoordinator(ApiKeys api, Struct request, RequestCompletionHandler handler, long now) {
+        send(this.consumerCoordinator, api, request, handler, now);
     }
-    /**
-     * Initiate a request to the coordinator.
-     */
-    private ClientRequest initiateCoordinatorRequest(ApiKeys api, Struct request, RequestCompletionHandler handler, long now) {
-
-        // first make sure the coordinator is known and ready
-        ensureCoordinatorReady();
-
-        // create the request for the coordinator
-        log.debug("Issuing request ({}: {}) to coordinator {}", api, request, this.consumerCoordinator.id());
-
+    private void send(Node node, ApiKeys api, Struct request, RequestCompletionHandler handler, long now) {
         RequestHeader header = this.client.nextRequestHeader(api);
-        RequestSend send = new RequestSend(this.consumerCoordinator.idString(), header, request);
-        return new ClientRequest(now, true, send, handler);
-    }
-
-    /**
-     * Attempt to send a request and receive its response.
-     *
-     * @return The response
-     */
-    private ClientResponse sendAndReceive(ClientRequest clientRequest, long now) {
-
-        // send the request
-        this.client.send(clientRequest);
-
-        // drain all responses from the destination node
-        List<ClientResponse> responses = this.client.completeAll(clientRequest.request().destination(), now);
-        if (responses.isEmpty()) {
-            throw new IllegalStateException("This should not happen.");
-        } else {
-            // other requests should be handled by the callback, and
-            // we only care about the response of the last request
-            return responses.get(responses.size() - 1);
-        }
+        RequestSend send = new RequestSend(node.idString(), header, request);
+        this.client.send(new ClientRequest(now, true, send, handler));
     }

     private class HeartbeatCompletionHandler implements RequestCompletionHandler {
@@ -521,18 +474,21 @@ public final class Coordinator {
     private class CommitOffsetCompletionHandler implements RequestCompletionHandler {
         private final Map<TopicPartition, Long> offsets;
+        private final RequestFuture<Void> future;
-        public CommitOffsetCompletionHandler(Map<TopicPartition, Long> offsets) {
+        public CommitOffsetCompletionHandler(Map<TopicPartition, Long> offsets, RequestFuture<Void> future) {
             this.offsets = offsets;
+            this.future = future;
         }
         @Override
         public void onComplete(ClientResponse resp) {
             if (resp.wasDisconnected()) {
                 handleCoordinatorDisconnect(resp);
+                future.retryWithNewCoordinator();
             } else {
-                OffsetCommitResponse response = new OffsetCommitResponse(resp.responseBody());
-                for (Map.Entry<TopicPartition, Short> entry : response.responseData().entrySet()) {
+                OffsetCommitResponse commitResponse = new OffsetCommitResponse(resp.responseBody());
+                for (Map.Entry<TopicPartition, Short> entry : commitResponse.responseData().entrySet()) {
                     TopicPartition tp = entry.getKey();
                     short errorCode = entry.getValue();
                     long offset = this.offsets.get(tp);
@@ -542,14 +498,19 @@ public final class Coordinator {
                     } else if (errorCode == Errors.CONSUMER_COORDINATOR_NOT_AVAILABLE.code()
                             || errorCode == Errors.NOT_COORDINATOR_FOR_CONSUMER.code()) {
                         coordinatorDead();
+                        future.retryWithNewCoordinator();
                     } else {
                         // do not need to throw the exception but just log the error
+                        future.retryAfterBackoff();
                         log.error("Error committing partition {} at offset {}: {}", tp, offset, Errors.forCode(errorCode).exception().getMessage());
                     }
                 }
+
+                if (!future.isDone())
+                    future.complete(null);
             }
             sensors.commitLatency.record(resp.requestLatencyMs());
         }