Repository: kafka Updated Branches: refs/heads/trunk bf292a6fa -> aa56dfb9e
KAFKA-2686: Reset needsPartitionAssignment in SubscriptionState.assign() Author: Guozhang Wang <wangg...@gmail.com> Reviewers: Jason Gustafson, Jun Rao Closes #352 from guozhangwang/K2686 Project: http://git-wip-us.apache.org/repos/asf/kafka/repo Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/aa56dfb9 Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/aa56dfb9 Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/aa56dfb9 Branch: refs/heads/trunk Commit: aa56dfb9e7cea19faa545a13d42d499a6958cbef Parents: bf292a6 Author: Guozhang Wang <wangg...@gmail.com> Authored: Thu Oct 22 21:06:10 2015 -0700 Committer: Guozhang Wang <wangg...@gmail.com> Committed: Thu Oct 22 21:06:10 2015 -0700 ---------------------------------------------------------------------- .../kafka/clients/consumer/KafkaConsumer.java | 20 ++++- .../kafka/clients/consumer/MockConsumer.java | 4 +- .../consumer/internals/ConsumerCoordinator.java | 2 +- .../consumer/internals/SubscriptionState.java | 38 +++++---- .../clients/consumer/KafkaConsumerTest.java | 32 ++++++++ .../internals/ConsumerCoordinatorTest.java | 18 ++--- .../clients/consumer/internals/FetcherTest.java | 36 ++++----- .../internals/SubscriptionStateTest.java | 81 ++++++++++---------- .../clients/producer/KafkaProducerTest.java | 1 - 9 files changed, 142 insertions(+), 90 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/kafka/blob/aa56dfb9/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java ---------------------------------------------------------------------- diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java b/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java index cd166f0..06a9239 100644 --- a/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java +++ b/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java @@ -629,6 +629,9 @@ public class KafkaConsumer<K, V> implements Consumer<K, V> { * assign partitions. Topic subscriptions are not incremental. This list will replace the current * assignment (if there is one). Note that it is not possible to combine topic subscription with group management * with manual partition assignment through {@link #assign(List)}. + * + * If the given list of topics is empty, it is treated the same as {@link #unsubscribe()}. + * * <p> * As part of group management, the consumer will keep track of the list of consumers that belong to a particular * group and will trigger a rebalance operation if one of the following events trigger - @@ -653,9 +656,14 @@ public class KafkaConsumer<K, V> implements Consumer<K, V> { public void subscribe(List<String> topics, ConsumerRebalanceListener listener) { acquire(); try { - log.debug("Subscribed to topic(s): {}", Utils.join(topics, ", ")); - this.subscriptions.subscribe(topics, listener); - metadata.setTopics(subscriptions.groupSubscription()); + if (topics.isEmpty()) { + // treat subscribing to empty topic list as the same as unsubscribing + this.unsubscribe(); + } else { + log.debug("Subscribed to topic(s): {}", Utils.join(topics, ", ")); + this.subscriptions.subscribe(topics, listener); + metadata.setTopics(subscriptions.groupSubscription()); + } } finally { release(); } @@ -666,6 +674,9 @@ public class KafkaConsumer<K, V> implements Consumer<K, V> { * assign partitions. Topic subscriptions are not incremental. This list will replace the current * assignment (if there is one). It is not possible to combine topic subscription with group management * with manual partition assignment through {@link #assign(List)}. + * + * If the given list of topics is empty, it is treated the same as {@link #unsubscribe()}. + * * <p> * This is a short-hand for {@link #subscribe(List, ConsumerRebalanceListener)}, which * uses a noop listener. If you need the ability to either seek to particular offsets, you should prefer @@ -715,6 +726,7 @@ public class KafkaConsumer<K, V> implements Consumer<K, V> { public void unsubscribe() { acquire(); try { + log.debug("Unsubscribed all topics or patterns and assigned partitions"); this.subscriptions.unsubscribe(); this.coordinator.resetGeneration(); this.metadata.needMetadataForAllTopics(false); @@ -739,7 +751,7 @@ public class KafkaConsumer<K, V> implements Consumer<K, V> { acquire(); try { log.debug("Subscribed to partition(s): {}", Utils.join(partitions, ", ")); - this.subscriptions.assign(partitions); + this.subscriptions.assignFromUser(partitions); Set<String> topics = new HashSet<>(); for (TopicPartition tp : partitions) topics.add(tp.topic()); http://git-wip-us.apache.org/repos/asf/kafka/blob/aa56dfb9/clients/src/main/java/org/apache/kafka/clients/consumer/MockConsumer.java ---------------------------------------------------------------------- diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/MockConsumer.java b/clients/src/main/java/org/apache/kafka/clients/consumer/MockConsumer.java index 0242d7b..ed1c1e2 100644 --- a/clients/src/main/java/org/apache/kafka/clients/consumer/MockConsumer.java +++ b/clients/src/main/java/org/apache/kafka/clients/consumer/MockConsumer.java @@ -76,7 +76,7 @@ public class MockConsumer<K, V> implements Consumer<K, V> { public void rebalance(Collection<TopicPartition> newAssignment) { // TODO: Rebalance callbacks this.records.clear(); - this.subscriptions.changePartitionAssignment(newAssignment); + this.subscriptions.assignFromSubscribed(newAssignment); } @Override @@ -112,7 +112,7 @@ public class MockConsumer<K, V> implements Consumer<K, V> { @Override public void assign(List<TopicPartition> partitions) { ensureNotClosed(); - this.subscriptions.assign(partitions); + this.subscriptions.assignFromUser(partitions); } @Override http://git-wip-us.apache.org/repos/asf/kafka/blob/aa56dfb9/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java ---------------------------------------------------------------------- diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java index fc7e819..d6291bf 100644 --- a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java +++ b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java @@ -169,7 +169,7 @@ public final class ConsumerCoordinator extends AbstractCoordinator implements Cl subscriptions.needRefreshCommits(); // update partition assignment - subscriptions.changePartitionAssignment(assignment.partitions()); + subscriptions.assignFromSubscribed(assignment.partitions()); // give the assignor a chance to update internal state based on the received assignment assignor.onAssignment(assignment); http://git-wip-us.apache.org/repos/asf/kafka/blob/aa56dfb9/clients/src/main/java/org/apache/kafka/clients/consumer/internals/SubscriptionState.java ---------------------------------------------------------------------- diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/SubscriptionState.java b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/SubscriptionState.java index 6e79a7f..a9ff35f 100644 --- a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/SubscriptionState.java +++ b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/SubscriptionState.java @@ -28,8 +28,8 @@ import java.util.regex.Pattern; /** * A class for tracking the topics, partitions, and offsets for the consumer. A partition - * is "assigned" either directly with {@link #assign(List)} (manual assignment) - * or with {@link #changePartitionAssignment(Collection)} (automatic assignment). + * is "assigned" either directly with {@link #assignFromUser(Collection)} (manual assignment) + * or with {@link #assignFromSubscribed(Collection)} (automatic assignment from subscription). * * Once assigned, the partition is not considered "fetchable" until its initial position has * been set with {@link #seek(TopicPartition, long)}. Fetchable partitions track a fetch @@ -129,12 +129,16 @@ public class SubscriptionState { } public void needReassignment() { - // this.groupSubscription.retainAll(subscription); this.needsPartitionAssignment = true; } - public void assign(List<TopicPartition> partitions) { + /** + * Change the assignment to the specified partitions provided by the user, + * note this is different from {@link #assignFromSubscribed(Collection)} + * whose input partitions are provided from the subscribed topics. + */ + public void assignFromUser(Collection<TopicPartition> partitions) { if (!this.subscription.isEmpty() || this.subscribedPattern != null) throw new IllegalStateException(SUBSCRIPTION_EXCEPTION_MESSAGE); @@ -146,6 +150,22 @@ public class SubscriptionState { addAssignedPartition(partition); this.assignment.keySet().retainAll(this.userAssignment); + + this.needsPartitionAssignment = false; + } + + /** + * Change the assignment to the specified partitions returned from the coordinator, + * note this is different from {@link #assignFromUser(Collection)} which directly set the assignment from user inputs + */ + public void assignFromSubscribed(Collection<TopicPartition> assignments) { + for (TopicPartition tp : assignments) + if (!this.subscription.contains(tp.topic())) + throw new IllegalArgumentException("Assigned partition " + tp + " for non-subscribed topic."); + this.assignment.clear(); + for (TopicPartition tp: assignments) + addAssignedPartition(tp); + this.needsPartitionAssignment = false; } public void subscribe(Pattern pattern, ConsumerRebalanceListener listener) { @@ -306,16 +326,6 @@ public class SubscriptionState { return this.needsPartitionAssignment; } - public void changePartitionAssignment(Collection<TopicPartition> assignments) { - for (TopicPartition tp : assignments) - if (!this.subscription.contains(tp.topic())) - throw new IllegalArgumentException("Assigned partition " + tp + " for non-subscribed topic."); - this.assignment.clear(); - for (TopicPartition tp: assignments) - addAssignedPartition(tp); - this.needsPartitionAssignment = false; - } - public boolean isAssigned(TopicPartition tp) { return assignment.containsKey(tp); } http://git-wip-us.apache.org/repos/asf/kafka/blob/aa56dfb9/clients/src/test/java/org/apache/kafka/clients/consumer/KafkaConsumerTest.java ---------------------------------------------------------------------- diff --git a/clients/src/test/java/org/apache/kafka/clients/consumer/KafkaConsumerTest.java b/clients/src/test/java/org/apache/kafka/clients/consumer/KafkaConsumerTest.java index 7625218..983c45d 100644 --- a/clients/src/test/java/org/apache/kafka/clients/consumer/KafkaConsumerTest.java +++ b/clients/src/test/java/org/apache/kafka/clients/consumer/KafkaConsumerTest.java @@ -17,15 +17,20 @@ package org.apache.kafka.clients.consumer; import org.apache.kafka.common.KafkaException; +import org.apache.kafka.common.TopicPartition; import org.apache.kafka.common.serialization.ByteArrayDeserializer; import org.apache.kafka.test.MockMetricsReporter; import org.junit.Assert; import org.junit.Test; +import java.util.Collections; import java.util.Properties; public class KafkaConsumerTest { + private final String topic = "test"; + private final TopicPartition tp0 = new TopicPartition("test", 0); + @Test public void testConstructorClose() throws Exception { Properties props = new Properties(); @@ -46,4 +51,31 @@ public class KafkaConsumerTest { } Assert.fail("should have caught an exception and returned"); } + + @Test + public void testSubscription() { + Properties props = new Properties(); + props.setProperty(ConsumerConfig.CLIENT_ID_CONFIG, "testSubscription"); + props.setProperty(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9999"); + props.setProperty(ConsumerConfig.METRIC_REPORTER_CLASSES_CONFIG, MockMetricsReporter.class.getName()); + + KafkaConsumer<byte[], byte[]> consumer = new KafkaConsumer<byte[], byte[]>( + props, new ByteArrayDeserializer(), new ByteArrayDeserializer()); + + consumer.subscribe(Collections.singletonList(topic)); + Assert.assertEquals(Collections.singleton(topic), consumer.subscription()); + Assert.assertTrue(consumer.assignment().isEmpty()); + + consumer.subscribe(Collections.<String>emptyList()); + Assert.assertTrue(consumer.subscription().isEmpty()); + Assert.assertTrue(consumer.assignment().isEmpty()); + + consumer.assign(Collections.singletonList(tp0)); + Assert.assertTrue(consumer.subscription().isEmpty()); + Assert.assertEquals(Collections.singleton(tp0), consumer.assignment()); + + consumer.unsubscribe(); + Assert.assertTrue(consumer.subscription().isEmpty()); + Assert.assertTrue(consumer.assignment().isEmpty()); + } } http://git-wip-us.apache.org/repos/asf/kafka/blob/aa56dfb9/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinatorTest.java ---------------------------------------------------------------------- diff --git a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinatorTest.java b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinatorTest.java index 93994d7..b20277f 100644 --- a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinatorTest.java +++ b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinatorTest.java @@ -193,7 +193,7 @@ public class ConsumerCoordinatorTest { // illegal_generation will cause re-partition subscriptions.subscribe(Arrays.asList(topicName), rebalanceListener); - subscriptions.changePartitionAssignment(Collections.singletonList(tp)); + subscriptions.assignFromSubscribed(Collections.singletonList(tp)); time.sleep(sessionTimeoutMs); RequestFuture<Void> future = coordinator.sendHeartbeatRequest(); // should send out the heartbeat @@ -217,7 +217,7 @@ public class ConsumerCoordinatorTest { // illegal_generation will cause re-partition subscriptions.subscribe(Arrays.asList(topicName), rebalanceListener); - subscriptions.changePartitionAssignment(Collections.singletonList(tp)); + subscriptions.assignFromSubscribed(Collections.singletonList(tp)); time.sleep(sessionTimeoutMs); RequestFuture<Void> future = coordinator.sendHeartbeatRequest(); // should send out the heartbeat @@ -413,7 +413,7 @@ public class ConsumerCoordinatorTest { @Test public void testCommitOffsetOnly() { - subscriptions.assign(Arrays.asList(tp)); + subscriptions.assignFromUser(Arrays.asList(tp)); client.prepareResponse(consumerMetadataResponse(node, Errors.NONE.code())); coordinator.ensureCoordinatorKnown(); @@ -430,7 +430,7 @@ public class ConsumerCoordinatorTest { @Test public void testCommitOffsetMetadata() { - subscriptions.assign(Arrays.asList(tp)); + subscriptions.assignFromUser(Arrays.asList(tp)); client.prepareResponse(consumerMetadataResponse(node, Errors.NONE.code())); coordinator.ensureCoordinatorKnown(); @@ -473,7 +473,7 @@ public class ConsumerCoordinatorTest { // now switch to manual assignment subscriptions.unsubscribe(); coordinator.resetGeneration(); - subscriptions.assign(Arrays.asList(tp)); + subscriptions.assignFromUser(Arrays.asList(tp)); // the client should not reuse generation/memberId from auto-subscribed generation client.prepareResponse(new MockClient.RequestMatcher() { @@ -612,7 +612,7 @@ public class ConsumerCoordinatorTest { client.prepareResponse(consumerMetadataResponse(node, Errors.NONE.code())); coordinator.ensureCoordinatorKnown(); - subscriptions.assign(Arrays.asList(tp)); + subscriptions.assignFromUser(Arrays.asList(tp)); subscriptions.needRefreshCommits(); client.prepareResponse(offsetFetchResponse(tp, Errors.NONE.code(), "", 100L)); coordinator.refreshCommittedOffsetsIfNeeded(); @@ -625,7 +625,7 @@ public class ConsumerCoordinatorTest { client.prepareResponse(consumerMetadataResponse(node, Errors.NONE.code())); coordinator.ensureCoordinatorKnown(); - subscriptions.assign(Arrays.asList(tp)); + subscriptions.assignFromUser(Arrays.asList(tp)); subscriptions.needRefreshCommits(); client.prepareResponse(offsetFetchResponse(tp, Errors.OFFSET_LOAD_IN_PROGRESS.code(), "", 100L)); client.prepareResponse(offsetFetchResponse(tp, Errors.NONE.code(), "", 100L)); @@ -639,7 +639,7 @@ public class ConsumerCoordinatorTest { client.prepareResponse(consumerMetadataResponse(node, Errors.NONE.code())); coordinator.ensureCoordinatorKnown(); - subscriptions.assign(Arrays.asList(tp)); + subscriptions.assignFromUser(Arrays.asList(tp)); subscriptions.needRefreshCommits(); client.prepareResponse(offsetFetchResponse(tp, Errors.NOT_COORDINATOR_FOR_GROUP.code(), "", 100L)); client.prepareResponse(consumerMetadataResponse(node, Errors.NONE.code())); @@ -654,7 +654,7 @@ public class ConsumerCoordinatorTest { client.prepareResponse(consumerMetadataResponse(node, Errors.NONE.code())); coordinator.ensureCoordinatorKnown(); - subscriptions.assign(Arrays.asList(tp)); + subscriptions.assignFromUser(Arrays.asList(tp)); subscriptions.needRefreshCommits(); client.prepareResponse(offsetFetchResponse(tp, Errors.NONE.code(), "", -1L)); coordinator.refreshCommittedOffsetsIfNeeded(); http://git-wip-us.apache.org/repos/asf/kafka/blob/aa56dfb9/clients/src/test/java/org/apache/kafka/clients/consumer/internals/FetcherTest.java ---------------------------------------------------------------------- diff --git a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/FetcherTest.java b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/FetcherTest.java index 8773f8c..957d8f9 100644 --- a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/FetcherTest.java +++ b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/FetcherTest.java @@ -110,7 +110,7 @@ public class FetcherTest { @Test public void testFetchNormal() { List<ConsumerRecord<byte[], byte[]>> records; - subscriptions.assign(Arrays.asList(tp)); + subscriptions.assignFromUser(Arrays.asList(tp)); subscriptions.seek(tp, 0); // normal fetch @@ -130,7 +130,7 @@ public class FetcherTest { @Test(expected = RecordTooLargeException.class) public void testFetchRecordTooLarge() { - subscriptions.assign(Arrays.asList(tp)); + subscriptions.assignFromUser(Arrays.asList(tp)); subscriptions.seek(tp, 0); // prepare large record @@ -150,13 +150,13 @@ public class FetcherTest { @Test public void testFetchDuringRebalance() { subscriptions.subscribe(Arrays.asList(topicName), listener); - subscriptions.changePartitionAssignment(Arrays.asList(tp)); + subscriptions.assignFromSubscribed(Arrays.asList(tp)); subscriptions.seek(tp, 0); fetcher.initFetches(cluster); // Now the rebalance happens and fetch positions are cleared - subscriptions.changePartitionAssignment(Arrays.asList(tp)); + subscriptions.assignFromSubscribed(Arrays.asList(tp)); client.prepareResponse(fetchResponse(this.records.buffer(), Errors.NONE.code(), 100L, 0)); consumerClient.poll(0); @@ -166,7 +166,7 @@ public class FetcherTest { @Test public void testInFlightFetchOnPausedPartition() { - subscriptions.assign(Arrays.asList(tp)); + subscriptions.assignFromUser(Arrays.asList(tp)); subscriptions.seek(tp, 0); fetcher.initFetches(cluster); @@ -179,7 +179,7 @@ public class FetcherTest { @Test public void testFetchOnPausedPartition() { - subscriptions.assign(Arrays.asList(tp)); + subscriptions.assignFromUser(Arrays.asList(tp)); subscriptions.seek(tp, 0); subscriptions.pause(tp); @@ -189,7 +189,7 @@ public class FetcherTest { @Test public void testFetchNotLeaderForPartition() { - subscriptions.assign(Arrays.asList(tp)); + subscriptions.assignFromUser(Arrays.asList(tp)); subscriptions.seek(tp, 0); fetcher.initFetches(cluster); @@ -201,7 +201,7 @@ public class FetcherTest { @Test public void testFetchUnknownTopicOrPartition() { - subscriptions.assign(Arrays.asList(tp)); + subscriptions.assignFromUser(Arrays.asList(tp)); subscriptions.seek(tp, 0); fetcher.initFetches(cluster); @@ -213,7 +213,7 @@ public class FetcherTest { @Test public void testFetchOffsetOutOfRange() { - subscriptions.assign(Arrays.asList(tp)); + subscriptions.assignFromUser(Arrays.asList(tp)); subscriptions.seek(tp, 0); fetcher.initFetches(cluster); @@ -227,7 +227,7 @@ public class FetcherTest { @Test public void testFetchedRecordsAfterSeek() { - subscriptionsNoAutoReset.assign(Arrays.asList(tp)); + subscriptionsNoAutoReset.assignFromUser(Arrays.asList(tp)); subscriptionsNoAutoReset.seek(tp, 0); fetcherNoAutoReset.initFetches(cluster); @@ -240,7 +240,7 @@ public class FetcherTest { @Test public void testFetchOffsetOutOfRangeException() { - subscriptionsNoAutoReset.assign(Arrays.asList(tp)); + subscriptionsNoAutoReset.assignFromUser(Arrays.asList(tp)); subscriptionsNoAutoReset.seek(tp, 0); fetcherNoAutoReset.initFetches(cluster); @@ -259,7 +259,7 @@ public class FetcherTest { @Test public void testFetchDisconnected() { - subscriptions.assign(Arrays.asList(tp)); + subscriptions.assignFromUser(Arrays.asList(tp)); subscriptions.seek(tp, 0); fetcher.initFetches(cluster); @@ -278,7 +278,7 @@ public class FetcherTest { public void testUpdateFetchPositionToCommitted() { // unless a specific reset is expected, the default behavior is to reset to the committed // position if one is present - subscriptions.assign(Arrays.asList(tp)); + subscriptions.assignFromUser(Arrays.asList(tp)); subscriptions.committed(tp, new OffsetAndMetadata(5)); fetcher.updateFetchPositions(Collections.singleton(tp)); @@ -289,7 +289,7 @@ public class FetcherTest { @Test public void testUpdateFetchPositionResetToDefaultOffset() { - subscriptions.assign(Arrays.asList(tp)); + subscriptions.assignFromUser(Arrays.asList(tp)); // with no commit position, we should reset using the default strategy defined above (EARLIEST) client.prepareResponse(listOffsetRequestMatcher(ListOffsetRequest.EARLIEST_TIMESTAMP), @@ -303,7 +303,7 @@ public class FetcherTest { @Test public void testUpdateFetchPositionResetToLatestOffset() { - subscriptions.assign(Arrays.asList(tp)); + subscriptions.assignFromUser(Arrays.asList(tp)); subscriptions.needOffsetReset(tp, OffsetResetStrategy.LATEST); client.prepareResponse(listOffsetRequestMatcher(ListOffsetRequest.LATEST_TIMESTAMP), @@ -317,7 +317,7 @@ public class FetcherTest { @Test public void testUpdateFetchPositionResetToEarliestOffset() { - subscriptions.assign(Arrays.asList(tp)); + subscriptions.assignFromUser(Arrays.asList(tp)); subscriptions.needOffsetReset(tp, OffsetResetStrategy.EARLIEST); client.prepareResponse(listOffsetRequestMatcher(ListOffsetRequest.EARLIEST_TIMESTAMP), @@ -331,7 +331,7 @@ public class FetcherTest { @Test public void testUpdateFetchPositionDisconnect() { - subscriptions.assign(Arrays.asList(tp)); + subscriptions.assignFromUser(Arrays.asList(tp)); subscriptions.needOffsetReset(tp, OffsetResetStrategy.LATEST); // First request gets a disconnect @@ -365,7 +365,7 @@ public class FetcherTest { @Test public void testQuotaMetrics() throws Exception { List<ConsumerRecord<byte[], byte[]>> records; - subscriptions.assign(Arrays.asList(tp)); + subscriptions.assignFromUser(Arrays.asList(tp)); subscriptions.seek(tp, 0); // normal fetch http://git-wip-us.apache.org/repos/asf/kafka/blob/aa56dfb9/clients/src/test/java/org/apache/kafka/clients/consumer/internals/SubscriptionStateTest.java ---------------------------------------------------------------------- diff --git a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/SubscriptionStateTest.java b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/SubscriptionStateTest.java index a0568ad..c5fce61 100644 --- a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/SubscriptionStateTest.java +++ b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/SubscriptionStateTest.java @@ -35,19 +35,22 @@ import org.junit.Test; public class SubscriptionStateTest { private final SubscriptionState state = new SubscriptionState(OffsetResetStrategy.EARLIEST); + private final String topic = "test"; + private final String topic1 = "test1"; private final TopicPartition tp0 = new TopicPartition("test", 0); private final TopicPartition tp1 = new TopicPartition("test", 1); private final MockRebalanceListener rebalanceListener = new MockRebalanceListener(); @Test public void partitionAssignment() { - state.assign(Arrays.asList(tp0)); + state.assignFromUser(Arrays.asList(tp0)); assertEquals(Collections.singleton(tp0), state.assignedPartitions()); + assertFalse(state.partitionAssignmentNeeded()); state.committed(tp0, new OffsetAndMetadata(1)); state.seek(tp0, 1); assertTrue(state.isFetchable(tp0)); assertAllPositions(tp0, 1L); - state.assign(Arrays.<TopicPartition>asList()); + state.assignFromUser(Arrays.<TopicPartition>asList()); assertTrue(state.assignedPartitions().isEmpty()); assertFalse(state.isAssigned(tp0)); assertFalse(state.isFetchable(tp0)); @@ -55,7 +58,7 @@ public class SubscriptionStateTest { @Test public void partitionReset() { - state.assign(Arrays.asList(tp0)); + state.assignFromUser(Arrays.asList(tp0)); state.seek(tp0, 5); assertEquals(5L, (long) state.fetched(tp0)); assertEquals(5L, (long) state.consumed(tp0)); @@ -73,16 +76,18 @@ public class SubscriptionStateTest { @Test public void topicSubscription() { - state.subscribe(Arrays.asList("test"), rebalanceListener); + state.subscribe(Arrays.asList(topic), rebalanceListener); assertEquals(1, state.subscription().size()); + assertTrue(state.partitionAssignmentNeeded()); assertTrue(state.assignedPartitions().isEmpty()); assertTrue(state.partitionsAutoAssigned()); - state.changePartitionAssignment(asList(tp0)); + state.assignFromSubscribed(asList(tp0)); state.seek(tp0, 1); state.committed(tp0, new OffsetAndMetadata(1)); assertAllPositions(tp0, 1L); - state.changePartitionAssignment(asList(tp1)); + state.assignFromSubscribed(asList(tp1)); assertTrue(state.isAssigned(tp1)); + assertFalse(state.partitionAssignmentNeeded()); assertFalse(state.isAssigned(tp0)); assertFalse(state.isFetchable(tp1)); assertEquals(Collections.singleton(tp1), state.assignedPartitions()); @@ -90,7 +95,7 @@ public class SubscriptionStateTest { @Test public void partitionPause() { - state.assign(Arrays.asList(tp0)); + state.assignFromUser(Arrays.asList(tp0)); state.seek(tp0, 100); assertTrue(state.isFetchable(tp0)); state.pause(tp0); @@ -101,44 +106,24 @@ public class SubscriptionStateTest { @Test public void commitOffsetMetadata() { - state.assign(Arrays.asList(tp0)); + state.assignFromUser(Arrays.asList(tp0)); state.committed(tp0, new OffsetAndMetadata(5, "hi")); assertEquals(5, state.committed(tp0).offset()); assertEquals("hi", state.committed(tp0).metadata()); } - @Test - public void topicUnsubscription() { - final String topic = "test"; - state.subscribe(Arrays.asList(topic), rebalanceListener); - assertEquals(1, state.subscription().size()); - assertTrue(state.assignedPartitions().isEmpty()); - assertTrue(state.partitionsAutoAssigned()); - state.changePartitionAssignment(asList(tp0)); - state.committed(tp0, new OffsetAndMetadata(1)); - state.seek(tp0, 1); - assertAllPositions(tp0, 1L); - state.changePartitionAssignment(asList(tp1)); - assertFalse(state.isAssigned(tp0)); - assertEquals(Collections.singleton(tp1), state.assignedPartitions()); - - state.subscribe(Arrays.<String>asList(), rebalanceListener); - assertEquals(0, state.subscription().size()); - assertTrue(state.assignedPartitions().isEmpty()); - } - @Test(expected = IllegalStateException.class) public void invalidConsumedPositionUpdate() { - state.subscribe(Arrays.asList("test"), rebalanceListener); - state.changePartitionAssignment(asList(tp0)); + state.subscribe(Arrays.asList(topic), rebalanceListener); + state.assignFromSubscribed(asList(tp0)); state.consumed(tp0, 0); } @Test(expected = IllegalStateException.class) public void invalidFetchPositionUpdate() { - state.subscribe(Arrays.asList("test"), rebalanceListener); - state.changePartitionAssignment(asList(tp0)); + state.subscribe(Arrays.asList(topic), rebalanceListener); + state.assignFromSubscribed(asList(tp0)); state.fetched(tp0, 0); } @@ -160,46 +145,60 @@ public class SubscriptionStateTest { @Test(expected = IllegalStateException.class) public void cantSubscribeTopicAndPattern() { - state.subscribe(Arrays.asList("test"), rebalanceListener); + state.subscribe(Arrays.asList(topic), rebalanceListener); state.subscribe(Pattern.compile(".*"), rebalanceListener); } @Test(expected = IllegalStateException.class) public void cantSubscribePartitionAndPattern() { - state.assign(Arrays.asList(new TopicPartition("test", 0))); + state.assignFromUser(Arrays.asList(tp0)); state.subscribe(Pattern.compile(".*"), rebalanceListener); } @Test(expected = IllegalStateException.class) public void cantSubscribePatternAndTopic() { state.subscribe(Pattern.compile(".*"), rebalanceListener); - state.subscribe(Arrays.asList("test"), rebalanceListener); + state.subscribe(Arrays.asList(topic), rebalanceListener); } @Test(expected = IllegalStateException.class) public void cantSubscribePatternAndPartition() { state.subscribe(Pattern.compile(".*"), rebalanceListener); - state.assign(Arrays.asList(new TopicPartition("test", 0))); + state.assignFromUser(Arrays.asList(tp0)); } @Test public void patternSubscription() { state.subscribe(Pattern.compile(".*"), rebalanceListener); - state.changeSubscription(Arrays.asList("test", "test1")); + state.changeSubscription(Arrays.asList(topic, topic1)); assertEquals( "Expected subscribed topics count is incorrect", 2, state.subscription().size()); } @Test - public void patternUnsubscription() { + public void unsubscription() { state.subscribe(Pattern.compile(".*"), rebalanceListener); - state.changeSubscription(Arrays.asList("test", "test1")); + state.changeSubscription(Arrays.asList(topic, topic1)); + assertTrue(state.partitionAssignmentNeeded()); + + state.assignFromSubscribed(asList(tp1)); + assertEquals(Collections.singleton(tp1), state.assignedPartitions()); + assertFalse(state.partitionAssignmentNeeded()); state.unsubscribe(); + assertEquals(0, state.subscription().size()); + assertTrue(state.assignedPartitions().isEmpty()); + assertTrue(state.partitionAssignmentNeeded()); - assertEquals( - "Expected subscribed topics count is incorrect", 0, state.subscription().size()); + state.assignFromUser(Arrays.asList(tp0)); + assertEquals(Collections.singleton(tp0), state.assignedPartitions()); + assertFalse(state.partitionAssignmentNeeded()); + + state.unsubscribe(); + assertEquals(0, state.subscription().size()); + assertTrue(state.assignedPartitions().isEmpty()); + assertTrue(state.partitionAssignmentNeeded()); } private static class MockRebalanceListener implements ConsumerRebalanceListener { http://git-wip-us.apache.org/repos/asf/kafka/blob/aa56dfb9/clients/src/test/java/org/apache/kafka/clients/producer/KafkaProducerTest.java ---------------------------------------------------------------------- diff --git a/clients/src/test/java/org/apache/kafka/clients/producer/KafkaProducerTest.java b/clients/src/test/java/org/apache/kafka/clients/producer/KafkaProducerTest.java index d1759ce..b044cf4 100644 --- a/clients/src/test/java/org/apache/kafka/clients/producer/KafkaProducerTest.java +++ b/clients/src/test/java/org/apache/kafka/clients/producer/KafkaProducerTest.java @@ -31,7 +31,6 @@ import java.util.HashMap; public class KafkaProducerTest { - @Test public void testConstructorFailureCloseResource() { Properties props = new Properties();