[ 
https://issues.apache.org/jira/browse/SAMZA-1658?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Bharath Kumarasubramanian resolved SAMZA-1658.
----------------------------------------------
    Resolution: Fixed

Fixed as part of recent Kafka 2.0 upgrade SAMZA-2127

> samza-kafka module use kafka deprecated method to get topic metadata
> --------------------------------------------------------------------
>
>                 Key: SAMZA-1658
>                 URL: https://issues.apache.org/jira/browse/SAMZA-1658
>             Project: Samza
>          Issue Type: Improvement
>          Components: kafka
>    Affects Versions: 0.12.0, 0.13.0, 0.14.0, 0.13.1
>            Reporter: sailingYang
>            Priority: Major
>             Fix For: 0.15.0
>
>
> in  ClientUtilTopicMetadataStore, samza-kafka use 
> ClientUtils.fetchTopicMetadata to get topic metadata, this is a deprecated 
> method in kafka. this method use SyncProducer and  may get some problem.
> {code:java}
> // code placeholder
> def getTopicInfo(topics: Set[String]) = {
>   val currCorrId = corrID.getAndIncrement
>   val response: TopicMetadataResponse = 
> ClientUtils.fetchTopicMetadata(topics, brokers, clientId, timeout, currCorrId)
>   if (response.correlationId != currCorrId) {
>     throw new SamzaException("CorrelationID did not match for request on 
> topics %s (sent %d, got %d)" format (topics, currCorrId, 
> response.correlationId))
>   }
>   response.topicsMetadata
>     .map(metadata => (metadata.topic, metadata))
>     .toMap
> }
> {code}



--
This message was sent by Atlassian JIRA
(v7.6.14#76016)

Reply via email to