Bharath Kumarasubramanian created SAMZA-1816:
------------------------------------------------
Summary: Extract stream-related configurations from store configurations
Key: SAMZA-1816
URL: https://issues.apache.org/jira/browse/SAMZA-1816
Project: Samza
Issue Type: Bug
Reporter: Bharath Kumarasubramanian
Currently, changelog-related configurations live under the storage
configuration, e.g. stores.store-name.changelog.replication.factor and
stores.default.changelog.replication.factor.
{code:java}
val storeToChangelog = config.getKafkaChangelogEnabledStores()
// Construct the meta information for each topic. If the replication factor is
// not defined, we use 2 as the number of replicas for the changelog stream.
val topicMetaInformation = storeToChangelog.map { case (storeName, topicName) =>
  val replicationFactor = config.getChangelogStreamReplicationFactor(storeName).toInt
  val changelogInfo = ChangelogInfo(replicationFactor, config.getChangelogKafkaProperties(storeName))
  info("Creating topic meta information for topic: %s with replication factor: %s" format (topicName, replicationFactor))
  (topicName, changelogInfo)
}
{code}
Internally, this code accesses the store configurations to build
KafkaSystemAdmin, which seems a little weird given that these are
stream-related settings.
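
For illustration only, the lookup implied by the two keys above might be sketched as follows. This is not the actual Samza implementation: the object name, the plain Map standing in for the config, and the fallback order (store-specific key, then the default key, then 2) are assumptions based on the key names and the comment in the snippet.

```scala
// Hypothetical sketch: a plain Map stands in for the Samza config object.
object ChangelogReplicationSketch {
  // Used when neither key is set, per the comment in the snippet above.
  val FallbackReplicationFactor = 2

  // Assumed resolution order: store-specific key, then the "default" key,
  // then the hard-coded fallback of 2.
  def replicationFactor(config: Map[String, String], storeName: String): Int =
    config.get(s"stores.${storeName}.changelog.replication.factor")
      .orElse(config.get("stores.default.changelog.replication.factor"))
      .map(_.trim.toInt)
      .getOrElse(FallbackReplicationFactor)
}
```

With only stores.default.changelog.replication.factor set to 3, a store named my-store would resolve to 3 under these assumptions; with neither key set it would resolve to 2.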