Repository: kafka
Updated Branches:
  refs/heads/trunk 3281b3c90 -> 4e1c7d844


KAFKA-4135; Consumer poll with no subscription or assignment should raise an 
error

When the consumer is not subscribed to any topic or, in the case of manual 
assignment, is not assigned any partition, calling `poll()` should raise an 
exception.

Author: Vahid Hashemian <vahidhashem...@us.ibm.com>

Reviewers: Jason Gustafson <ja...@confluent.io>

Closes #1839 from vahidhashemian/KAFKA-4135


Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/4e1c7d84
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/4e1c7d84
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/4e1c7d84

Branch: refs/heads/trunk
Commit: 4e1c7d844f743e5b439447e645fa41d2f92b8b5f
Parents: 3281b3c
Author: Vahid Hashemian <vahidhashem...@us.ibm.com>
Authored: Mon Sep 19 16:42:43 2016 -0700
Committer: Jason Gustafson <ja...@confluent.io>
Committed: Mon Sep 19 16:42:43 2016 -0700

----------------------------------------------------------------------
 .../kafka/clients/consumer/KafkaConsumer.java   |  6 ++++
 .../consumer/internals/SubscriptionState.java   |  4 +++
 .../clients/consumer/KafkaConsumerTest.java     | 32 ++++++++++++++++++++
 3 files changed, 42 insertions(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kafka/blob/4e1c7d84/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java
----------------------------------------------------------------------
diff --git 
a/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java 
b/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java
index f7f2d20..108c0cb 100644
--- a/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java
+++ b/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java
@@ -954,6 +954,9 @@ public class KafkaConsumer<K, V> implements Consumer<K, V> {
      *             topics or to the configured groupId
      * @throws org.apache.kafka.common.KafkaException for any other 
unrecoverable errors (e.g. invalid groupId or
      *             session timeout, errors deserializing key/value pairs, or 
any new error cases in future versions)
+     * @throws java.lang.IllegalArgumentException if the timeout value is 
negative
+     * @throws java.lang.IllegalStateException if the consumer is not 
subscribed to any topics or manually assigned any
+     *             partitions to consume from
      */
     @Override
     public ConsumerRecords<K, V> poll(long timeout) {
@@ -962,6 +965,9 @@ public class KafkaConsumer<K, V> implements Consumer<K, V> {
             if (timeout < 0)
                 throw new IllegalArgumentException("Timeout must not be 
negative");
 
+            if (this.subscriptions.hasNoSubscriptionOrUserAssignment())
+                throw new IllegalStateException("Consumer is not subscribed to 
any topics or assigned any partitions");
+
             // poll for new data until the timeout expires
             long start = time.milliseconds();
             long remaining = timeout;

http://git-wip-us.apache.org/repos/asf/kafka/blob/4e1c7d84/clients/src/main/java/org/apache/kafka/clients/consumer/internals/SubscriptionState.java
----------------------------------------------------------------------
diff --git 
a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/SubscriptionState.java
 
b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/SubscriptionState.java
index 9029417..6dc2060 100644
--- 
a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/SubscriptionState.java
+++ 
b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/SubscriptionState.java
@@ -212,6 +212,10 @@ public class SubscriptionState {
         return this.subscriptionType == SubscriptionType.AUTO_PATTERN;
     }
 
+    public boolean hasNoSubscriptionOrUserAssignment() {
+        return this.subscriptionType == SubscriptionType.NONE;
+    }
+
     public void unsubscribe() {
         this.subscription = Collections.emptySet();
         this.userAssignment = Collections.emptySet();

http://git-wip-us.apache.org/repos/asf/kafka/blob/4e1c7d84/clients/src/test/java/org/apache/kafka/clients/consumer/KafkaConsumerTest.java
----------------------------------------------------------------------
diff --git 
a/clients/src/test/java/org/apache/kafka/clients/consumer/KafkaConsumerTest.java
 
b/clients/src/test/java/org/apache/kafka/clients/consumer/KafkaConsumerTest.java
index fd0794c..2408c11 100644
--- 
a/clients/src/test/java/org/apache/kafka/clients/consumer/KafkaConsumerTest.java
+++ 
b/clients/src/test/java/org/apache/kafka/clients/consumer/KafkaConsumerTest.java
@@ -935,6 +935,38 @@ public class KafkaConsumerTest {
         consumer.close();
     }
 
+    @Test(expected = IllegalStateException.class)
+    public void testPollWithNoSubscription() {
+        KafkaConsumer<byte[], byte[]> consumer = newConsumer();
+        try {
+            consumer.poll(0);
+        } finally {
+            consumer.close();
+        }
+    }
+
+    @Test(expected = IllegalStateException.class)
+    public void testPollWithEmptySubscription() {
+        KafkaConsumer<byte[], byte[]> consumer = newConsumer();
+        consumer.subscribe(Collections.<String>emptyList());
+        try {
+            consumer.poll(0);
+        } finally {
+            consumer.close();
+        }
+    }
+
+    @Test(expected = IllegalStateException.class)
+    public void testPollWithEmptyUserAssignment() {
+        KafkaConsumer<byte[], byte[]> consumer = newConsumer();
+        consumer.assign(Collections.<TopicPartition>emptySet());
+        try {
+            consumer.poll(0);
+        } finally {
+            consumer.close();
+        }
+    }
+
     private ConsumerRebalanceListener getConsumerRebalanceListener(final 
KafkaConsumer<String, String> consumer) {
         return new ConsumerRebalanceListener() {
             @Override

Reply via email to