kw2542 commented on a change in pull request #1354:
URL: https://github.com/apache/samza/pull/1354#discussion_r418679091
##########
File path:
samza-core/src/main/scala/org/apache/samza/util/CoordinatorStreamUtil.scala
##########
@@ -110,13 +111,34 @@ object CoordinatorStreamUtil extends Logging {
jobConfig.getJobId)
}
+ /**
+ * Reads and returns launch config persisted in coordinator stream. Only
job.auto sizing configs are currently supported.
+ * @param config full job config
+ * @param metadataStore an instance of the instantiated MetadataStore
+ * @return empty config if auto sizing is disabled, otherwise auto sizing
related configs.
+ */
+ def readLaunchConfigFromCoordinatorStream(config: Config, metadataStore:
MetadataStore): Config = {
+ if (!config.getBoolean(JobConfig.JOB_AUTOSIZING_ENABLED, false)) {
+ new MapConfig()
+ } else {
+ val config = readConfigFromCoordinatorStream(metadataStore)
+ val launchConfig: util.Map[String, String] = new util.HashMap[String,
String]()
+ for ((key:String, value:String) <- config.asScala) {
+ if (key.startsWith(JobConfig.JOB_AUTOSIZING_CONFIG_PREFIX)) {
Review comment:
I was planning to do it, but it seems that this is in Scala and I cannot
do
config.entrySet().stream().map().collect()
do you know how to do it in scala?
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
[email protected]