[ 
https://issues.apache.org/jira/browse/KAFKA-3544?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15236512#comment-15236512
 ] 

Greg Fodor commented on KAFKA-3544:
-----------------------------------

Not sure of the best way to share the topology. Here's the relevant part of the 
code:
{code}
builder
        .stream(Serdes.Long(), userSpaceBroadcastSerde, "positron-db-user_space_broadcasts")
        .map((id, broadcast) -> KeyValue.pair(broadcast.getUserId().toString(), broadcast))
        .to(Serdes.String(), userSpaceBroadcastSerde, "user_space_broadcasts-user_id");

KTable<String, UserSpaceBroadcasts> userSpaceBroadcastsByUserId = builder
        .stream(Serdes.String(), userSpaceBroadcastSerde, "user_space_broadcasts-user_id")
        .aggregateByKey(...);
{code}

In this example, userSpaceBroadcastSerde is a Serde for a custom Avro type. I'm 
basically pivoting the first stream onto a foreign key and then creating a 
KTable off of that output by tapping it and then aggregating. (Given our 
discussions on other tickets there may be a way to simplify this, but I wanted 
to capture it as-is for this report.)
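
To make the data flow concrete, here is a minimal sketch of the same pivot using plain Java collections rather than the Kafka Streams API. The Broadcast class, its fields, and the choice to collect payloads into a list are all assumptions for illustration; the real aggregation is elided in the snippet above. In Kafka the re-keyed records go through the intermediate topic so that records sharing the new key end up in the same partition before they are aggregated.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Plain-collections analogy of the pivot; this is NOT the Kafka Streams API.
final class PivotSketch {

    // Hypothetical stand-in for the custom Avro broadcast type.
    static final class Broadcast {
        final long id;        // key of the source record
        final long userId;    // foreign key the stream is pivoted onto
        final String payload; // assumed field, for illustration only

        Broadcast(long id, long userId, String payload) {
            this.id = id;
            this.userId = userId;
            this.payload = payload;
        }
    }

    // Step 1 (the map): replace the original key with userId as a String.
    // Step 2 (the aggregate): gather every payload that shares the new key.
    static Map<String, List<String>> aggregateByUserId(List<Broadcast> source) {
        Map<String, List<String>> byUser = new LinkedHashMap<>();
        for (Broadcast b : source) {
            String newKey = Long.toString(b.userId);
            byUser.computeIfAbsent(newKey, k -> new ArrayList<>()).add(b.payload);
        }
        return byUser;
    }

    public static void main(String[] args) {
        List<Broadcast> source = Arrays.asList(
                new Broadcast(1L, 7L, "a"),
                new Broadcast(2L, 8L, "b"),
                new Broadcast(3L, 7L, "c"));
        System.out.println(aggregateByUserId(source)); // prints {7=[a, c], 8=[b]}
    }
}
```

Records 1 and 3 have different source keys but the same user ID, so they land under the same key after the pivot.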

The topology is failing to build on the user_space_broadcasts-user_id topic:

{code}
Exception in thread "StreamThread-1" org.apache.kafka.streams.errors.TopologyBuilderException: Invalid topology building: External source topic not found: room_operation_message_incoming-user_id
        at org.apache.kafka.streams.processor.internals.StreamPartitionAssignor.ensureCopartitioning(StreamPartitionAssignor.java:435)
        at org.apache.kafka.streams.processor.internals.StreamPartitionAssignor.ensureCopartitioning(StreamPartitionAssignor.java:423)
        at org.apache.kafka.streams.processor.internals.StreamPartitionAssignor.assign(StreamPartitionAssignor.java:200)
        at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.performAssignment(ConsumerCoordinator.java:233)
        at org.apache.kafka.clients.consumer.internals.AbstractCoordinator.onJoinLeader(AbstractCoordinator.java:385)
        at org.apache.kafka.clients.consumer.internals.AbstractCoordinator.access$700(AbstractCoordinator.java:80)
        at org.apache.kafka.clients.consumer.internals.AbstractCoordinator$JoinGroupResponseHandler.handle(AbstractCoordinator.java:339)
        at org.apache.kafka.clients.consumer.internals.AbstractCoordinator$JoinGroupResponseHandler.handle(AbstractCoordinator.java:321)
        at org.apache.kafka.clients.consumer.internals.AbstractCoordinator$CoordinatorResponseHandler.onSuccess(AbstractCoordinator.java:659)
        at org.apache.kafka.clients.consumer.internals.AbstractCoordinator$CoordinatorResponseHandler.onSuccess(AbstractCoordinator.java:638)
        at org.apache.kafka.clients.consumer.internals.RequestFuture$1.onSuccess(RequestFuture.java:167)
        at org.apache.kafka.clients.consumer.internals.RequestFuture.fireSuccess(RequestFuture.java:133)
        at org.apache.kafka.clients.consumer.internals.RequestFuture.complete(RequestFuture.java:107)
        at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient$RequestFutureCompletionHandler.onComplete(ConsumerNetworkClient.java:381)
        at org.apache.kafka.clients.NetworkClient.poll(NetworkClient.java:279)
        at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.clientPoll(ConsumerNetworkClient.java:321)
        at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:214)
        at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:193)
        at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:163)
        at org.apache.kafka.clients.consumer.internals.AbstractCoordinator.ensureActiveGroup(AbstractCoordinator.java:220)
        at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.ensurePartitionAssignment(ConsumerCoordinator.java:317)
        at org.apache.kafka.clients.consumer.KafkaConsumer.pollOnce(KafkaConsumer.java:898)
        at org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:860)
        at org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:325)
        at org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:248)
2016-04-12 02:57:36 StreamThread [INFO] Stream thread shutdown complete [StreamThread-2]
Exception in thread "StreamThread-2" org.apache.kafka.streams.errors.TopologyBuilderException: Invalid topology building: External source topic not found: room_operation_message_incoming-user_id
        at org.apache.kafka.streams.processor.internals.StreamPartitionAssignor.ensureCopartitioning(StreamPartitionAssignor.java:435)
        at org.apache.kafka.streams.processor.internals.StreamPartitionAssignor.ensureCopartitioning(StreamPartitionAssignor.java:423)
        at org.apache.kafka.streams.processor.internals.StreamPartitionAssignor.assign(StreamPartitionAssignor.java:200)
        at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.performAssignment(ConsumerCoordinator.java:233)
        at org.apache.kafka.clients.consumer.internals.AbstractCoordinator.onJoinLeader(AbstractCoordinator.java:385)
        at org.apache.kafka.clients.consumer.internals.AbstractCoordinator.access$700(AbstractCoordinator.java:80)
        at org.apache.kafka.clients.consumer.internals.AbstractCoordinator$JoinGroupResponseHandler.handle(AbstractCoordinator.java:339)
        at org.apache.kafka.clients.consumer.internals.AbstractCoordinator$JoinGroupResponseHandler.handle(AbstractCoordinator.java:321)
        at org.apache.kafka.clients.consumer.internals.AbstractCoordinator$CoordinatorResponseHandler.onSuccess(AbstractCoordinator.java:659)
        at org.apache.kafka.clients.consumer.internals.AbstractCoordinator$CoordinatorResponseHandler.onSuccess(AbstractCoordinator.java:638)
        at org.apache.kafka.clients.consumer.internals.RequestFuture$1.onSuccess(RequestFuture.java:167)
        at org.apache.kafka.clients.consumer.internals.RequestFuture.fireSuccess(RequestFuture.java:133)
        at org.apache.kafka.clients.consumer.internals.RequestFuture.complete(RequestFuture.java:107)
        at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient$RequestFutureCompletionHandler.onComplete(ConsumerNetworkClient.java:381)
        at org.apache.kafka.clients.NetworkClient.poll(NetworkClient.java:279)
        at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.clientPoll(ConsumerNetworkClient.java:321)
        at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:214)
        at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:193)
        at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:163)
        at org.apache.kafka.clients.consumer.internals.AbstractCoordinator.ensureActiveGroup(AbstractCoordinator.java:220)
        at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.ensurePartitionAssignment(ConsumerCoordinator.java:317)
        at org.apache.kafka.clients.consumer.KafkaConsumer.pollOnce(KafkaConsumer.java:898)
        at org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:860)
        at org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:325)
        at org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:248)
{code}

> Missing topics on startup
> -------------------------
>
>                 Key: KAFKA-3544
>                 URL: https://issues.apache.org/jira/browse/KAFKA-3544
>             Project: Kafka
>          Issue Type: Bug
>          Components: streams
>    Affects Versions: 0.10.0.0
>            Reporter: Greg Fodor
>            Assignee: Guozhang Wang
>              Labels: semantics
>
> When running a relatively complex job with multiple tasks and state stores, 
> on the first run I get errors due to some of the intermediate topics not 
> existing. Subsequent runs work OK. My assumption is streams may be creating 
> topics lazily, so if downstream tasks are initializing before their parents 
> have had a chance to create their necessary topics then the children will 
> attempt to start consuming from topics that do not exist yet.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)
