cameronlee314 commented on a change in pull request #1157: SAMZA-2325 : Adding logic to read system config for repl-factor when creating a topic
URL: https://github.com/apache/samza/pull/1157#discussion_r325384679
##########
File path: samza-kafka/src/main/scala/org/apache/samza/config/KafkaConfig.scala
##########
@@ -243,19 +244,35 @@ class KafkaConfig(config: Config) extends ScalaMapConfig(config) {
/**
* Gets the replication factor for the changelog topics. Uses the following precedence.
*
- * 1. If stores.myStore.changelog.replication.factor is configured, that value is used.
- * 2. If systems.changelog-system.default.stream.replication.factor is configured, that value is used.
- * 3. 2
- *
- * Note that the changelog-system has a similar precedence. See [[StorageConfig]]
+ * 1. If stores.{storeName}.changelog.replication.factor is configured, that value is used.
+ * 2. If it is not configured, the value configured for stores.default.changelog.replication.factor is used.
+ * 3. If it is not configured, the RF value configured for the store's changelog's system, configured using
+ * stores.{storeName}.changelog={systemName}.{streamName}, is used.
+ * 4. If it is not configured, the value for the RF of job.changelog.system is used.
+ * 5. If it is not configured, the value for the RF of job.default.system is used.
+ * 6. If it is not configured, the RF is chosen as 2.
*/
- def getChangelogStreamReplicationFactor(name: String) = getOption(KafkaConfig.CHANGELOG_STREAM_REPLICATION_FACTOR format name).getOrElse(getDefaultChangelogStreamReplicationFactor)
+ def getChangelogStreamReplicationFactor(storeName: String) = {
+ var changelogRF = getOption(KafkaConfig.CHANGELOG_STREAM_REPLICATION_FACTOR format storeName)
+
+ if(!changelogRF.isDefined) {
+ changelogRF = getOption(KafkaConfig.DEFAULT_CHANGELOG_STREAM_REPLICATION_FACTOR)
+ }
- def getDefaultChangelogStreamReplicationFactor() = {
- val changelogSystem = new StorageConfig(config).getChangelogSystem.orElse(null)
- getOption(KafkaConfig.DEFAULT_CHANGELOG_STREAM_REPLICATION_FACTOR).getOrElse(getSystemDefaultReplicationFactor(changelogSystem, "2"))
+ if(!changelogRF.isDefined) {
+ val changelogSystemStream = new StorageConfig(config).getChangelogStream(storeName)
+ if (!changelogSystemStream.isPresent) {
+ throw new SamzaException("Changelog system not defined for store "+storeName)
+ }
+
+ val changelogSystem = StreamUtil.getSystemStreamFromNames(changelogSystemStream.get()).getSystem
+ changelogRF = Option.apply(getSystemDefaultReplicationFactor(changelogSystem, "2"))
+ }
+
+ changelogRF.get
Review comment:
Would it be possible to structure the flow so that it doesn't assume `changelogRF`
has a value at the end? For example, maybe just do
`changelogRF.getOrElse(DEFAULT_RF)` at the end?
This guards against a future change to the flow accidentally leaving
`changelogRF` empty at this point.
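The suggested flow can be sketched as a single `Option` chain that ends in `getOrElse`, so no branch can leave the result empty. This is a minimal, hypothetical sketch, not the PR's code: a plain `Map[String, String]` stands in for Samza's `Config`, the names `ChangelogReplicationFactor`, `resolve`, and `DefaultRF` are invented for illustration, the key names are taken from the doc comments in the diff, and the system name is assumed to be the part of `{systemName}.{streamName}` before the first `.`. Unlike the diff, it does not throw when a store has no changelog stream; it falls through to steps 4 to 6 instead.

```scala
// Hypothetical sketch: a Map[String, String] stands in for Samza's Config,
// and all key names follow the doc comments in the diff above.
object ChangelogReplicationFactor {
  // Step 6: fallback used when nothing else is configured.
  val DefaultRF = "2"

  def resolve(config: Map[String, String], storeName: String): String = {
    // "stores.{storeName}.changelog" holds "{systemName}.{streamName}";
    // assumption: the system name is everything before the first '.'.
    val storeChangelogSystem: Option[String] =
      config.get(s"stores.$storeName.changelog").map(_.takeWhile(_ != '.'))

    // Replication factor configured for a given system, if any.
    def systemRF(system: Option[String]): Option[String] =
      system.flatMap(s => config.get(s"systems.$s.default.stream.replication.factor"))

    config.get(s"stores.$storeName.changelog.replication.factor")        // 1. per-store RF
      .orElse(config.get("stores.default.changelog.replication.factor")) // 2. default store RF
      .orElse(systemRF(storeChangelogSystem))                           // 3. store's changelog system
      .orElse(systemRF(config.get("job.changelog.system")))             // 4. job.changelog.system
      .orElse(systemRF(config.get("job.default.system")))               // 5. job.default.system
      .getOrElse(DefaultRF)                                             // 6. hard-coded default
  }
}
```

Because `Option.orElse` takes its argument by name, each later lookup runs only when every earlier step came back empty, which mirrors the precedence list without a mutable `var` or a final `.get`.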
----------------------------------------------------------------
This is an automated message from the Apache Git Service.