cameronlee314 commented on a change in pull request #1157: SAMZA-2325 : Adding
logic to read system config for repl-factor when creating a topic
URL: https://github.com/apache/samza/pull/1157#discussion_r325382574
##########
File path: samza-kafka/src/main/scala/org/apache/samza/config/KafkaConfig.scala
##########
@@ -243,19 +244,35 @@ class KafkaConfig(config: Config) extends
ScalaMapConfig(config) {
/**
* Gets the replication factor for the changelog topics. Uses the following
precedence.
*
- * 1. If stores.myStore.changelog.replication.factor is configured, that
value is used.
- * 2. If systems.changelog-system.default.stream.replication.factor is
configured, that value is used.
- * 3. 2
- *
- * Note that the changelog-system has a similar precedence. See
[[StorageConfig]]
+ * 1. If stores.{storeName}.changelog.replication.factor is configured,
that value is used.
+ * 2. If it is not configured, the value configured for
stores.default.changelog.replication.factor is used.
+ * 3. If it is not configured, the RF value configured for the store's
changelog's system, configured using
+ * stores.{storeName}.changelog={systemName}.{streamName}, is used.
+ * 4. If it is not configured, the value for the RF of job.changelog.system
is used.
+ * 5. If it is not configured, the value for the RF of job.default.system
is used.
+ * 6. If it is not configured, the RF is chosen as 2.
*/
- def getChangelogStreamReplicationFactor(name: String) =
getOption(KafkaConfig.CHANGELOG_STREAM_REPLICATION_FACTOR format
name).getOrElse(getDefaultChangelogStreamReplicationFactor)
+ def getChangelogStreamReplicationFactor(storeName: String) = {
+ var changelogRF =
getOption(KafkaConfig.CHANGELOG_STREAM_REPLICATION_FACTOR format storeName)
+
+ if(!changelogRF.isDefined) {
+ changelogRF =
getOption(KafkaConfig.DEFAULT_CHANGELOG_STREAM_REPLICATION_FACTOR)
+ }
- def getDefaultChangelogStreamReplicationFactor() = {
- val changelogSystem = new
StorageConfig(config).getChangelogSystem.orElse(null)
-
getOption(KafkaConfig.DEFAULT_CHANGELOG_STREAM_REPLICATION_FACTOR).getOrElse(getSystemDefaultReplicationFactor(changelogSystem,
"2"))
+ if(!changelogRF.isDefined) {
+ val changelogSystemStream = new
StorageConfig(config).getChangelogStream(storeName)
+ if (!changelogSystemStream.isPresent) {
+ throw new SamzaException("Changelog system not defined for store
"+storeName)
Review comment:
Minor: Consider clarifying in the exception message that the replication
factor (RF) is being determined and that the changelog system name is
required to do so.
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
users@infra.apache.org
With regards,
Apache Git Services