SAMZA-791 - KafkaSystemFactory narrows return types
Project: http://git-wip-us.apache.org/repos/asf/samza/repo
Commit: http://git-wip-us.apache.org/repos/asf/samza/commit/f70a11c4
Tree: http://git-wip-us.apache.org/repos/asf/samza/tree/f70a11c4
Diff: http://git-wip-us.apache.org/repos/asf/samza/diff/f70a11c4
Branch: refs/heads/samza-sql
Commit: f70a11c4a747b77035dddfba2296ba3b667928e2
Parents: f7f237e
Author: Aleksandar Bircakovic <[email protected]>
Authored: Mon Oct 19 15:20:55 2015 -0700
Committer: Navina <[email protected]>
Committed: Mon Oct 19 15:20:55 2015 -0700
----------------------------------------------------------------------
 .../org/apache/samza/system/kafka/KafkaSystemFactory.scala | 9 ++++++---
 1 file changed, 6 insertions(+), 3 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/samza/blob/f70a11c4/samza-kafka/src/main/scala/org/apache/samza/system/kafka/KafkaSystemFactory.scala
----------------------------------------------------------------------
diff --git a/samza-kafka/src/main/scala/org/apache/samza/system/kafka/KafkaSystemFactory.scala b/samza-kafka/src/main/scala/org/apache/samza/system/kafka/KafkaSystemFactory.scala
index d84bf06..a60cda2 100644
--- a/samza-kafka/src/main/scala/org/apache/samza/system/kafka/KafkaSystemFactory.scala
+++ b/samza-kafka/src/main/scala/org/apache/samza/system/kafka/KafkaSystemFactory.scala
@@ -31,6 +31,9 @@ import org.apache.samza.system.SystemFactory
 import org.apache.samza.config.StorageConfig._
 import org.I0Itec.zkclient.ZkClient
 import kafka.utils.ZKStringSerializer
+import org.apache.samza.system.SystemProducer
+import org.apache.samza.system.SystemAdmin
+import org.apache.samza.system.SystemConsumer
 
 object KafkaSystemFactory extends Logging {
   def getInjectedProducerProperties(systemName: String, config: Config) = if (config.isChangelogSystem(systemName)) {
@@ -42,7 +45,7 @@ object KafkaSystemFactory extends Logging {
 }
 
 class KafkaSystemFactory extends SystemFactory with Logging {
-  def getConsumer(systemName: String, config: Config, registry: MetricsRegistry) = {
+  def getConsumer(systemName: String, config: Config, registry: MetricsRegistry): SystemConsumer = {
     val clientId = KafkaUtil.getClientId("samza-consumer", config)
     val metrics = new KafkaSystemConsumerMetrics(systemName, registry)
 
@@ -77,7 +80,7 @@ class KafkaSystemFactory extends SystemFactory with Logging {
       offsetGetter = offsetGetter)
   }
 
-  def getProducer(systemName: String, config: Config, registry: MetricsRegistry) = {
+  def getProducer(systemName: String, config: Config, registry: MetricsRegistry): SystemProducer = {
     val clientId = KafkaUtil.getClientId("samza-producer", config)
     val injectedProps = KafkaSystemFactory.getInjectedProducerProperties(systemName, config)
     val producerConfig = config.getKafkaSystemProducerConfig(systemName, clientId, injectedProps)
@@ -95,7 +98,7 @@ class KafkaSystemFactory extends SystemFactory with Logging {
       metrics)
   }
 
-  def getAdmin(systemName: String, config: Config) = {
+  def getAdmin(systemName: String, config: Config): SystemAdmin = {
     val clientId = KafkaUtil.getClientId("samza-admin", config)
     val producerConfig = config.getKafkaSystemProducerConfig(systemName, clientId)
     val bootstrapServers = producerConfig.bootsrapServers
