[ 
https://issues.apache.org/jira/browse/KAFKA-6063?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Akihito Nakano updated KAFKA-6063:
----------------------------------
    Description: 
Hi.
"org.apache.kafka.streams.errors.StreamsException" is thrown in following case.

h3. Create topic

{code:java}
$ bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 
1 --partitions 6 --topic word-count-input
{code}

h3. Create Kafka Streams Application

{code:java}
public class WordCountApp {
    public static void main(String[] args) {
        Properties config = new Properties();
        config.put(StreamsConfig.APPLICATION_ID_CONFIG, 
"wordcount-application");
...
...
{code}



h3.  Ensure that it works fine

{code:java}
$ java -jar wordcount.jar

KafkaStreams processID: b4a559cb-7075-4ece-a718-5043a432900b
        StreamsThread appId: wordcount-application
...
...
{code}



h3.  Change "partitions"

{code:java}
$ bin/kafka-topics.sh --alter --zookeeper localhost:2181 --partitions 8 --topic 
word-count-input
Adding partitions succeeded!
{code}


h3.  When I start Application, StreamsException is thrown

{code:java}
$ java -jar wordcount.jar

KafkaStreams processID: 8a9cbf03-b841-4cb2-9d44-6456b4520522
        StreamsThread appId: wordcount-applicationn
...
...

Exception in thread 
"wordcount-application-8a9cbf03-b841-4cb2-9d44-6456b4520522-StreamThread-1" 
org.apache.kafka.streams.errors.StreamsException: Could not create internal 
topics.
        at 
org.apache.kafka.streams.processor.internals.InternalTopicManager.makeReady(InternalTopicManager.java:82)
        at 
org.apache.kafka.streams.processor.internals.StreamPartitionAssignor.prepareTopic(StreamPartitionAssignor.java:660)
        at 
org.apache.kafka.streams.processor.internals.StreamPartitionAssignor.assign(StreamPartitionAssignor.java:398)
        at 
org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.performAssignment(ConsumerCoordinator.java:365)
        at 
org.apache.kafka.clients.consumer.internals.AbstractCoordinator.onJoinLeader(AbstractCoordinator.java:522)
        at 
org.apache.kafka.clients.consumer.internals.AbstractCoordinator.access$1100(AbstractCoordinator.java:93)
        at 
org.apache.kafka.clients.consumer.internals.AbstractCoordinator$JoinGroupResponseHandler.handle(AbstractCoordinator.java:472)
        at 
org.apache.kafka.clients.consumer.internals.AbstractCoordinator$JoinGroupResponseHandler.handle(AbstractCoordinator.java:455)
        at 
org.apache.kafka.clients.consumer.internals.AbstractCoordinator$CoordinatorResponseHandler.onSuccess(AbstractCoordinator.java:808)
        at 
org.apache.kafka.clients.consumer.internals.AbstractCoordinator$CoordinatorResponseHandler.onSuccess(AbstractCoordinator.java:788)
        at 
org.apache.kafka.clients.consumer.internals.RequestFuture$1.onSuccess(RequestFuture.java:204)
        at 
org.apache.kafka.clients.consumer.internals.RequestFuture.fireSuccess(RequestFuture.java:167)
        at 
org.apache.kafka.clients.consumer.internals.RequestFuture.complete(RequestFuture.java:127)
        at 
org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient$RequestFutureCompletionHandler.fireCompletion(ConsumerNetworkClient.java:488)
        at 
org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.firePendingCompletedRequests(ConsumerNetworkClient.java:348)
        at 
org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:262)
        at 
org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:208)
        at 
org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:168)
        at 
org.apache.kafka.clients.consumer.internals.AbstractCoordinator.joinGroupIfNeeded(AbstractCoordinator.java:364)
        at 
org.apache.kafka.clients.consumer.internals.AbstractCoordinator.ensureActiveGroup(AbstractCoordinator.java:316)
        at 
org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.poll(ConsumerCoordinator.java:297)
        at 
org.apache.kafka.clients.consumer.KafkaConsumer.pollOnce(KafkaConsumer.java:1078)
        at 
org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1043)
        at 
org.apache.kafka.streams.processor.internals.StreamThread.pollRequests(StreamThread.java:536)
        at 
org.apache.kafka.streams.processor.internals.StreamThread.runOnce(StreamThread.java:490)
        at 
org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:480)
        at 
org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:457)
{code}


If I change the application id, Application works again.

Thank you.

  was:
Hi.
"org.apache.kafka.streams.errors.StreamsException" is thrown in following case.

h3. Create topic

{code:java}
$ bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 
1 --partitions 6 --topic word-count-input
{code}

h3. Create Kafka Streams Application

