Repository: samza Updated Branches: refs/heads/master 35a5cd9ad -> d72e47d4d
SAMZA-1026: HDFS System Producer should not have Kafka dependency Author: Prateek Maheshwari <[email protected]> Reviewers: Navina Ramesh <[email protected]> Closes #144 from prateekm/hdfs-kafka-dependency Project: http://git-wip-us.apache.org/repos/asf/samza/repo Commit: http://git-wip-us.apache.org/repos/asf/samza/commit/d72e47d4 Tree: http://git-wip-us.apache.org/repos/asf/samza/tree/d72e47d4 Diff: http://git-wip-us.apache.org/repos/asf/samza/diff/d72e47d4 Branch: refs/heads/master Commit: d72e47d4d7a1ea5acc817a6d88c1e5a823f4bfed Parents: 35a5cd9 Author: Prateek Maheshwari <[email protected]> Authored: Wed Apr 26 16:55:25 2017 -0700 Committer: nramesh <[email protected]> Committed: Wed Apr 26 16:55:25 2017 -0700 ---------------------------------------------------------------------- build.gradle | 1 - .../org/apache/samza/system/hdfs/HdfsConfig.scala | 2 +- .../samza/system/hdfs/HdfsSystemFactory.scala | 18 ++++++++++++++---- .../samza/system/hdfs/HdfsSystemProducer.scala | 10 ++++------ 4 files changed, 19 insertions(+), 12 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/samza/blob/d72e47d4/build.gradle ---------------------------------------------------------------------- diff --git a/build.gradle b/build.gradle index b6abd47..dc56077 100644 --- a/build.gradle +++ b/build.gradle @@ -512,7 +512,6 @@ project(":samza-hdfs_$scalaVersion") { dependencies { compile project(':samza-api') compile project(":samza-core_$scalaVersion") - compile project(":samza-kafka_$scalaVersion") // currently hdfs system producer/consumer do depend on yarn for two things: // 1. staging directory 2. security // SAMZA-1032 to solve the staging directory dependency http://git-wip-us.apache.org/repos/asf/samza/blob/d72e47d4/samza-hdfs/src/main/scala/org/apache/samza/system/hdfs/HdfsConfig.scala ---------------------------------------------------------------------- diff --git a/samza-hdfs/src/main/scala/org/apache/samza/system/hdfs/HdfsConfig.scala b/samza-hdfs/src/main/scala/org/apache/samza/system/hdfs/HdfsConfig.scala index 12c93ae..52e19bf 100644 --- a/samza-hdfs/src/main/scala/org/apache/samza/system/hdfs/HdfsConfig.scala +++ b/samza-hdfs/src/main/scala/org/apache/samza/system/hdfs/HdfsConfig.scala @@ -83,7 +83,7 @@ object HdfsConfig { val STAGING_DIRECTORY = "systems.%s.stagingDirectory" val STAGING_DIRECTORY_DEFAULT = "" - implicit def Hdfs2Kafka(config: Config) = new HdfsConfig(config) + implicit def Config2Hdfs(config: Config) = new HdfsConfig(config) } http://git-wip-us.apache.org/repos/asf/samza/blob/d72e47d4/samza-hdfs/src/main/scala/org/apache/samza/system/hdfs/HdfsSystemFactory.scala ---------------------------------------------------------------------- diff --git a/samza-hdfs/src/main/scala/org/apache/samza/system/hdfs/HdfsSystemFactory.scala b/samza-hdfs/src/main/scala/org/apache/samza/system/hdfs/HdfsSystemFactory.scala index 3673431..05d717a 100644 --- a/samza-hdfs/src/main/scala/org/apache/samza/system/hdfs/HdfsSystemFactory.scala +++ b/samza-hdfs/src/main/scala/org/apache/samza/system/hdfs/HdfsSystemFactory.scala @@ -20,11 +20,11 @@ package org.apache.samza.system.hdfs -import org.apache.samza.config.Config +import org.apache.samza.config.{Config, ConfigException, JobConfig} import org.apache.samza.metrics.MetricsRegistry import org.apache.samza.system.SystemFactory import org.apache.samza.system.hdfs.HdfsSystemConsumer.HdfsSystemConsumerMetrics -import org.apache.samza.util.{KafkaUtil, Logging} +import org.apache.samza.util.Logging class HdfsSystemFactory extends SystemFactory with Logging { @@ -33,8 +33,11 @@ class HdfsSystemFactory extends SystemFactory with Logging { } def getProducer(systemName: String, config: Config, registry: MetricsRegistry) = { - // TODO: SAMZA-1026: should remove Kafka dependency below - val clientId = KafkaUtil.getClientId("samza-producer", config) + val jobConfig = new JobConfig(config) + val jobName = jobConfig.getName.getOrElse(throw new ConfigException("Missing job name.")) + val jobId = jobConfig.getJobId.getOrElse("1") + + val clientId = getClientId("samza-producer", jobName, jobId) val metrics = new HdfsSystemProducerMetrics(systemName, registry) new HdfsSystemProducer(systemName, clientId, config, metrics) } @@ -42,4 +45,11 @@ class HdfsSystemFactory extends SystemFactory with Logging { def getAdmin(systemName: String, config: Config) = { new HdfsSystemAdmin(systemName, config) } + + def getClientId(id: String, jobName: String, jobId: String): String = { + "%s-%s-%s" format + (id.replaceAll("[^A-Za-z0-9]", "_"), + jobName.replaceAll("[^A-Za-z0-9]", "_"), + jobId.replaceAll("[^A-Za-z0-9]", "_")) + } } http://git-wip-us.apache.org/repos/asf/samza/blob/d72e47d4/samza-hdfs/src/main/scala/org/apache/samza/system/hdfs/HdfsSystemProducer.scala ---------------------------------------------------------------------- diff --git a/samza-hdfs/src/main/scala/org/apache/samza/system/hdfs/HdfsSystemProducer.scala b/samza-hdfs/src/main/scala/org/apache/samza/system/hdfs/HdfsSystemProducer.scala index d2967ca..79bca5b 100644 --- a/samza-hdfs/src/main/scala/org/apache/samza/system/hdfs/HdfsSystemProducer.scala +++ b/samza-hdfs/src/main/scala/org/apache/samza/system/hdfs/HdfsSystemProducer.scala @@ -20,14 +20,12 @@ package org.apache.samza.system.hdfs -import org.apache.hadoop.fs.FileSystem import org.apache.hadoop.conf.Configuration -import org.apache.samza.SamzaException -import org.apache.samza.config.Config -import org.apache.samza.system.hdfs.HdfsConfig._ -import org.apache.samza.system.{SystemProducer, OutgoingMessageEnvelope} +import org.apache.hadoop.fs.FileSystem import org.apache.samza.system.hdfs.writer.HdfsWriter -import org.apache.samza.util.{Logging, ExponentialSleepStrategy, TimerUtils, KafkaUtil} +import org.apache.samza.system.{OutgoingMessageEnvelope, SystemProducer} +import org.apache.samza.util.{Logging, TimerUtils} + import scala.collection.mutable.{Map => MMap}
