Repository: kafka
Updated Branches:
  refs/heads/trunk 2270a7537 -> b6d326b08


http://git-wip-us.apache.org/repos/asf/kafka/blob/b6d326b0/clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetcher.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetcher.java b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetcher.java
index 56281ee..695eaf6 100644
--- a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetcher.java
+++ b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetcher.java
@@ -19,7 +19,6 @@ import org.apache.kafka.clients.KafkaClient;
 import org.apache.kafka.clients.Metadata;
 import org.apache.kafka.clients.RequestCompletionHandler;
 import org.apache.kafka.clients.consumer.ConsumerRecord;
-import org.apache.kafka.clients.consumer.NoOffsetForPartitionException;
 import org.apache.kafka.common.Cluster;
 import org.apache.kafka.common.MetricName;
 import org.apache.kafka.common.Node;
@@ -61,9 +60,6 @@ import java.util.Map;
 public class Fetcher<K, V> {
 
     private static final Logger log = LoggerFactory.getLogger(Fetcher.class);
-    private static final long EARLIEST_OFFSET_TIMESTAMP = -2L;
-    private static final long LATEST_OFFSET_TIMESTAMP = -1L;
-
 
     private final KafkaClient client;
 
@@ -72,23 +68,19 @@ public class Fetcher<K, V> {
     private final int maxWaitMs;
     private final int fetchSize;
     private final boolean checkCrcs;
-    private final long retryBackoffMs;
     private final Metadata metadata;
     private final FetchManagerMetrics sensors;
     private final SubscriptionState subscriptions;
     private final List<PartitionRecords<K, V>> records;
-    private final AutoOffsetResetStrategy offsetResetStrategy;
     private final Deserializer<K> keyDeserializer;
     private final Deserializer<V> valueDeserializer;
 
 
     public Fetcher(KafkaClient client,
-                   long retryBackoffMs,
                    int minBytes,
                    int maxWaitMs,
                    int fetchSize,
                    boolean checkCrcs,
-                   String offsetReset,
                    Deserializer<K> keyDeserializer,
                    Deserializer<V> valueDeserializer,
                    Metadata metadata,
@@ -102,17 +94,16 @@ public class Fetcher<K, V> {
         this.client = client;
         this.metadata = metadata;
         this.subscriptions = subscriptions;
-        this.retryBackoffMs = retryBackoffMs;
         this.minBytes = minBytes;
         this.maxWaitMs = maxWaitMs;
         this.fetchSize = fetchSize;
         this.checkCrcs = checkCrcs;
-        this.offsetResetStrategy = AutoOffsetResetStrategy.valueOf(offsetReset);
 
         this.keyDeserializer = keyDeserializer;
         this.valueDeserializer = valueDeserializer;
 
         this.records = new LinkedList<PartitionRecords<K, V>>();
+
         this.sensors = new FetchManagerMetrics(metrics, metricGrpPrefix, metricTags);
     }
 
@@ -166,84 +157,76 @@ public class Fetcher<K, V> {
     }
 
     /**
-     * Reset offsets for the given partition using the offset reset strategy.
-     *
-     * @param partition The given partition that needs reset offset
-     * @throws org.apache.kafka.clients.consumer.NoOffsetForPartitionException If no offset reset strategy is defined
-     */
-    public void resetOffset(TopicPartition partition) {
-        long timestamp;
-        if (this.offsetResetStrategy == AutoOffsetResetStrategy.EARLIEST)
-            timestamp = EARLIEST_OFFSET_TIMESTAMP;
-        else if (this.offsetResetStrategy == AutoOffsetResetStrategy.LATEST)
-            timestamp = LATEST_OFFSET_TIMESTAMP;
-        else
-            throw new NoOffsetForPartitionException("No offset is set and no reset policy is defined");
-
-        log.debug("Resetting offset for partition {} to {} offset.", partition, this.offsetResetStrategy.name()
-            .toLowerCase());
-        this.subscriptions.seek(partition, offsetBefore(partition, timestamp));
-    }
-
-    /**
      * Fetch a single offset before the given timestamp for the partition.
      *
      * @param topicPartition The partition that needs fetching offset.
      * @param timestamp The timestamp for fetching offset.
-     * @return The offset of the message that is published before the given timestamp
+     * @return A response which can be polled to obtain the corresponding offset.
      */
-    public long offsetBefore(TopicPartition topicPartition, long timestamp) {
-        log.debug("Fetching offsets for partition {}.", topicPartition);
+    public RequestFuture<Long> listOffset(final TopicPartition topicPartition, long timestamp) {
         Map<TopicPartition, ListOffsetRequest.PartitionData> partitions = new HashMap<TopicPartition, ListOffsetRequest.PartitionData>(1);
         partitions.put(topicPartition, new ListOffsetRequest.PartitionData(timestamp, 1));
-        while (true) {
-            long now = time.milliseconds();
-            PartitionInfo info = metadata.fetch().partition(topicPartition);
-            if (info == null) {
-                metadata.add(topicPartition.topic());
-                log.debug("Partition {} is unknown for fetching offset, wait for metadata refresh", topicPartition);
-                awaitMetadataUpdate();
-            } else if (info.leader() == null) {
-                log.debug("Leader for partition {} unavailable for fetching offset, wait for metadata refresh", topicPartition);
-                awaitMetadataUpdate();
-            } else if (this.client.ready(info.leader(), now)) {
-                Node node = info.leader();
-                ListOffsetRequest request = new ListOffsetRequest(-1, partitions);
-                RequestSend send = new RequestSend(node.idString(),
+        long now = time.milliseconds();
+        PartitionInfo info = metadata.fetch().partition(topicPartition);
+        if (info == null) {
+            metadata.add(topicPartition.topic());
+            log.debug("Partition {} is unknown for fetching offset, wait for metadata refresh", topicPartition);
+            return RequestFuture.metadataRefreshNeeded();
+        } else if (info.leader() == null) {
+            log.debug("Leader for partition {} unavailable for fetching offset, wait for metadata refresh", topicPartition);
+            return RequestFuture.metadataRefreshNeeded();
+        } else if (this.client.ready(info.leader(), now)) {
+            final RequestFuture<Long> future = new RequestFuture<Long>();
+            Node node = info.leader();
+            ListOffsetRequest request = new ListOffsetRequest(-1, partitions);
+            RequestSend send = new RequestSend(node.idString(),
                     this.client.nextRequestHeader(ApiKeys.LIST_OFFSETS),
                     request.toStruct());
-                ClientRequest clientRequest = new ClientRequest(now, true, send, null);
-                this.client.send(clientRequest);
-                List<ClientResponse> responses = this.client.completeAll(node.idString(), now);
-                if (responses.isEmpty())
-                    throw new IllegalStateException("This should not happen.");
-                ClientResponse response = responses.get(responses.size() - 1);
-                if (response.wasDisconnected()) {
-                    awaitMetadataUpdate();
-                } else {
-                    ListOffsetResponse lor = new ListOffsetResponse(response.responseBody());
-                    short errorCode = lor.responseData().get(topicPartition).errorCode;
-                    if (errorCode == Errors.NONE.code()) {
-                        List<Long> offsets = lor.responseData().get(topicPartition).offsets;
-                        if (offsets.size() != 1)
-                            throw new IllegalStateException("This should not happen.");
-                        long offset = offsets.get(0);
-                        log.debug("Fetched offset {} for partition {}", offset, topicPartition);
-                        return offset;
-                    } else if (errorCode == Errors.NOT_LEADER_FOR_PARTITION.code()
-                            || errorCode == Errors.UNKNOWN_TOPIC_OR_PARTITION.code()) {
-                        log.warn("Attempt to fetch offsets for partition {} failed due to obsolete leadership information, retrying.",
-                            topicPartition);
-                        awaitMetadataUpdate();
-                    } else {
-                        log.error("Attempt to fetch offsets for partition {} failed due to: {}",
-                            topicPartition, Errors.forCode(errorCode).exception().getMessage());
-                        awaitMetadataUpdate();
-                    }
+            RequestCompletionHandler completionHandler = new RequestCompletionHandler() {
+                @Override
+                public void onComplete(ClientResponse resp) {
+                    handleListOffsetResponse(topicPartition, resp, future);
                 }
+            };
+            ClientRequest clientRequest = new ClientRequest(now, true, send, completionHandler);
+            this.client.send(clientRequest);
+            return future;
+        } else {
+            // We initiated a connect to the leader, but we need to poll to finish it.
+            return RequestFuture.pollNeeded();
+        }
+    }
+
+    /**
+     * Callback for the response of the list offset call above.
+     * @param topicPartition The partition that was fetched
+     * @param clientResponse The response from the server.
+     */
+    private void handleListOffsetResponse(TopicPartition topicPartition,
+                                          ClientResponse clientResponse,
+                                          RequestFuture<Long> future) {
+        if (clientResponse.wasDisconnected()) {
+            future.retryAfterMetadataRefresh();
+        } else {
+            ListOffsetResponse lor = new ListOffsetResponse(clientResponse.responseBody());
+            short errorCode = lor.responseData().get(topicPartition).errorCode;
+            if (errorCode == Errors.NONE.code()) {
+                List<Long> offsets = lor.responseData().get(topicPartition).offsets;
+                if (offsets.size() != 1)
+                    throw new IllegalStateException("This should not happen.");
+                long offset = offsets.get(0);
+                log.debug("Fetched offset {} for partition {}", offset, topicPartition);
+
+                future.complete(offset);
+            } else if (errorCode == Errors.NOT_LEADER_FOR_PARTITION.code()
+                    || errorCode == Errors.UNKNOWN_TOPIC_OR_PARTITION.code()) {
+                log.warn("Attempt to fetch offsets for partition {} failed due to obsolete leadership information, retrying.",
+                        topicPartition);
+                future.retryAfterMetadataRefresh();
             } else {
-                log.debug("Leader for partition {} is not ready, retry fetching offsets", topicPartition);
-                client.poll(this.retryBackoffMs, now);
+                log.error("Attempt to fetch offsets for partition {} failed due to: {}",
+                        topicPartition, Errors.forCode(errorCode).exception().getMessage());
+                future.retryAfterMetadataRefresh();
             }
         }
     }
@@ -257,8 +240,10 @@ public class Fetcher<K, V> {
         Map<Integer, Map<TopicPartition, FetchRequest.PartitionData>> fetchable = new HashMap<Integer, Map<TopicPartition, FetchRequest.PartitionData>>();
         for (TopicPartition partition : subscriptions.assignedPartitions()) {
             Node node = cluster.leaderFor(partition);
-            // if there is a leader and no in-flight requests, issue a new fetch
-            if (node != null && this.client.inFlightRequestCount(node.idString()) == 0) {
+            if (node == null) {
+                metadata.requestUpdate();
+            } else if (this.client.inFlightRequestCount(node.idString()) == 0) {
+                // if there is a leader and no in-flight requests, issue a new fetch
                 Map<TopicPartition, FetchRequest.PartitionData> fetch = fetchable.get(node.id());
                 if (fetch == null) {
                     fetch = new HashMap<TopicPartition, FetchRequest.PartitionData>();
@@ -327,7 +312,7 @@ public class Fetcher<K, V> {
                 } else if (partition.errorCode == Errors.OFFSET_OUT_OF_RANGE.code()) {
                     // TODO: this could be optimized by grouping all out-of-range partitions
                     log.info("Fetch offset {} is out of range, resetting offset", subscriptions.fetched(tp));
-                    resetOffset(tp);
+                    subscriptions.needOffsetReset(tp);
                 } else if (partition.errorCode == Errors.UNKNOWN.code()) {
                     log.warn("Unknown error fetching data for topic-partition {}", tp);
                 } else {
@@ -356,17 +341,6 @@ public class Fetcher<K, V> {
         return new ConsumerRecord<K, V>(partition.topic(), partition.partition(), offset, key, value);
     }
 
-    /*
-     * Request a metadata update and wait until it has occurred
-     */
-    private void awaitMetadataUpdate() {
-        int version = this.metadata.requestUpdate();
-        do {
-            long now = time.milliseconds();
-            this.client.poll(this.retryBackoffMs, now);
-        } while (this.metadata.version() == version);
-    }
-
     private static class PartitionRecords<K, V> {
         public long fetchOffset;
         public TopicPartition partition;
@@ -379,9 +353,6 @@ public class Fetcher<K, V> {
         }
     }
 
-    private static enum AutoOffsetResetStrategy {
-        LATEST, EARLIEST, NONE
-    }
 
     private class FetchManagerMetrics {
         public final Metrics metrics;
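
The upshot of the Fetcher change: the blocking offsetBefore() loop is gone, and listOffset() now returns a RequestFuture<Long> (the new class added below) that the caller drives by polling. A caller-side sketch, not part of this commit; fetcher, client, metadata, time, subscriptions and tp stand in for the real collaborators, and -1L is the "latest" timestamp that the removed LATEST_OFFSET_TIMESTAMP constant encoded:

    // Sketch only: drive the future to completion instead of blocking inside Fetcher.
    RequestFuture<Long> future = fetcher.listOffset(tp, -1L); // -1L = latest
    while (!future.isDone())
        client.poll(100, time.milliseconds());
    if (future.succeeded())
        subscriptions.seek(tp, future.value());
    else if (future.retryAction() == RequestFuture.RetryAction.REFRESH_METADATA)
        metadata.requestUpdate(); // refresh, then call listOffset() again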

http://git-wip-us.apache.org/repos/asf/kafka/blob/b6d326b0/clients/src/main/java/org/apache/kafka/clients/consumer/internals/Heartbeat.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/Heartbeat.java b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/Heartbeat.java
index e7cfaaa..51eae19 100644
--- a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/Heartbeat.java
+++ b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/Heartbeat.java
@@ -42,4 +42,14 @@ public final class Heartbeat {
     public long lastHeartbeatSend() {
         return this.lastHeartbeatSend;
     }
+
+    public long timeToNextHeartbeat(long now) {
+        long timeSinceLastHeartbeat = now - lastHeartbeatSend;
+
+        long hbInterval = timeout / HEARTBEATS_PER_SESSION_INTERVAL;
+        if (timeSinceLastHeartbeat > hbInterval)
+            return 0;
+        else
+            return hbInterval - timeSinceLastHeartbeat;
+    }
 }
\ No newline at end of file
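
timeToNextHeartbeat() turns the heartbeat schedule into a deadline the poll loop can sleep against: with the session timeout split into HEARTBEATS_PER_SESSION_INTERVAL intervals, it returns how long until the next send is due (zero if it is overdue). A sketch of the intended use together with the Utils.min() helper added further down (assumed wiring, not from this commit; requestedTimeout is hypothetical):

    // Sketch: never sleep past the next heartbeat deadline.
    long now = time.milliseconds();
    long timeout = Utils.min(requestedTimeout, heartbeat.timeToNextHeartbeat(now));
    client.poll(timeout, now);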

http://git-wip-us.apache.org/repos/asf/kafka/blob/b6d326b0/clients/src/main/java/org/apache/kafka/clients/consumer/internals/RequestFuture.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/RequestFuture.java b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/RequestFuture.java
new file mode 100644
index 0000000..13fc9af
--- /dev/null
+++ b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/RequestFuture.java
@@ -0,0 +1,209 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE
+ * file distributed with this work for additional information regarding copyright ownership. The ASF licenses this file
+ * to You under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the
+ * License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
+ * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations under the License.
+ */
+package org.apache.kafka.clients.consumer.internals;
+
+/**
+ * Result of an asynchronous request through {@link org.apache.kafka.clients.KafkaClient}. To get the
+ * result of the request, you must poll using {@link org.apache.kafka.clients.KafkaClient#poll(long, long)}
+ * until {@link #isDone()} returns true. Typical usage might look like this:
+ *
+ * <pre>
+ *     RequestFuture future = sendRequest();
+ *     while (!future.isDone()) {
+ *         client.poll(timeout, now);
+ *     }
+ *
+ *     switch (future.outcome()) {
+ *     case SUCCESS:
+ *         // handle request success
+ *         break;
+ *     case NEED_RETRY:
+ *         // retry after taking possible retry action
+ *         break;
+ *     case EXCEPTION:
+ *         // handle exception
+ *     }
+ * </pre>
+ *
+ * When {@link #isDone()} returns true, there are three possible outcomes (obtained through {@link #outcome()}):
+ *
+ * <ol>
+ * <li> {@link org.apache.kafka.clients.consumer.internals.RequestFuture.Outcome#SUCCESS}: If the request was
+ *    successful, then you can use {@link #value()} to obtain the result.</li>
+ * <li> {@link org.apache.kafka.clients.consumer.internals.RequestFuture.Outcome#EXCEPTION}: If an unhandled exception
+ *    was encountered, you can use {@link #exception()} to get it.</li>
+ * <li> {@link org.apache.kafka.clients.consumer.internals.RequestFuture.Outcome#NEED_RETRY}: The request may
+ *    not have been successful, but the failure may be ephemeral and the caller just needs to try the request again.
+ *    In this case, use {@link #retryAction()} to determine what action should be taken (if any) before
+ *    retrying.</li>
+ * </ol>
+ *
+ * @param <T> Return type of the result (can be Void if there is no response)
+ */
+public class RequestFuture<T> {
+    public static final RequestFuture<Object> NEED_NEW_COORDINATOR = newRetryFuture(RetryAction.FIND_COORDINATOR);
+    public static final RequestFuture<Object> NEED_POLL = newRetryFuture(RetryAction.POLL);
+    public static final RequestFuture<Object> NEED_METADATA_REFRESH = newRetryFuture(RetryAction.REFRESH_METADATA);
+
+    public enum RetryAction {
+        NOOP,             // Retry immediately.
+        POLL,             // Retry after calling poll (e.g. to finish a connection)
+        BACKOFF,          // Retry after a delay
+        FIND_COORDINATOR, // Find a new coordinator before retrying
+        REFRESH_METADATA  // Refresh metadata before retrying
+    }
+
+    public enum Outcome {
+        SUCCESS,
+        NEED_RETRY,
+        EXCEPTION
+    }
+
+    private Outcome outcome;
+    private RetryAction retryAction;
+    private T value;
+    private RuntimeException exception;
+
+    /**
+     * Check whether the response is ready to be handled
+     * @return true if the response is ready, false otherwise
+     */
+    public boolean isDone() {
+        return outcome != null;
+    }
+
+    /**
+     * Get the value corresponding to this request (if it has one, as indicated by {@link #outcome()}).
+     * @return the value if it exists or null
+     */
+    public T value() {
+        return value;
+    }
+
+    /**
+     * Check if the request succeeded.
+     * @return true if a value is available, false otherwise
+     */
+    public boolean succeeded() {
+        return outcome == Outcome.SUCCESS;
+    }
+
+    /**
+     * Check if the request failed.
+     * @return true if the request failed (whether or not it can be retried)
+     */
+    public boolean failed() {
+        return outcome != Outcome.SUCCESS;
+    }
+
+    /**
+     * Return the retry action from this response (assuming {@link #succeeded()} has returned false). If the
+     * response is not ready or if there is no retry action, null is returned.
+     * @return the retry action if it exists or null
+     */
+    public RetryAction retryAction() {
+        return retryAction;
+    }
+
+    /**
+     * Get the exception from a failed result. You should check that there is an exception
+     * with {@link #hasException()} before using this method.
+     * @return The exception if it exists or null
+     */
+    public RuntimeException exception() {
+        return exception;
+    }
+
+    /**
+     * Check whether there was an exception.
+     * @return true if this request failed with an exception
+     */
+    public boolean hasException() {
+        return outcome == Outcome.EXCEPTION;
+    }
+
+    /**
+     * Check the outcome of the future if it is ready.
+     * @return the outcome or null if the future is not finished
+     */
+    public Outcome outcome() {
+        return outcome;
+    }
+
+    /**
+     * The request failed, but should be retried using the provided retry action.
+     * @param retryAction The action that should be taken by the caller before retrying the request
+     */
+    public void retry(RetryAction retryAction) {
+        this.outcome = Outcome.NEED_RETRY;
+        this.retryAction = retryAction;
+    }
+
+    public void retryNow() {
+        retry(RetryAction.NOOP);
+    }
+
+    public void retryAfterBackoff() {
+        retry(RetryAction.BACKOFF);
+    }
+
+    public void retryWithNewCoordinator() {
+        retry(RetryAction.FIND_COORDINATOR);
+    }
+
+    public void retryAfterMetadataRefresh() {
+        retry(RetryAction.REFRESH_METADATA);
+    }
+
+    /**
+     * Complete the request successfully. After this call, {@link #succeeded()} will return true
+     * and the value can be obtained through {@link #value()}.
+     * @param value corresponding value (or null if there is none)
+     */
+    public void complete(T value) {
+        this.outcome = Outcome.SUCCESS;
+        this.value = value;
+    }
+
+    /**
+     * Raise an exception. The request will be marked as failed, and the caller can either
+     * handle the exception or throw it.
+     * @param e The exception that caused the failure
+     */
+    public void raise(RuntimeException e) {
+        this.outcome = Outcome.EXCEPTION;
+        this.exception = e;
+    }
+
+    private static <T> RequestFuture<T> newRetryFuture(RetryAction retryAction) {
+        RequestFuture<T> result = new RequestFuture<T>();
+        result.retry(retryAction);
+        return result;
+    }
+
+    @SuppressWarnings("unchecked")
+    public static <T> RequestFuture<T> pollNeeded() {
+        return (RequestFuture<T>) NEED_POLL;
+    }
+
+    @SuppressWarnings("unchecked")
+    public static <T> RequestFuture<T> metadataRefreshNeeded() {
+        return (RequestFuture<T>) NEED_METADATA_REFRESH;
+    }
+
+    @SuppressWarnings("unchecked")
+    public static <T> RequestFuture<T> newCoordinatorNeeded() {
+        return (RequestFuture<T>) NEED_NEW_COORDINATOR;
+    }
+
+}
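
Beyond the javadoc example, the NEED_RETRY outcome carries a RetryAction hint telling the caller what to do before resending. A sketch of a generic dispatch on that hint (illustrative only; client, time, metadata and coordinator stand in for the real collaborators, and retryBackoffMs is hypothetical):

    // Sketch: take the suggested recovery step, then retry the request.
    switch (future.retryAction()) {
        case POLL:             // e.g. finish an in-progress connection
            client.poll(0, time.milliseconds());
            break;
        case BACKOFF:          // wait before retrying
            time.sleep(retryBackoffMs);
            break;
        case FIND_COORDINATOR: // rediscover the coordinator first
            coordinator.discoverConsumerCoordinator();
            break;
        case REFRESH_METADATA: // force a metadata update first
            metadata.requestUpdate();
            break;
        default:               // NOOP: retry immediately
            break;
    }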

http://git-wip-us.apache.org/repos/asf/kafka/blob/b6d326b0/clients/src/main/java/org/apache/kafka/clients/consumer/internals/SubscriptionState.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/SubscriptionState.java b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/SubscriptionState.java
index cee7541..6837453 100644
--- a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/SubscriptionState.java
+++ b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/SubscriptionState.java
@@ -12,14 +12,15 @@
  */
 package org.apache.kafka.clients.consumer.internals;
 
+import org.apache.kafka.clients.consumer.OffsetResetStrategy;
+import org.apache.kafka.common.TopicPartition;
+
 import java.util.HashMap;
 import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
 
-import org.apache.kafka.common.TopicPartition;
-
 /**
  * A class for tracking the topics, partitions, and offsets for the consumer
  */
@@ -49,7 +50,14 @@ public class SubscriptionState {
     /* do we need to request the latest committed offsets from the coordinator? */
     private boolean needsFetchCommittedOffsets;
 
-    public SubscriptionState() {
+    /* Partitions that need to be reset before fetching */
+    private Map<TopicPartition, OffsetResetStrategy> resetPartitions;
+
+    /* Default offset reset strategy */
+    private OffsetResetStrategy offsetResetStrategy;
+
+    public SubscriptionState(OffsetResetStrategy offsetResetStrategy) {
+        this.offsetResetStrategy = offsetResetStrategy;
         this.subscribedTopics = new HashSet<String>();
         this.subscribedPartitions = new HashSet<TopicPartition>();
         this.assignedPartitions = new HashSet<TopicPartition>();
@@ -58,6 +66,7 @@ public class SubscriptionState {
         this.committed = new HashMap<TopicPartition, Long>();
         this.needsPartitionAssignment = false;
         this.needsFetchCommittedOffsets = true; // initialize to true for the consumers to fetch offset upon starting up
+        this.resetPartitions = new HashMap<TopicPartition, OffsetResetStrategy>();
     }
 
     public void subscribe(String topic) {
@@ -102,12 +111,14 @@ public class SubscriptionState {
         this.committed.remove(tp);
         this.fetched.remove(tp);
         this.consumed.remove(tp);
+        this.resetPartitions.remove(tp);
     }
 
     public void clearAssignment() {
         this.assignedPartitions.clear();
         this.committed.clear();
         this.fetched.clear();
+        this.consumed.clear();
         this.needsPartitionAssignment = !subscribedTopics().isEmpty();
     }
 
@@ -145,6 +156,7 @@ public class SubscriptionState {
     public void seek(TopicPartition tp, long offset) {
         fetched(tp, offset);
         consumed(tp, offset);
+        resetPartitions.remove(tp);
     }
 
     public Set<TopicPartition> assignedPartitions() {
@@ -169,6 +181,28 @@ public class SubscriptionState {
         return this.consumed;
     }
 
+    public void needOffsetReset(TopicPartition partition, OffsetResetStrategy offsetResetStrategy) {
+        this.resetPartitions.put(partition, offsetResetStrategy);
+        this.fetched.remove(partition);
+        this.consumed.remove(partition);
+    }
+
+    public void needOffsetReset(TopicPartition partition) {
+        needOffsetReset(partition, offsetResetStrategy);
+    }
+
+    public boolean isOffsetResetNeeded(TopicPartition partition) {
+        return resetPartitions.containsKey(partition);
+    }
+
+    public boolean isOffsetResetNeeded() {
+        return !resetPartitions.isEmpty();
+    }
+
+    public OffsetResetStrategy resetStrategy(TopicPartition partition) {
+        return resetPartitions.get(partition);
+    }
+
     public boolean hasAllFetchPositions() {
         return this.fetched.size() >= this.assignedPartitions.size();
     }
@@ -192,4 +226,5 @@ public class SubscriptionState {
         this.needsPartitionAssignment = false;
     }
 
+
 }
\ No newline at end of file
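
With resetOffset() removed from Fetcher, an out-of-range fetch now merely flags the partition via needOffsetReset(tp) (clearing its fetched/consumed positions), and the actual reset happens later from the poll loop. A sketch of that flow (assumed wiring, not from this commit; EARLIEST and LATEST correspond to the ListOffset timestamps -2L and -1L that the removed Fetcher constants encoded):

    // Sketch: resolve partitions flagged for reset via the async listOffset().
    for (TopicPartition tp : subscriptions.assignedPartitions()) {
        if (subscriptions.isOffsetResetNeeded(tp)) {
            long ts = subscriptions.resetStrategy(tp) == OffsetResetStrategy.EARLIEST ? -2L : -1L;
            RequestFuture<Long> future = fetcher.listOffset(tp, ts);
            while (!future.isDone())
                client.poll(100, time.milliseconds());
            if (future.succeeded())
                subscriptions.seek(tp, future.value()); // seek() clears the reset flag
        }
    }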

http://git-wip-us.apache.org/repos/asf/kafka/blob/b6d326b0/clients/src/main/java/org/apache/kafka/common/utils/Utils.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/utils/Utils.java b/clients/src/main/java/org/apache/kafka/common/utils/Utils.java
index f73eedb..af9993c 100755
--- a/clients/src/main/java/org/apache/kafka/common/utils/Utils.java
+++ b/clients/src/main/java/org/apache/kafka/common/utils/Utils.java
@@ -182,6 +182,21 @@ public class Utils {
     }
 
     /**
+     * Get the minimum of some long values.
+     * @param first Used to ensure at least one value
+     * @param rest The remaining values to compare
+     * @return The minimum of all passed arguments.
+     */
+    public static long min(long first, long ... rest) {
+        long min = first;
+        for (int i = 0; i < rest.length; i++) {
+            if (rest[i] < min)
+                min = rest[i];
+        }
+        return min;
+    }
+
+    /**
      * Get the length for UTF8-encoding a string without encoding it first
      * 
      * @param s The string to calculate the length for

http://git-wip-us.apache.org/repos/asf/kafka/blob/b6d326b0/clients/src/test/java/org/apache/kafka/clients/consumer/MockConsumerTest.java
----------------------------------------------------------------------
diff --git a/clients/src/test/java/org/apache/kafka/clients/consumer/MockConsumerTest.java b/clients/src/test/java/org/apache/kafka/clients/consumer/MockConsumerTest.java
index 677edd3..26b6b40 100644
--- a/clients/src/test/java/org/apache/kafka/clients/consumer/MockConsumerTest.java
+++ b/clients/src/test/java/org/apache/kafka/clients/consumer/MockConsumerTest.java
@@ -25,7 +25,7 @@ import org.junit.Test;
 
 public class MockConsumerTest {
     
-    private MockConsumer<String, String> consumer = new MockConsumer<String, String>();
+    private MockConsumer<String, String> consumer = new MockConsumer<String, String>(OffsetResetStrategy.EARLIEST);
 
     @Test
     public void testSimpleMock() {

http://git-wip-us.apache.org/repos/asf/kafka/blob/b6d326b0/clients/src/test/java/org/apache/kafka/clients/consumer/internals/CoordinatorTest.java
----------------------------------------------------------------------
diff --git a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/CoordinatorTest.java b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/CoordinatorTest.java
index 1454ab7..613b192 100644
--- a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/CoordinatorTest.java
+++ b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/CoordinatorTest.java
@@ -17,10 +17,11 @@
 package org.apache.kafka.clients.consumer.internals;
 
 import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
 import static org.junit.Assert.assertTrue;
 
-import org.apache.kafka.clients.Metadata;
 import org.apache.kafka.clients.MockClient;
+import org.apache.kafka.clients.consumer.OffsetResetStrategy;
 import org.apache.kafka.common.Cluster;
 import org.apache.kafka.common.Node;
 import org.apache.kafka.common.TopicPartition;
@@ -49,24 +50,20 @@ public class CoordinatorTest {
     private String topicName = "test";
     private String groupId = "test-group";
     private TopicPartition tp = new TopicPartition(topicName, 0);
-    private long retryBackoffMs = 0L;
     private int sessionTimeoutMs = 10;
     private String rebalanceStrategy = "not-matter";
     private MockTime time = new MockTime();
     private MockClient client = new MockClient(time);
-    private Metadata metadata = new Metadata(0, Long.MAX_VALUE);
     private Cluster cluster = TestUtils.singletonCluster(topicName, 1);
     private Node node = cluster.nodes().get(0);
-    private SubscriptionState subscriptions = new SubscriptionState();
+    private SubscriptionState subscriptions = new SubscriptionState(OffsetResetStrategy.EARLIEST);
     private Metrics metrics = new Metrics(time);
     private Map<String, String> metricTags = new LinkedHashMap<String, String>();
 
     private Coordinator coordinator = new Coordinator(client,
         groupId,
-        retryBackoffMs,
         sessionTimeoutMs,
         rebalanceStrategy,
-        metadata,
         subscriptions,
         metrics,
         "consumer" + groupId,
@@ -75,13 +72,14 @@ public class CoordinatorTest {
 
     @Before
     public void setup() {
-        metadata.update(cluster, time.milliseconds());
         client.setNode(node);
     }
 
     @Test
     public void testNormalHeartbeat() {
         client.prepareResponse(consumerMetadataResponse(node, Errors.NONE.code()));
+        coordinator.discoverConsumerCoordinator();
+        client.poll(0, time.milliseconds());
 
         // normal heartbeat
         time.sleep(sessionTimeoutMs);
@@ -94,6 +92,8 @@ public class CoordinatorTest {
     @Test
     public void testCoordinatorNotAvailable() {
         client.prepareResponse(consumerMetadataResponse(node, Errors.NONE.code()));
+        coordinator.discoverConsumerCoordinator();
+        client.poll(0, time.milliseconds());
 
         // consumer_coordinator_not_available will mark coordinator as unknown
         time.sleep(sessionTimeoutMs);
@@ -108,6 +108,8 @@ public class CoordinatorTest {
     @Test
     public void testNotCoordinator() {
         client.prepareResponse(consumerMetadataResponse(node, Errors.NONE.code()));
+        coordinator.discoverConsumerCoordinator();
+        client.poll(0, time.milliseconds());
 
         // not_coordinator will mark coordinator as unknown
         time.sleep(sessionTimeoutMs);
@@ -122,6 +124,8 @@ public class CoordinatorTest {
     @Test
     public void testIllegalGeneration() {
         client.prepareResponse(consumerMetadataResponse(node, Errors.NONE.code()));
+        coordinator.discoverConsumerCoordinator();
+        client.poll(0, time.milliseconds());
 
         // illegal_generation will cause re-partition
         subscriptions.subscribe(topicName);
@@ -139,6 +143,8 @@ public class CoordinatorTest {
     @Test
     public void testCoordinatorDisconnect() {
         client.prepareResponse(consumerMetadataResponse(node, Errors.NONE.code()));
+        coordinator.discoverConsumerCoordinator();
+        client.poll(0, time.milliseconds());
 
         // coordinator disconnect will mark coordinator as unknown
         time.sleep(sessionTimeoutMs);
@@ -152,39 +158,67 @@ public class CoordinatorTest {
 
     @Test
     public void testNormalJoinGroup() {
+        subscriptions.subscribe(topicName);
+        subscriptions.needReassignment();
+
         client.prepareResponse(consumerMetadataResponse(node, Errors.NONE.code()));
+        coordinator.discoverConsumerCoordinator();
+        client.poll(0, time.milliseconds());
 
         // normal join group
         client.prepareResponse(joinGroupResponse(1, "consumer", Collections.singletonList(tp), Errors.NONE.code()));
-        assertEquals(Collections.singletonList(tp),
-            coordinator.assignPartitions(Collections.singletonList(topicName), time.milliseconds()));
-        assertEquals(0, client.inFlightRequestCount());
+        coordinator.assignPartitions(time.milliseconds());
+        client.poll(0, time.milliseconds());
+
+        assertFalse(subscriptions.partitionAssignmentNeeded());
+        assertEquals(Collections.singleton(tp), subscriptions.assignedPartitions());
     }
 
     @Test
     public void testReJoinGroup() {
+        subscriptions.subscribe(topicName);
+        subscriptions.needReassignment();
+
         client.prepareResponse(consumerMetadataResponse(node, Errors.NONE.code()));
+        coordinator.discoverConsumerCoordinator();
+        client.poll(0, time.milliseconds());
+        assertTrue(subscriptions.partitionAssignmentNeeded());
 
         // disconnected from original coordinator will cause re-discover and join again
         client.prepareResponse(joinGroupResponse(1, "consumer", Collections.singletonList(tp), Errors.NONE.code()), true);
+        coordinator.assignPartitions(time.milliseconds());
+        client.poll(0, time.milliseconds());
+        assertTrue(subscriptions.partitionAssignmentNeeded());
+
+        // rediscover the coordinator
         client.prepareResponse(consumerMetadataResponse(node, Errors.NONE.code()));
+        coordinator.discoverConsumerCoordinator();
+        client.poll(0, time.milliseconds());
+
+        // try assigning partitions again
         client.prepareResponse(joinGroupResponse(1, "consumer", Collections.singletonList(tp), Errors.NONE.code()));
-        assertEquals(Collections.singletonList(tp),
-            coordinator.assignPartitions(Collections.singletonList(topicName), time.milliseconds()));
-        assertEquals(0, client.inFlightRequestCount());
+        coordinator.assignPartitions(time.milliseconds());
+        client.poll(0, time.milliseconds());
+        assertFalse(subscriptions.partitionAssignmentNeeded());
+        assertEquals(Collections.singleton(tp), subscriptions.assignedPartitions());
     }
 
 
     @Test
     public void testCommitOffsetNormal() {
         client.prepareResponse(consumerMetadataResponse(node, Errors.NONE.code()));
+        coordinator.discoverConsumerCoordinator();
+        client.poll(0, time.milliseconds());
 
-        // sync commit
+        // With success flag
         client.prepareResponse(offsetCommitResponse(Collections.singletonMap(tp, Errors.NONE.code())));
-        coordinator.commitOffsets(Collections.singletonMap(tp, 100L), true, time.milliseconds());
+        RequestFuture<Void> result = coordinator.commitOffsets(Collections.singletonMap(tp, 100L), time.milliseconds());
+        assertEquals(1, client.poll(0, time.milliseconds()).size());
+        assertTrue(result.isDone());
+        assertTrue(result.succeeded());
 
-        // async commit
-        coordinator.commitOffsets(Collections.singletonMap(tp, 100L), false, time.milliseconds());
+        // Without success flag
+        coordinator.commitOffsets(Collections.singletonMap(tp, 100L), time.milliseconds());
         client.respond(offsetCommitResponse(Collections.singletonMap(tp, Errors.NONE.code())));
         assertEquals(1, client.poll(0, time.milliseconds()).size());
     }
@@ -192,34 +226,55 @@ public class CoordinatorTest {
     @Test
     public void testCommitOffsetError() {
         client.prepareResponse(consumerMetadataResponse(node, Errors.NONE.code()));
+        coordinator.discoverConsumerCoordinator();
+        client.poll(0, time.milliseconds());
 
         // async commit with coordinator not available
         client.prepareResponse(offsetCommitResponse(Collections.singletonMap(tp, Errors.CONSUMER_COORDINATOR_NOT_AVAILABLE.code())));
-        coordinator.commitOffsets(Collections.singletonMap(tp, 100L), false, time.milliseconds());
+        coordinator.commitOffsets(Collections.singletonMap(tp, 100L), time.milliseconds());
         assertEquals(1, client.poll(0, time.milliseconds()).size());
         assertTrue(coordinator.coordinatorUnknown());
         // resume
         client.prepareResponse(consumerMetadataResponse(node, Errors.NONE.code()));
+        coordinator.discoverConsumerCoordinator();
+        client.poll(0, time.milliseconds());
 
         // async commit with not coordinator
         client.prepareResponse(offsetCommitResponse(Collections.singletonMap(tp, Errors.NOT_COORDINATOR_FOR_CONSUMER.code())));
-        coordinator.commitOffsets(Collections.singletonMap(tp, 100L), false, time.milliseconds());
+        coordinator.commitOffsets(Collections.singletonMap(tp, 100L), time.milliseconds());
         assertEquals(1, client.poll(0, time.milliseconds()).size());
         assertTrue(coordinator.coordinatorUnknown());
         // resume
         client.prepareResponse(consumerMetadataResponse(node, Errors.NONE.code()));
+        coordinator.discoverConsumerCoordinator();
+        client.poll(0, time.milliseconds());
 
         // sync commit with not_coordinator
         client.prepareResponse(offsetCommitResponse(Collections.singletonMap(tp, Errors.NOT_COORDINATOR_FOR_CONSUMER.code())));
         client.prepareResponse(consumerMetadataResponse(node, Errors.NONE.code()));
         client.prepareResponse(offsetCommitResponse(Collections.singletonMap(tp, Errors.NONE.code())));
-        coordinator.commitOffsets(Collections.singletonMap(tp, 100L), true, time.milliseconds());
+        RequestFuture<Void> result = coordinator.commitOffsets(Collections.singletonMap(tp, 100L), time.milliseconds());
+        assertEquals(1, client.poll(0, time.milliseconds()).size());
+        assertTrue(result.isDone());
+        assertEquals(RequestFuture.RetryAction.FIND_COORDINATOR, result.retryAction());
 
         // sync commit with coordinator disconnected
         client.prepareResponse(offsetCommitResponse(Collections.singletonMap(tp, Errors.NONE.code())), true);
-        client.prepareResponse(consumerMetadataResponse(node, Errors.NONE.code()));
         client.prepareResponse(offsetCommitResponse(Collections.singletonMap(tp, Errors.NONE.code())));
-        coordinator.commitOffsets(Collections.singletonMap(tp, 100L), true, time.milliseconds());
+        result = coordinator.commitOffsets(Collections.singletonMap(tp, 100L), time.milliseconds());
+
+        assertEquals(0, client.poll(0, time.milliseconds()).size());
+        assertTrue(result.isDone());
+        assertEquals(RequestFuture.RetryAction.FIND_COORDINATOR, result.retryAction());
+
+        client.prepareResponse(consumerMetadataResponse(node, Errors.NONE.code()));
+        coordinator.discoverConsumerCoordinator();
+        client.poll(0, time.milliseconds());
+
+        result = coordinator.commitOffsets(Collections.singletonMap(tp, 100L), time.milliseconds());
+        assertEquals(1, client.poll(0, time.milliseconds()).size());
+        assertTrue(result.isDone());
+        assertTrue(result.succeeded());
     }
 
 
@@ -227,33 +282,70 @@ public class CoordinatorTest {
     public void testFetchOffset() {
 
         client.prepareResponse(consumerMetadataResponse(node, Errors.NONE.code()));
+        coordinator.discoverConsumerCoordinator();
+        client.poll(0, time.milliseconds());
 
         // normal fetch
         client.prepareResponse(offsetFetchResponse(tp, Errors.NONE.code(), "", 100L));
-        assertEquals(100L, (long) coordinator.fetchOffsets(Collections.singleton(tp), time.milliseconds()).get(tp));
+        RequestFuture<Map<TopicPartition, Long>> result = coordinator.fetchOffsets(Collections.singleton(tp), time.milliseconds());
+        client.poll(0, time.milliseconds());
+        assertTrue(result.isDone());
+        assertEquals(100L, (long) result.value().get(tp));
 
         // fetch with loading in progress
         client.prepareResponse(offsetFetchResponse(tp, Errors.OFFSET_LOAD_IN_PROGRESS.code(), "", 100L));
         client.prepareResponse(offsetFetchResponse(tp, Errors.NONE.code(), "", 100L));
-        assertEquals(100L, (long) coordinator.fetchOffsets(Collections.singleton(tp), time.milliseconds()).get(tp));
+
+        result = coordinator.fetchOffsets(Collections.singleton(tp), time.milliseconds());
+        client.poll(0, time.milliseconds());
+        assertTrue(result.isDone());
+        assertTrue(result.failed());
+        assertEquals(RequestFuture.RetryAction.BACKOFF, result.retryAction());
+
+        result = coordinator.fetchOffsets(Collections.singleton(tp), time.milliseconds());
+        client.poll(0, time.milliseconds());
+        assertTrue(result.isDone());
+        assertEquals(100L, (long) result.value().get(tp));
 
         // fetch with not coordinator
         client.prepareResponse(offsetFetchResponse(tp, Errors.NOT_COORDINATOR_FOR_CONSUMER.code(), "", 100L));
         client.prepareResponse(consumerMetadataResponse(node, Errors.NONE.code()));
         client.prepareResponse(offsetFetchResponse(tp, Errors.NONE.code(), "", 100L));
-        assertEquals(100L, (long) coordinator.fetchOffsets(Collections.singleton(tp), time.milliseconds()).get(tp));
+
+        result = coordinator.fetchOffsets(Collections.singleton(tp), time.milliseconds());
+        client.poll(0, time.milliseconds());
+        assertTrue(result.isDone());
+        assertTrue(result.failed());
+        assertEquals(RequestFuture.RetryAction.FIND_COORDINATOR, result.retryAction());
+
+        coordinator.discoverConsumerCoordinator();
+        client.poll(0, time.milliseconds());
+
+        result = coordinator.fetchOffsets(Collections.singleton(tp), time.milliseconds());
+        client.poll(0, time.milliseconds());
+        assertTrue(result.isDone());
+        assertEquals(100L, (long) result.value().get(tp));
 
         // fetch with no fetchable offsets
         client.prepareResponse(offsetFetchResponse(tp, Errors.NONE.code(), "", -1L));
-        assertEquals(0, coordinator.fetchOffsets(Collections.singleton(tp), time.milliseconds()).size());
+        result = coordinator.fetchOffsets(Collections.singleton(tp), time.milliseconds());
+        client.poll(0, time.milliseconds());
+        assertTrue(result.isDone());
+        assertTrue(result.value().isEmpty());
 
         // fetch with offset topic unknown
         client.prepareResponse(offsetFetchResponse(tp, Errors.UNKNOWN_TOPIC_OR_PARTITION.code(), "", 100L));
-        assertEquals(0, coordinator.fetchOffsets(Collections.singleton(tp), time.milliseconds()).size());
+        result = coordinator.fetchOffsets(Collections.singleton(tp), time.milliseconds());
+        client.poll(0, time.milliseconds());
+        assertTrue(result.isDone());
+        assertTrue(result.value().isEmpty());
 
         // fetch with offset -1
         client.prepareResponse(offsetFetchResponse(tp, Errors.NONE.code(), "", -1L));
-        assertEquals(0, coordinator.fetchOffsets(Collections.singleton(tp), time.milliseconds()).size());
+        result = coordinator.fetchOffsets(Collections.singleton(tp), time.milliseconds());
+        client.poll(0, time.milliseconds());
+        assertTrue(result.isDone());
+        assertTrue(result.value().isEmpty());
     }
 
     private Struct consumerMetadataResponse(Node node, short error) {
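
Note the pattern these tests now follow: commitOffsets() and fetchOffsets() no longer take a sync/async flag but always return a RequestFuture, so a synchronous call is just the caller polling the future to completion. As a sketch (hypothetical helper, not in this commit):

    // Sketch: synchronous commit built on the async API.
    private void commitOffsetsSync(Map<TopicPartition, Long> offsets) {
        RequestFuture<Void> future = coordinator.commitOffsets(offsets, time.milliseconds());
        while (!future.isDone())
            client.poll(100, time.milliseconds());
        if (future.hasException())
            throw future.exception();
    }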

http://git-wip-us.apache.org/repos/asf/kafka/blob/b6d326b0/clients/src/test/java/org/apache/kafka/clients/consumer/internals/FetcherTest.java
----------------------------------------------------------------------
diff --git a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/FetcherTest.java b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/FetcherTest.java
index 4195410..405efdc 100644
--- a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/FetcherTest.java
+++ b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/FetcherTest.java
@@ -16,11 +16,10 @@
  */
 package org.apache.kafka.clients.consumer.internals;
 
-import static org.junit.Assert.assertEquals;
-
 import org.apache.kafka.clients.Metadata;
 import org.apache.kafka.clients.MockClient;
 import org.apache.kafka.clients.consumer.ConsumerRecord;
+import org.apache.kafka.clients.consumer.OffsetResetStrategy;
 import org.apache.kafka.common.Cluster;
 import org.apache.kafka.common.Node;
 import org.apache.kafka.common.TopicPartition;
@@ -30,10 +29,11 @@ import org.apache.kafka.common.protocol.types.Struct;
 import org.apache.kafka.common.record.CompressionType;
 import org.apache.kafka.common.record.MemoryRecords;
 import org.apache.kafka.common.requests.FetchResponse;
-import org.apache.kafka.common.requests.ListOffsetResponse;
 import org.apache.kafka.common.serialization.ByteArrayDeserializer;
 import org.apache.kafka.common.utils.MockTime;
 import org.apache.kafka.test.TestUtils;
+import org.junit.Before;
+import org.junit.Test;
 
 import java.nio.ByteBuffer;
 import java.util.Collections;
@@ -41,37 +41,33 @@ import java.util.LinkedHashMap;
 import java.util.List;
 import java.util.Map;
 
-import org.junit.Before;
-import org.junit.Test;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
 
 public class FetcherTest {
 
     private String topicName = "test";
     private String groupId = "test-group";
     private TopicPartition tp = new TopicPartition(topicName, 0);
-    private long retryBackoffMs = 0L;
     private int minBytes = 1;
     private int maxWaitMs = 0;
     private int fetchSize = 1000;
-    private String offsetReset = "EARLIEST";
     private MockTime time = new MockTime();
     private MockClient client = new MockClient(time);
     private Metadata metadata = new Metadata(0, Long.MAX_VALUE);
     private Cluster cluster = TestUtils.singletonCluster(topicName, 1);
     private Node node = cluster.nodes().get(0);
-    private SubscriptionState subscriptions = new SubscriptionState();
+    private SubscriptionState subscriptions = new SubscriptionState(OffsetResetStrategy.EARLIEST);
     private Metrics metrics = new Metrics(time);
     private Map<String, String> metricTags = new LinkedHashMap<String, String>();
 
     private MemoryRecords records = MemoryRecords.emptyRecords(ByteBuffer.allocate(1024), CompressionType.NONE);
 
     private Fetcher<byte[], byte[]> fetcher = new Fetcher<byte[], byte[]>(client,
-        retryBackoffMs,
         minBytes,
         maxWaitMs,
         fetchSize,
         true, // check crc
-        offsetReset,
         new ByteArrayDeserializer(),
         new ByteArrayDeserializer(),
         metadata,
@@ -140,11 +136,11 @@ public class FetcherTest {
         subscriptions.fetched(tp, 5);
         fetcher.initFetches(cluster, time.milliseconds());
         client.respond(fetchResponse(this.records.buffer(), Errors.OFFSET_OUT_OF_RANGE.code(), 100L));
-        client.prepareResponse(listOffsetResponse(Collections.singletonList(0L), Errors.NONE.code()));
         client.poll(0, time.milliseconds());
+        assertTrue(subscriptions.isOffsetResetNeeded(tp));
         assertEquals(0, fetcher.fetchedRecords().size());
-        assertEquals(0L, (long) subscriptions.fetched(tp));
-        assertEquals(0L, (long) subscriptions.consumed(tp));
+        assertEquals(null, subscriptions.fetched(tp));
+        assertEquals(null, subscriptions.consumed(tp));
     }
 
     @Test
@@ -157,11 +153,11 @@ public class FetcherTest {
         // fetch with out of range
         fetcher.initFetches(cluster, time.milliseconds());
         client.respond(fetchResponse(this.records.buffer(), Errors.OFFSET_OUT_OF_RANGE.code(), 100L));
-        client.prepareResponse(listOffsetResponse(Collections.singletonList(0L), Errors.NONE.code()));
         client.poll(0, time.milliseconds());
+        assertTrue(subscriptions.isOffsetResetNeeded(tp));
         assertEquals(0, fetcher.fetchedRecords().size());
-        assertEquals(0L, (long) subscriptions.fetched(tp));
-        assertEquals(0L, (long) subscriptions.consumed(tp));
+        assertEquals(null, subscriptions.fetched(tp));
+        assertEquals(null, subscriptions.consumed(tp));
     }
 
     private Struct fetchResponse(ByteBuffer buffer, short error, long hw) {
@@ -169,9 +165,5 @@ public class FetcherTest {
         return response.toStruct();
     }
 
-    private Struct listOffsetResponse(List<Long> offsets, short error) {
-        ListOffsetResponse response = new ListOffsetResponse(Collections.singletonMap(tp, new ListOffsetResponse.PartitionData(error, offsets)));
-        return response.toStruct();
-    }
 
 }

http://git-wip-us.apache.org/repos/asf/kafka/blob/b6d326b0/clients/src/test/java/org/apache/kafka/clients/consumer/internals/HeartbeatTest.java
----------------------------------------------------------------------
diff --git a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/HeartbeatTest.java b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/HeartbeatTest.java
index ecc78ce..ee1ede0 100644
--- a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/HeartbeatTest.java
+++ b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/HeartbeatTest.java
@@ -20,6 +20,7 @@ import org.apache.kafka.common.utils.MockTime;
 
 import org.junit.Test;
 
+import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertFalse;
 import static org.junit.Assert.assertTrue;
 
@@ -42,4 +43,12 @@ public class HeartbeatTest {
         time.sleep(timeout / (2 * Heartbeat.HEARTBEATS_PER_SESSION_INTERVAL));
         assertFalse(heartbeat.shouldHeartbeat(time.milliseconds()));
     }
+
+    @Test
+    public void testTimeToNextHeartbeat() {
+        heartbeat.sentHeartbeat(0);
+        assertEquals(100, heartbeat.timeToNextHeartbeat(0));
+        assertEquals(0, heartbeat.timeToNextHeartbeat(100));
+        assertEquals(0, heartbeat.timeToNextHeartbeat(200));
+    }
 }

http://git-wip-us.apache.org/repos/asf/kafka/blob/b6d326b0/clients/src/test/java/org/apache/kafka/clients/consumer/internals/SubscriptionStateTest.java
----------------------------------------------------------------------
diff --git a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/SubscriptionStateTest.java b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/SubscriptionStateTest.java
index e000cf8..319751c 100644
--- a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/SubscriptionStateTest.java
+++ b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/SubscriptionStateTest.java
@@ -22,12 +22,13 @@ import static java.util.Arrays.asList;
 
 import java.util.Collections;
 
+import org.apache.kafka.clients.consumer.OffsetResetStrategy;
 import org.apache.kafka.common.TopicPartition;
 import org.junit.Test;
 
 public class SubscriptionStateTest {
     
-    private final SubscriptionState state = new SubscriptionState();
+    private final SubscriptionState state = new SubscriptionState(OffsetResetStrategy.EARLIEST);
     private final TopicPartition tp0 = new TopicPartition("test", 0);
     private final TopicPartition tp1 = new TopicPartition("test", 1);
 
@@ -43,7 +44,21 @@ public class SubscriptionStateTest {
         assertTrue(state.assignedPartitions().isEmpty());
         assertAllPositions(tp0, null);
     }
-    
+
+    @Test
+    public void partitionReset() {
+        state.subscribe(tp0);
+        state.seek(tp0, 5);
+        assertEquals(5L, (long) state.fetched(tp0));
+        assertEquals(5L, (long) state.consumed(tp0));
+        state.needOffsetReset(tp0);
+        assertTrue(state.isOffsetResetNeeded());
+        assertTrue(state.isOffsetResetNeeded(tp0));
+        assertEquals(null, state.fetched(tp0));
+        assertEquals(null, state.consumed(tp0));
+    }
+
+    @Test
     public void topicSubscription() {
         state.subscribe("test");
         assertEquals(1, state.subscribedTopics().size());

http://git-wip-us.apache.org/repos/asf/kafka/blob/b6d326b0/clients/src/test/java/org/apache/kafka/common/utils/UtilsTest.java
----------------------------------------------------------------------
diff --git a/clients/src/test/java/org/apache/kafka/common/utils/UtilsTest.java b/clients/src/test/java/org/apache/kafka/common/utils/UtilsTest.java
index 2ebe3c2..e7951d8 100755
--- a/clients/src/test/java/org/apache/kafka/common/utils/UtilsTest.java
+++ b/clients/src/test/java/org/apache/kafka/common/utils/UtilsTest.java
@@ -100,4 +100,12 @@ public class UtilsTest {
         buffer = ByteBuffer.wrap(myvar).asReadOnlyBuffer();
         this.subTest(buffer);
     }
+
+    @Test
+    public void testMin() {
+        assertEquals(1, Utils.min(1));
+        assertEquals(1, Utils.min(1, 2, 3));
+        assertEquals(1, Utils.min(2, 1, 3));
+        assertEquals(1, Utils.min(2, 3, 1));
+    }
 }
\ No newline at end of file