{code:java}
public class WordCountApp {
    public static void main(String[] args) {
        Properties config = new Properties();
        config.put(StreamsConfig.APPLICATION_ID_CONFIG, 
"wordcount-application");
...
...
{code}



h3.  Ensure that it works fine

{code:java}
$ java -jar wordcount.jar

KafkaStreams processID: b4a559cb-7075-4ece-a718-5043a432900b
        StreamsThread appId: wordcount-application
...
...
{code}



h3.  Change "partitions"

{code:java}
$ bin/kafka-topics.sh --alter --zookeeper localhost:2181 --partitions 8 --topic 
word-count-input
Adding partitions succeeded!
{code}


h3.  When I start Application, StreamsException is thrown

{code:java}
$ java -jar wordcount.jar

KafkaStreams processID: 8a9cbf03-b841-4cb2-9d44-6456b4520522
        StreamsThread appId: wordcount-applicationn
...
...

Exception in thread 
"wordcount-application-8a9cbf03-b841-4cb2-9d44-6456b4520522-StreamThread-1" 
org.apache.kafka.streams.errors.StreamsException: Could not create internal 
topics.
{code}


If I change the application id, Application works again.

Thank you.


> StreamsException is thrown after the changing `partitions`
> ----------------------------------------------------------
>
>                 Key: KAFKA-6063
>                 URL: https://issues.apache.org/jira/browse/KAFKA-6063
>             Project: Kafka
>          Issue Type: Bug
>          Components: streams
>    Affects Versions: 0.11.0.0
>         Environment: macOS 10.12
> kafka 0.11.0.1
>            Reporter: Akihito Nakano
>
> Hi.
> "org.apache.kafka.streams.errors.StreamsException" is thrown in following 
> case.
> h3. Create topic
> {code:java}
> $ bin/kafka-topics.sh --create --zookeeper localhost:2181 
> --replication-factor 1 --partitions 6 --topic word-count-input
> {code}
> h3. Create Kafka Streams Application
> {code:java}
> public class WordCountApp {
>     public static void main(String[] args) {
>         Properties config = new Properties();
>         config.put(StreamsConfig.APPLICATION_ID_CONFIG, 
> "wordcount-application");
> ...
> ...
> {code}
> h3.  Ensure that it works fine
> {code:java}
> $ java -jar wordcount.jar
> KafkaStreams processID: b4a559cb-7075-4ece-a718-5043a432900b
>         StreamsThread appId: wordcount-application
> ...
> ...
> {code}
> h3.  Change "partitions"
> {code:java}
> $ bin/kafka-topics.sh --alter --zookeeper localhost:2181 --partitions 8 
> --topic word-count-input
> Adding partitions succeeded!
> {code}
> h3.  When I start Application, StreamsException is thrown
> {code:java}
> $ java -jar wordcount.jar
> KafkaStreams processID: 8a9cbf03-b841-4cb2-9d44-6456b4520522
>         StreamsThread appId: wordcount-applicationn
> ...
> ...
> Exception in thread 
> "wordcount-application-8a9cbf03-b841-4cb2-9d44-6456b4520522-StreamThread-1" 
> org.apache.kafka.streams.errors.StreamsException: Could not create internal 
> topics.
>       at 
> org.apache.kafka.streams.processor.internals.InternalTopicManager.makeReady(InternalTopicManager.java:82)
>       at 
> org.apache.kafka.streams.processor.internals.StreamPartitionAssignor.prepareTopic(StreamPartitionAssignor.java:660)
>       at 
> org.apache.kafka.streams.processor.internals.StreamPartitionAssignor.assign(StreamPartitionAssignor.java:398)
>       at 
> org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.performAssignment(ConsumerCoordinator.java:365)
>       at 
> org.apache.kafka.clients.consumer.internals.AbstractCoordinator.onJoinLeader(AbstractCoordinator.java:522)
>       at 
> org.apache.kafka.clients.consumer.internals.AbstractCoordinator.access$1100(AbstractCoordinator.java:93)
>       at 
> org.apache.kafka.clients.consumer.internals.AbstractCoordinator$JoinGroupResponseHandler.handle(AbstractCoordinator.java:472)
>       at 
> org.apache.kafka.clients.consumer.internals.AbstractCoordinator$JoinGroupResponseHandler.handle(AbstractCoordinator.java:455)
>       at 
> org.apache.kafka.clients.consumer.internals.AbstractCoordinator$CoordinatorResponseHandler.onSuccess(AbstractCoordinator.java:808)
>       at 
> org.apache.kafka.clients.consumer.internals.AbstractCoordinator$CoordinatorResponseHandler.onSuccess(AbstractCoordinator.java:788)
>       at 
> org.apache.kafka.clients.consumer.internals.RequestFuture$1.onSuccess(RequestFuture.java:204)
>       at 
> org.apache.kafka.clients.consumer.internals.RequestFuture.fireSuccess(RequestFuture.java:167)
>       at 
> org.apache.kafka.clients.consumer.internals.RequestFuture.complete(RequestFuture.java:127)
>       at 
> org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient$RequestFutureCompletionHandler.fireCompletion(ConsumerNetworkClient.java:488)
>       at 
> org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.firePendingCompletedRequests(ConsumerNetworkClient.java:348)
>       at 
> org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:262)
>       at 
> org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:208)
>       at 
> org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:168)
>       at 
> org.apache.kafka.clients.consumer.internals.AbstractCoordinator.joinGroupIfNeeded(AbstractCoordinator.java:364)
>       at 
> org.apache.kafka.clients.consumer.internals.AbstractCoordinator.ensureActiveGroup(AbstractCoordinator.java:316)
>       at 
> org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.poll(ConsumerCoordinator.java:297)
>       at 
> org.apache.kafka.clients.consumer.KafkaConsumer.pollOnce(KafkaConsumer.java:1078)
>       at 
> org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1043)
>       at 
> org.apache.kafka.streams.processor.internals.StreamThread.pollRequests(StreamThread.java:536)
>       at 
> org.apache.kafka.streams.processor.internals.StreamThread.runOnce(StreamThread.java:490)
>       at 
> org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:480)
>       at 
> org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:457)
> {code}
> If I change the application id, Application works again.
> Thank you.



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)

Reply via email to