Repository: kafka Updated Branches: refs/heads/trunk 257ad524d -> 2554a8dd4
KAFKA-4839; Throw NoOffsetForPartitionException from poll once for all TopicPartitions affected Signed-off-by: radai-rosenblatt <radai.rosenblattgmail.com> Author: radai-rosenblatt <[email protected]> Reviewers: Apurva Mehta <[email protected]>, Vahid Hashemian <[email protected]>, Ismael Juma <[email protected]>, Jason Gustafson <[email protected]> Closes #2637 from radai-rosenblatt/KAFKA-4839 Project: http://git-wip-us.apache.org/repos/asf/kafka/repo Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/2554a8dd Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/2554a8dd Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/2554a8dd Branch: refs/heads/trunk Commit: 2554a8dd4dd07b0ac844839b51533bd1e67eed85 Parents: 257ad52 Author: radai-rosenblatt <[email protected]> Authored: Mon May 8 09:42:44 2017 -0700 Committer: Jason Gustafson <[email protected]> Committed: Mon May 8 09:42:44 2017 -0700 ---------------------------------------------------------------------- .../consumer/NoOffsetForPartitionException.java | 25 ++++++++++++++++---- .../clients/consumer/internals/Fetcher.java | 22 ++++++++++------- .../clients/consumer/internals/FetcherTest.java | 15 ++++++++++++ 3 files changed, 50 insertions(+), 12 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/kafka/blob/2554a8dd/clients/src/main/java/org/apache/kafka/clients/consumer/NoOffsetForPartitionException.java ---------------------------------------------------------------------- diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/NoOffsetForPartitionException.java b/clients/src/main/java/org/apache/kafka/clients/consumer/NoOffsetForPartitionException.java index 14bb710..375cda2 100644 --- a/clients/src/main/java/org/apache/kafka/clients/consumer/NoOffsetForPartitionException.java +++ b/clients/src/main/java/org/apache/kafka/clients/consumer/NoOffsetForPartitionException.java @@ -18,7 +18,9 @@ package org.apache.kafka.clients.consumer; import org.apache.kafka.common.TopicPartition; +import java.util.Collection; import java.util.Collections; +import java.util.HashSet; import java.util.Set; /** @@ -29,19 +31,34 @@ public class NoOffsetForPartitionException extends InvalidOffsetException { private static final long serialVersionUID = 1L; - private final TopicPartition partition; + private final Set<TopicPartition> partitions; public NoOffsetForPartitionException(TopicPartition partition) { super("Undefined offset with no reset policy for partition: " + partition); - this.partition = partition; + this.partitions = Collections.singleton(partition); } + public NoOffsetForPartitionException(Collection<TopicPartition> partitions) { + super("Undefined offset with no reset policy for partitions: " + partitions); + this.partitions = Collections.unmodifiableSet(new HashSet<>(partitions)); + } + + /** + * returns the first partition (out of {@link #partitions}) for which no offset is defined. + * @deprecated please use {@link #partitions} + * @return a partition with no offset + */ + @Deprecated public TopicPartition partition() { - return partition; + return partitions.isEmpty() ? null : partitions.iterator().next(); } + /** + * returns all partitions for which no offests are defined. + * @return all partitions without offsets + */ public Set<TopicPartition> partitions() { - return Collections.singleton(partition); + return partitions; } } http://git-wip-us.apache.org/repos/asf/kafka/blob/2554a8dd/clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetcher.java ---------------------------------------------------------------------- diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetcher.java b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetcher.java index dc6c338..bf5df95 100644 --- a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetcher.java +++ b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetcher.java @@ -382,16 +382,17 @@ public class Fetcher<K, V> implements SubscriptionState.Listener, Closeable { return client.send(node, request); } - private long offsetResetStrategyTimestamp(final TopicPartition partition) { + private void offsetResetStrategyTimestamp( + final TopicPartition partition, + final Map<TopicPartition, Long> output, + final Set<TopicPartition> partitionsWithNoOffsets) { OffsetResetStrategy strategy = subscriptions.resetStrategy(partition); - final long timestamp; if (strategy == OffsetResetStrategy.EARLIEST) - timestamp = ListOffsetRequest.EARLIEST_TIMESTAMP; + output.put(partition, ListOffsetRequest.EARLIEST_TIMESTAMP); else if (strategy == OffsetResetStrategy.LATEST) - timestamp = endTimestamp(); + output.put(partition, endTimestamp()); else - throw new NoOffsetForPartitionException(partition); - return timestamp; + partitionsWithNoOffsets.add(partition); } /** @@ -402,14 +403,16 @@ public class Fetcher<K, V> implements SubscriptionState.Listener, Closeable { */ private void resetOffsets(final Set<TopicPartition> partitions) { final Map<TopicPartition, Long> offsetResets = new HashMap<>(); + final Set<TopicPartition> partitionsWithNoOffsets = new HashSet<>(); for (final TopicPartition partition : partitions) { - offsetResets.put(partition, offsetResetStrategyTimestamp(partition)); + offsetResetStrategyTimestamp(partition, offsetResets, partitionsWithNoOffsets); } final Map<TopicPartition, OffsetData> offsetsByTimes = retrieveOffsetsByTimes(offsetResets, Long.MAX_VALUE, false); for (final TopicPartition partition : partitions) { final OffsetData offsetData = offsetsByTimes.get(partition); if (offsetData == null) { - throw new NoOffsetForPartitionException(partition); + partitionsWithNoOffsets.add(partition); + continue; } // we might lose the assignment while fetching the offset, so check it is still active if (subscriptions.isAssigned(partition)) { @@ -417,6 +420,9 @@ public class Fetcher<K, V> implements SubscriptionState.Listener, Closeable { this.subscriptions.seek(partition, offsetData.offset); } } + if (!partitionsWithNoOffsets.isEmpty()) { + throw new NoOffsetForPartitionException(partitionsWithNoOffsets); + } } public Map<TopicPartition, OffsetAndTimestamp> getOffsetsByTimes(Map<TopicPartition, Long> timestampsToSearch, http://git-wip-us.apache.org/repos/asf/kafka/blob/2554a8dd/clients/src/test/java/org/apache/kafka/clients/consumer/internals/FetcherTest.java ---------------------------------------------------------------------- diff --git a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/FetcherTest.java b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/FetcherTest.java index 0a0f3d9..4b957a3 100644 --- a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/FetcherTest.java +++ b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/FetcherTest.java @@ -21,6 +21,7 @@ import org.apache.kafka.clients.MockClient; import org.apache.kafka.clients.NodeApiVersions; import org.apache.kafka.clients.consumer.ConsumerRebalanceListener; import org.apache.kafka.clients.consumer.ConsumerRecord; +import org.apache.kafka.clients.consumer.NoOffsetForPartitionException; import org.apache.kafka.clients.consumer.OffsetAndMetadata; import org.apache.kafka.clients.consumer.OffsetAndTimestamp; import org.apache.kafka.clients.consumer.OffsetOutOfRangeException; @@ -780,6 +781,20 @@ public class FetcherTest { } @Test + public void testUpdateFetchPositionsNoneCommittedNoResetStrategy() { + Set<TopicPartition> tps = new HashSet<>(Arrays.asList(tp1, tp2)); + subscriptionsNoAutoReset.assignFromUser(tps); + try { + fetcherNoAutoReset.updateFetchPositions(tps); + fail("Should have thrown NoOffsetForPartitionException"); + } catch (NoOffsetForPartitionException e) { + // we expect the exception to be thrown for both TPs at the same time + Set<TopicPartition> partitions = e.partitions(); + assertEquals(tps, partitions); + } + } + + @Test public void testUpdateFetchPositionToCommitted() { // unless a specific reset is expected, the default behavior is to reset to the committed // position if one is present
