[ https://issues.apache.org/jira/browse/KAFKA-3544?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15236512#comment-15236512 ]
Greg Fodor edited comment on KAFKA-3544 at 4/12/16 3:04 AM: ------------------------------------------------------------ Not sure of the best way to share the topology. Here's the relevant part of the code: {code} builder .stream(Serdes.String(), roomOperationSerde, "room_operation_message_incoming") .map((k, v) -> KeyValue.pair(v.getUserId(), v)) .to(Serdes.String(), roomOperationSerde, "room_operation_message_incoming-user_id"); KStream<String, RoomOperationMessage> roomOperationMessagesByUserId = builder .stream(Serdes.String(), roomOperationSerde, "room_operation_message_incoming-user_id"); KStream<String, UserBroadcastsMessage> userBroadcastsMessagesByUserId = roomOperationMessagesByUserId.leftJoin(userSpaceBroadcastsByUserId, UserBroadcastsMessage::new); {code} In this example roomOperationSerde is a Serde for a custom avro type. I'm basically pivoting the first stream onto a foreign key and then creating another KStream off of that output for a join downstream. The topology is failing to build on the user_space_broadcasts-user_id topic: {code} Exception in thread "StreamThread-1" org.apache.kafka.streams.errors.TopologyBuilderException: Invalid topology building: External source topic not found: room_operation_message_incoming-user_id at org.apache.kafka.streams.processor.internals.StreamPartitionAssignor.ensureCopartitioning(StreamPartitionAssignor.java:435) at org.apache.kafka.streams.processor.internals.StreamPartitionAssignor.ensureCopartitioning(StreamPartitionAssignor.java:423) at org.apache.kafka.streams.processor.internals.StreamPartitionAssignor.assign(StreamPartitionAssignor.java:200) at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.performAssignment(ConsumerCoordinator.java:233) at org.apache.kafka.clients.consumer.internals.AbstractCoordinator.onJoinLeader(AbstractCoordinator.java:385) at org.apache.kafka.clients.consumer.internals.AbstractCoordinator.access$700(AbstractCoordinator.java:80) at org.apache.kafka.clients.consumer.internals.AbstractCoordinator$JoinGroupResponseHandler.handle(AbstractCoordinator.java:339) at org.apache.kafka.clients.consumer.internals.AbstractCoordinator$JoinGroupResponseHandler.handle(AbstractCoordinator.java:321) at org.apache.kafka.clients.consumer.internals.AbstractCoordinator$CoordinatorResponseHandler.onSuccess(AbstractCoordinator.java:659) at org.apache.kafka.clients.consumer.internals.AbstractCoordinator$CoordinatorResponseHandler.onSuccess(AbstractCoordinator.java:638) at org.apache.kafka.clients.consumer.internals.AbstractCoordinator$CoordinatorResponseHandler.onSuccess(AbstractCoordinator.java:659) [0/1952] at org.apache.kafka.clients.consumer.internals.AbstractCoordinator$CoordinatorResponseHandler.onSuccess(AbstractCoordinator.java:638) at org.apache.kafka.clients.consumer.internals.RequestFuture$1.onSuccess(RequestFuture.java:167) at org.apache.kafka.clients.consumer.internals.RequestFuture.fireSuccess(RequestFuture.java:133) at org.apache.kafka.clients.consumer.internals.RequestFuture.complete(RequestFuture.java:107) at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient$RequestFutureCompletionHandler.onComplete(ConsumerNetworkClient.java:381) at org.apache.kafka.clients.NetworkClient.poll(NetworkClient.java:279) at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.clientPoll(ConsumerNetworkClient.java:321) at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:214) at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:193) at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:163) at org.apache.kafka.clients.consumer.internals.AbstractCoordinator.ensureActiveGroup(AbstractCoordinator.java:220) at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.ensurePartitionAssignment(ConsumerCoordinator.java:317) at org.apache.kafka.clients.consumer.KafkaConsumer.pollOnce(KafkaConsumer.java:898) at org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:860) at org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:325) at org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:248) 2016-04-12 02:57:36 StreamThread [INFO] Stream thread shutdown complete [StreamThread-2] Exception in thread "StreamThread-2" org.apache.kafka.streams.errors.TopologyBuilderException: Invalid topology building: External source topic not found: room_operation_message_incoming-user_id at org.apache.kafka.streams.processor.internals.StreamPartitionAssignor.ensureCopartitioning(StreamPartitionAssignor.java:435) at org.apache.kafka.streams.processor.internals.StreamPartitionAssignor.ensureCopartitioning(StreamPartitionAssignor.java:423) at org.apache.kafka.streams.processor.internals.StreamPartitionAssignor.assign(StreamPartitionAssignor.java:200) at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.performAssignment(ConsumerCoordinator.java:233) at org.apache.kafka.clients.consumer.internals.AbstractCoordinator.onJoinLeader(AbstractCoordinator.java:385) at org.apache.kafka.clients.consumer.internals.AbstractCoordinator.access$700(AbstractCoordinator.java:80) at org.apache.kafka.clients.consumer.internals.AbstractCoordinator$JoinGroupResponseHandler.handle(AbstractCoordinator.java:339) at org.apache.kafka.clients.consumer.internals.AbstractCoordinator$JoinGroupResponseHandler.handle(AbstractCoordinator.java:321) at org.apache.kafka.clients.consumer.internals.AbstractCoordinator$CoordinatorResponseHandler.onSuccess(AbstractCoordinator.java:659) at org.apache.kafka.clients.consumer.internals.AbstractCoordinator$CoordinatorResponseHandler.onSuccess(AbstractCoordinator.java:638) at org.apache.kafka.clients.consumer.internals.RequestFuture$1.onSuccess(RequestFuture.java:167) at org.apache.kafka.clients.consumer.internals.RequestFuture.fireSuccess(RequestFuture.java:133) at org.apache.kafka.clients.consumer.internals.RequestFuture.complete(RequestFuture.java:107) at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient$RequestFutureCompletionHandler.onComplete(ConsumerNetworkClient.java:381) at org.apache.kafka.clients.NetworkClient.poll(NetworkClient.java:279) at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.clientPoll(ConsumerNetworkClient.java:321) at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:214) at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:193) at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:163) at org.apache.kafka.clients.consumer.internals.AbstractCoordinator.ensureActiveGroup(AbstractCoordinator.java:220) at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.ensurePartitionAssignment(ConsumerCoordinator.java:317) at org.apache.kafka.clients.consumer.KafkaConsumer.pollOnce(KafkaConsumer.java:898) at org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:860) at org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:325) at org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:248) {code} was (Author: gfodor): Not sure of the best way to share the topology. Here's the relevant part of the code: {code} builder .stream(Serdes.String(), roomOperationSerde, "room_operation_message_incoming") .map((k, v) -> KeyValue.pair(v.getUserId(), v)) .to(Serdes.String(), roomOperationSerde, "room_operation_message_incoming-user_id"); KStream<String, RoomOperationMessage> roomOperationMessagesByUserId = builder .stream(Serdes.String(), roomOperationSerde, "room_operation_message_incoming-user_id"); KStream<String, UserBroadcastsMessage> userBroadcastsMessagesByUserId = roomOperationMessagesByUserId.leftJoin(userSpaceBroadcastsByUserId, UserBroadcastsMessage::new); {code} In this example roomOperationSerde is a Serde for a custom avro type. I'm basically pivoting the first stream onto a foreign key and then creating another KStream off of that output for a join downstream. The topology is failing to build on the user_space_broadcasts-user_id topic: {code} Exception in thread "StreamThread-1" org.apache.kafka.streams.errors.TopologyBuilderException: Invalid topology building: External source topic not found: room_operation_message_incoming-user_id at org.apache.kafka.streams.processor.internals.StreamPartitionAssignor.ensureCopartitioning(StreamPartitionAssignor.java:435) at org.apache.kafka.streams.processor.internals.StreamPartitionAssignor.ensureCopartitioning(StreamPartitionAssignor.java:423) at org.apache.kafka.streams.processor.internals.StreamPartitionAssignor.assign(StreamPartitionAssignor.java:200) at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.performAssignment(ConsumerCoordinator.java:233) at org.apache.kafka.clients.consumer.internals.AbstractCoordinator.onJoinLeader(AbstractCoordinator.java:385) at org.apache.kafka.clients.consumer.internals.AbstractCoordinator.access$700(AbstractCoordinator.java:80) at org.apache.kafka.clients.consumer.internals.AbstractCoordinator$JoinGroupResponseHandler.handle(AbstractCoordinator.java:339) at org.apache.kafka.clients.consumer.internals.AbstractCoordinator$JoinGroupResponseHandler.handle(AbstractCoordinator.java:321) at org.apache.kafka.clients.consumer.internals.AbstractCoordinator$CoordinatorResponseHandler.onSuccess(AbstractCoordinator.java:659) at org.apache.kafka.clients.consumer.internals.AbstractCoordinator$CoordinatorResponseHandler.onSuccess(AbstractCoordinator.java:638) at org.apache.kafka.clients.consumer.internals.AbstractCoordinator$CoordinatorResponseHandler.onSuccess(AbstractCoordinator.java:659) [0/1952] at org.apache.kafka.clients.consumer.internals.AbstractCoordinator$CoordinatorResponseHandler.onSuccess(AbstractCoordinator.java:638) at org.apache.kafka.clients.consumer.internals.RequestFuture$1.onSuccess(RequestFuture.java:167) at org.apache.kafka.clients.consumer.internals.RequestFuture.fireSuccess(RequestFuture.java:133) at org.apache.kafka.clients.consumer.internals.RequestFuture.complete(RequestFuture.java:107) at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient$RequestFutureCompletionHandler.onComplete(ConsumerNetworkClient.java:381) at org.apache.kafka.clients.NetworkClient.poll(NetworkClient.java:279) at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.clientPoll(ConsumerNetworkClient.java:321) at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:214) at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:193) at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:163) at org.apache.kafka.clients.consumer.internals.AbstractCoordinator.ensureActiveGroup(AbstractCoordinator.java:220) at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.ensurePartitionAssignment(ConsumerCoordinator.java:317) at org.apache.kafka.clients.consumer.KafkaConsumer.pollOnce(KafkaConsumer.java:898) at org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:860) at org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:325) at org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:248) 2016-04-12 02:57:36 StreamThread [INFO] Stream thread shutdown complete [StreamThread-2] Exception in thread "StreamThread-2" org.apache.kafka.streams.errors.TopologyBuilderException: Invalid topology building: External source topic not found: room_operation_message_incoming-user_id at org.apache.kafka.streams.processor.internals.StreamPartitionAssignor.ensureCopartitioning(StreamPartitionAssignor.java:435) at org.apache.kafka.streams.processor.internals.StreamPartitionAssignor.ensureCopartitioning(StreamPartitionAssignor.java:423) at org.apache.kafka.streams.processor.internals.StreamPartitionAssignor.assign(StreamPartitionAssignor.java:200) at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.performAssignment(ConsumerCoordinator.java:233) at org.apache.kafka.clients.consumer.internals.AbstractCoordinator.onJoinLeader(AbstractCoordinator.java:385) at org.apache.kafka.clients.consumer.internals.AbstractCoordinator.access$700(AbstractCoordinator.java:80) at org.apache.kafka.clients.consumer.internals.AbstractCoordinator$JoinGroupResponseHandler.handle(AbstractCoordinator.java:339) at org.apache.kafka.clients.consumer.internals.AbstractCoordinator$JoinGroupResponseHandler.handle(AbstractCoordinator.java:321) at org.apache.kafka.clients.consumer.internals.AbstractCoordinator$CoordinatorResponseHandler.onSuccess(AbstractCoordinator.java:659) at org.apache.kafka.clients.consumer.internals.AbstractCoordinator$CoordinatorResponseHandler.onSuccess(AbstractCoordinator.java:638) at org.apache.kafka.clients.consumer.internals.RequestFuture$1.onSuccess(RequestFuture.java:167) at org.apache.kafka.clients.consumer.internals.RequestFuture.fireSuccess(RequestFuture.java:133) at org.apache.kafka.clients.consumer.internals.RequestFuture.complete(RequestFuture.java:107) at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient$RequestFutureCompletionHandler.onComplete(ConsumerNetworkClient.java:381) at org.apache.kafka.clients.NetworkClient.poll(NetworkClient.java:279) at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.clientPoll(ConsumerNetworkClient.java:321) at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:214) at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:193) at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:163) at org.apache.kafka.clients.consumer.internals.AbstractCoordinator.ensureActiveGroup(AbstractCoordinator.java:220) at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.ensurePartitionAssignment(ConsumerCoordinator.java:317) at org.apache.kafka.clients.consumer.KafkaConsumer.pollOnce(KafkaConsumer.java:898) at org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:860) at org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:325) at org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:248) {code} > Missing topics on startup > ------------------------- > > Key: KAFKA-3544 > URL: https://issues.apache.org/jira/browse/KAFKA-3544 > Project: Kafka > Issue Type: Bug > Components: streams > Affects Versions: 0.10.0.0 > Reporter: Greg Fodor > Assignee: Guozhang Wang > Labels: semantics > > When running a relatively complex job with multiple tasks and state stores, > on the first run I get errors due to some of the intermediate topics not > existing. Subsequent runs work OK. My assumption is streams may be creating > topics lazily, so if downstream tasks are initializing before their parents > have had a chance to create their necessary topics then the children will > attempt to start consuming from topics that do not exist yet. -- This message was sent by Atlassian JIRA (v6.3.4#6332)