sailingYang created SAMZA-1658:
----------------------------------

             Summary: samza-kafka module uses a deprecated Kafka method to get 
topic metadata
                 Key: SAMZA-1658
                 URL: https://issues.apache.org/jira/browse/SAMZA-1658
             Project: Samza
          Issue Type: Improvement
          Components: kafka
    Affects Versions: 0.13.1, 0.14.0, 0.13.0, 0.12.0
            Reporter: sailingYang
             Fix For: 0.15.0


In ClientUtilTopicMetadataStore, samza-kafka uses 
ClientUtils.fetchTopicMetadata to get topic metadata. This method is 
deprecated in Kafka. It relies on SyncProducer and may cause problems.
{code:java}
def getTopicInfo(topics: Set[String]) = {
  val currCorrId = corrID.getAndIncrement
  val response: TopicMetadataResponse =
    ClientUtils.fetchTopicMetadata(topics, brokers, clientId, timeout, currCorrId)

  if (response.correlationId != currCorrId) {
    throw new SamzaException(
      "CorrelationID did not match for request on topics %s (sent %d, got %d)"
        format (topics, currCorrId, response.correlationId))
  }
  }

  response.topicsMetadata
    .map(metadata => (metadata.topic, metadata))
    .toMap
}
{code}
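
One possible direction, as a rough sketch only: fetch the metadata through the 
Java consumer API (KafkaConsumer.partitionsFor) instead of ClientUtils. The 
object TopicInfoSketch, the helper newMetadataConsumer and the changed return 
type (partition lists rather than TopicMetadata) are assumptions made for 
illustration, not existing Samza code, and it needs kafka-clients on the 
classpath. As far as I know the Java client matches requests and responses 
internally, so the manual correlation ID check should not be needed here.
{code:scala}
import java.util.Properties
// On Scala 2.13+, scala.jdk.CollectionConverters._ is the non-deprecated equivalent.
import scala.collection.JavaConverters._

import org.apache.kafka.clients.consumer.{Consumer, KafkaConsumer}
import org.apache.kafka.common.PartitionInfo
import org.apache.kafka.common.serialization.ByteArrayDeserializer

// Hypothetical sketch, not part of Samza.
object TopicInfoSketch {

  // Looks up the partition metadata of each topic through the consumer API.
  def getTopicInfo(consumer: Consumer[_, _],
                   topics: Set[String]): Map[String, Seq[PartitionInfo]] =
    topics.map { topic =>
      // Guard against null, which some client versions may return for an
      // unknown topic (assumption; newer clients return an empty list).
      val partitions: Seq[PartitionInfo] =
        Option(consumer.partitionsFor(topic)).map(_.asScala.toList).getOrElse(Nil)
      topic -> partitions
    }.toMap

  // Builds a consumer that is only used for metadata lookups.
  def newMetadataConsumer(bootstrapServers: String,
                          clientId: String): KafkaConsumer[Array[Byte], Array[Byte]] = {
    val props = new Properties()
    props.put("bootstrap.servers", bootstrapServers)
    props.put("client.id", clientId)
    props.put("key.deserializer", classOf[ByteArrayDeserializer].getName)
    props.put("value.deserializer", classOf[ByteArrayDeserializer].getName)
    new KafkaConsumer[Array[Byte], Array[Byte]](props)
  }
}
{code}
The lookup takes the Consumer interface rather than a concrete KafkaConsumer 
so that it can be exercised with MockConsumer in unit tests, without a running 
broker.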



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)
