sailingYang created SAMZA-1658:
----------------------------------

             Summary: samza-kafka module use kafka deprecated method to get topic metadata
                 Key: SAMZA-1658
                 URL: https://issues.apache.org/jira/browse/SAMZA-1658
             Project: Samza
          Issue Type: Improvement
          Components: kafka
    Affects Versions: 0.13.1, 0.14.0, 0.13.0, 0.12.0
            Reporter: sailingYang
             Fix For: 0.15.0

In ClientUtilTopicMetadataStore, samza-kafka uses ClientUtils.fetchTopicMetadata to get topic metadata. This method is deprecated in Kafka. It relies on SyncProducer and may cause problems.

{code:java}
def getTopicInfo(topics: Set[String]) = {
  val currCorrId = corrID.getAndIncrement
  val response: TopicMetadataResponse = ClientUtils.fetchTopicMetadata(topics, brokers, clientId, timeout, currCorrId)

  if (response.correlationId != currCorrId) {
    throw new SamzaException("CorrelationID did not match for request on topics %s (sent %d, got %d)" format (topics, currCorrId, response.correlationId))
  }

  response.topicsMetadata
    .map(metadata => (metadata.topic, metadata))
    .toMap
}
{code}

--
This message was sent by Atlassian JIRA
(v7.6.3#76005)