cameronlee314 commented on a change in pull request #1165: SAMZA-2326 : Add
interface to support specific configs
URL: https://github.com/apache/samza/pull/1165#discussion_r327727594
##########
File path:
samza-core/src/main/scala/org/apache/samza/util/CoordinatorStreamUtil.scala
##########
@@ -33,24 +32,21 @@ import org.apache.samza.system.{StreamSpec, SystemAdmin,
SystemFactory, SystemSt
import org.apache.samza.util.ScalaJavaUtil.JavaOptionals
import scala.collection.JavaConverters._
-import scala.collection.immutable.Map
object CoordinatorStreamUtil extends Logging {
+ val COORDINATOR_STREAM_FACTORY = "job.coordinatorstream.config.factory"
+ val DEFAULT_COORDINATOR_STREAM_CONFIG_FACTORY =
"org.apache.samza.util.DefaultCoordinatorStreamConfigFactory"
+
/**
* Given a job's full config object, build a subset config which includes
* only the job name, job id, and system config for the coordinator stream.
*/
def buildCoordinatorStreamConfig(config: Config) = {
- val jobConfig = new JobConfig(config)
- val (jobName, jobId) = getJobNameAndId(jobConfig)
- // Build a map with just the system config and job.name/job.id. This is
what's required to start the JobCoordinator.
- val map = config.subset(SystemConfig.SYSTEM_ID_PREFIX format
jobConfig.getCoordinatorSystemName, false).asScala ++
- Map[String, String](
- JobConfig.JOB_NAME -> jobName,
- JobConfig.JOB_ID -> jobId,
- JobConfig.JOB_COORDINATOR_SYSTEM -> jobConfig.getCoordinatorSystemName,
- JobConfig.MONITOR_PARTITION_CHANGE_FREQUENCY_MS ->
String.valueOf(jobConfig.getMonitorPartitionChangeFrequency))
- new MapConfig(map.asJava)
+ val buildConfigFactory = config.get(COORDINATOR_STREAM_FACTORY,
DEFAULT_COORDINATOR_STREAM_CONFIG_FACTORY)
Review comment:
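Can you please move the `job.coordinatorstream.config.factory` lookup (and the
`COORDINATOR_STREAM_FACTORY` / `DEFAULT_COORDINATOR_STREAM_CONFIG_FACTORY`
constants) into a helper method in `JobConfig`?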
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
[email protected]
With regards,
Apache Git Services