[
https://issues.apache.org/jira/browse/SAMZA-1658?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
]
Bharath Kumarasubramanian resolved SAMZA-1658.
----------------------------------------------
Resolution: Fixed
Fixed as part of the recent Kafka 2.0 upgrade (SAMZA-2127).
> samza-kafka module use kafka deprecated method to get topic metadata
> --------------------------------------------------------------------
>
> Key: SAMZA-1658
> URL: https://issues.apache.org/jira/browse/SAMZA-1658
> Project: Samza
> Issue Type: Improvement
> Components: kafka
> Affects Versions: 0.12.0, 0.13.0, 0.14.0, 0.13.1
> Reporter: sailingYang
> Priority: Major
> Fix For: 0.15.0
>
>
> In ClientUtilTopicMetadataStore, samza-kafka uses
> ClientUtils.fetchTopicMetadata to get topic metadata. This method is
> deprecated in Kafka; it relies on SyncProducer and may cause problems.
> {code:java}
> def getTopicInfo(topics: Set[String]) = {
>   val currCorrId = corrID.getAndIncrement
>   val response: TopicMetadataResponse =
>     ClientUtils.fetchTopicMetadata(topics, brokers, clientId, timeout, currCorrId)
>   if (response.correlationId != currCorrId) {
>     throw new SamzaException(
>       "CorrelationID did not match for request on topics %s (sent %d, got %d)"
>         format (topics, currCorrId, response.correlationId))
>   }
>   response.topicsMetadata
>     .map(metadata => (metadata.topic, metadata))
>     .toMap
> }
> {code}
--
This message was sent by Atlassian JIRA
(v7.6.14#76016)