Repository: samza Updated Branches: refs/heads/master 40154b4f5 -> 164fa5f03
SAMZA-1563: Make RocksDB store base directory configurable xinyuiscool ^^ Author: Bharath Kumarasubramanian <[email protected]> Reviewers: Prateek M <[email protected]> Closes #491 from bharathkk/samza-1563 Project: http://git-wip-us.apache.org/repos/asf/samza/repo Commit: http://git-wip-us.apache.org/repos/asf/samza/commit/164fa5f0 Tree: http://git-wip-us.apache.org/repos/asf/samza/tree/164fa5f0 Diff: http://git-wip-us.apache.org/repos/asf/samza/diff/164fa5f0 Branch: refs/heads/master Commit: 164fa5f03f26f3b118675469f236e790c0e48e38 Parents: 40154b4 Author: Bharath Kumarasubramanian <[email protected]> Authored: Mon May 7 12:34:26 2018 -0700 Committer: xiliu <[email protected]> Committed: Mon May 7 12:34:26 2018 -0700 ---------------------------------------------------------------------- .../versioned/jobs/configuration-table.html | 31 +++++++++ .../org/apache/samza/config/JobConfig.scala | 11 ++++ .../apache/samza/container/SamzaContainer.scala | 69 ++++++++++++++------ .../samza/storage/TaskStorageManager.scala | 18 ++--- .../samza/storage/TestTaskStorageManager.scala | 2 +- .../samza/monitor/LocalStoreMonitorConfig.java | 4 +- 6 files changed, 104 insertions(+), 31 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/samza/blob/164fa5f0/docs/learn/documentation/versioned/jobs/configuration-table.html ---------------------------------------------------------------------- diff --git a/docs/learn/documentation/versioned/jobs/configuration-table.html b/docs/learn/documentation/versioned/jobs/configuration-table.html index 86ac427..8557480 100644 --- a/docs/learn/documentation/versioned/jobs/configuration-table.html +++ b/docs/learn/documentation/versioned/jobs/configuration-table.html @@ -472,8 +472,39 @@ <dt><code>org.apache.samza.coordinator.AzureCoordinationUtilsFactory</code></dt> <dd>Azure based coordination utils.</dd> These coordination utils are currently used for intermediate stream 
creation. + </dl> </td> </tr> + + <tr> + <td class="property" id="job.logged.store.base.dir">job.logged.store.base.dir</td> + <td class="default"> + The <code>state</code> directory under the <i>user.dir</i> Java system property if set, else under the current working directory of the process + </td> + <td class="description"> + The base directory for stores with a changelog enabled, used by the Samza application. The base directory can also be configured by setting the environment variable <i>LOGGED_STORE_BASE_DIR</i>, in which case stores are placed under <i>LOGGED_STORE_BASE_DIR</i>/&lt;job.name&gt;-&lt;job.id&gt;. + <b>Note:</b> The environment variable takes precedence over <i>job.logged.store.base.dir</i>. + + <br>By opting in, users are responsible for cleaning up the store directories if necessary. Jobs using host affinity should ensure that the stores are persisted across application/container restarts. + This means that the location and cleanup of this directory should be independent of the container lifecycle and resource cleanup. + </td> + </tr> + + <tr> + <td class="property" id="job.non-logged.store.base.dir">job.non-logged.store.base.dir</td> + <td class="default"> + The <code>state</code> directory under the <i>user.dir</i> Java system property if set, else under the current working directory of the process + </td> + <td class="description"> + The base directory for stores without a changelog, used by the Samza application. + + <br>In YARN, when this configuration is not set, non-changelog store directories are created under the current working directory, which is the YARN container directory. + That directory is cleaned up as part of the NodeManager's deletion service, whose delay is controlled by the YARN config <i>yarn.nodemanager.delete.debug-delay-sec</i>. + + <br>In non-YARN deployments, or when using a directory other than the YARN container directory, users must clean up the stores periodically. + </td> + </tr> + <tr> <!-- change link to StandAlone design/tutorial doc.
SAMZA-1299 --> <th colspan="3" class="section" id="ZkBasedJobCoordination"><a href="../index.html">Zookeeper-based job configuration</a></th> http://git-wip-us.apache.org/repos/asf/samza/blob/164fa5f0/samza-core/src/main/scala/org/apache/samza/config/JobConfig.scala ---------------------------------------------------------------------- diff --git a/samza-core/src/main/scala/org/apache/samza/config/JobConfig.scala b/samza-core/src/main/scala/org/apache/samza/config/JobConfig.scala index de83919..75e8005 100644 --- a/samza-core/src/main/scala/org/apache/samza/config/JobConfig.scala +++ b/samza-core/src/main/scala/org/apache/samza/config/JobConfig.scala @@ -81,6 +81,13 @@ object JobConfig { val PROCESSOR_ID = "processor.id" val PROCESSOR_LIST = "processor.list" + // Represents the store path for non-changelog stores. + val JOB_NON_LOGGED_STORE_BASE_DIR = "job.non-logged.store.base.dir" + + // Represents the store path for stores with changelog enabled. Typically the stores are not cleaned up + // across application restarts + val JOB_LOGGED_STORE_BASE_DIR = "job.logged.store.base.dir" + implicit def Config2Job(config: Config) = new JobConfig(config) /** @@ -175,4 +182,8 @@ class JobConfig(config: Config) extends ScalaMapConfig(config) with Logging { } def getDebounceTimeMs = getInt(JobConfig.JOB_DEBOUNCE_TIME_MS, JobConfig.DEFAULT_DEBOUNCE_TIME_MS) + + def getNonLoggedStorePath = getOption(JobConfig.JOB_NON_LOGGED_STORE_BASE_DIR) + + def getLoggedStorePath = getOption(JobConfig.JOB_LOGGED_STORE_BASE_DIR) } http://git-wip-us.apache.org/repos/asf/samza/blob/164fa5f0/samza-core/src/main/scala/org/apache/samza/container/SamzaContainer.scala ---------------------------------------------------------------------- diff --git a/samza-core/src/main/scala/org/apache/samza/container/SamzaContainer.scala b/samza-core/src/main/scala/org/apache/samza/container/SamzaContainer.scala index 7aec8e1..ad5cb9a 100644 --- 
a/samza-core/src/main/scala/org/apache/samza/container/SamzaContainer.scala +++ b/samza-core/src/main/scala/org/apache/samza/container/SamzaContainer.scala @@ -76,6 +76,47 @@ object SamzaContainer extends Logging { classOf[JobModel]) } + // TODO: SAMZA-1701 SamzaContainer should not contain any logic related to store directories + def getNonLoggedStorageBaseDir(config: Config, defaultStoreBaseDir: File) = { + config.getNonLoggedStorePath match { + case Some(nonLoggedStorePath) => + new File(nonLoggedStorePath) + case None => + defaultStoreBaseDir + } + } + + // TODO: SAMZA-1701 SamzaContainer should not contain any logic related to store directories + def getLoggedStorageBaseDir(config: Config, defaultStoreBaseDir: File) = { + val defaultLoggedStorageBaseDir = config.getLoggedStorePath match { + case Some(durableStorePath) => + new File(durableStorePath) + case None => + defaultStoreBaseDir + } + + var loggedStorageBaseDir:File = null + if(System.getenv(ShellCommandConfig.ENV_LOGGED_STORE_BASE_DIR) != null) { + val jobNameAndId = ( + config.getName.getOrElse(throw new ConfigException("Missing required config: job.name")), + config.getJobId.getOrElse("1") + ) + + loggedStorageBaseDir = new File(System.getenv(ShellCommandConfig.ENV_LOGGED_STORE_BASE_DIR) + + File.separator + jobNameAndId._1 + "-" + jobNameAndId._2) + } else { + if (config.getLoggedStorePath.isEmpty) { + warn("No override was provided for logged store base directory. This disables local state re-use on " + + "application restart. 
If you want to enable this feature, set LOGGED_STORE_BASE_DIR as an environment " + + "variable in all machines running the Samza container or configure job.logged.store.base.dir for your application") + } + + loggedStorageBaseDir = defaultLoggedStorageBaseDir + } + + loggedStorageBaseDir + } + def apply( containerId: String, jobModel: JobModel, @@ -431,10 +472,6 @@ object SamzaContainer extends Logging { .toSet val containerContext = new SamzaContainerContext(containerId, config, taskNames.asJava, samzaContainerMetrics.registry) - // TODO not sure how we should make this config based, or not. Kind of - // strange, since it has some dynamic directories when used with YARN. - val defaultStoreBaseDir = new File(System.getProperty("user.dir"), "state") - info("Got default storage engine base directory: %s" format defaultStoreBaseDir) val storeWatchPaths = new util.HashSet[Path]() @@ -468,21 +505,13 @@ object SamzaContainer extends Logging { info("Got store consumers: %s" format storeConsumers) - var loggedStorageBaseDir: File = null - if(System.getenv(ShellCommandConfig.ENV_LOGGED_STORE_BASE_DIR) != null) { - val jobNameAndId = ( - config.getName.getOrElse(throw new ConfigException("Missing required config: job.name")), - config.getJobId.getOrElse("1") - ) - loggedStorageBaseDir = new File(System.getenv(ShellCommandConfig.ENV_LOGGED_STORE_BASE_DIR) - + File.separator + jobNameAndId._1 + "-" + jobNameAndId._2) - } else { - warn("No override was provided for logged store base directory. This disables local state re-use on " + - "application restart. 
If you want to enable this feature, set LOGGED_STORE_BASE_DIR as an environment " + - "variable in all machines running the Samza container") - loggedStorageBaseDir = defaultStoreBaseDir - } + val defaultStoreBaseDir = new File(System.getProperty("user.dir"), "state") + info("Got default storage engine base directory: %s" format defaultStoreBaseDir) + + val nonLoggedStorageBaseDir = getNonLoggedStorageBaseDir(config, defaultStoreBaseDir) + info("Got base directory for non logged data stores: %s" format nonLoggedStorageBaseDir) + var loggedStorageBaseDir = getLoggedStorageBaseDir(config, defaultStoreBaseDir) info("Got base directory for logged data stores: %s" format loggedStorageBaseDir) val taskStores = storageEngineFactories @@ -509,7 +538,7 @@ object SamzaContainer extends Logging { val storeDir = if (changeLogSystemStreamPartition != null) { TaskStorageManager.getStorePartitionDir(loggedStorageBaseDir, storeName, taskName) } else { - TaskStorageManager.getStorePartitionDir(defaultStoreBaseDir, storeName, taskName) + TaskStorageManager.getStorePartitionDir(nonLoggedStorageBaseDir, storeName, taskName) } storeWatchPaths.add(storeDir.toPath) @@ -535,7 +564,7 @@ object SamzaContainer extends Logging { changeLogSystemStreams = changeLogSystemStreams, maxChangeLogStreamPartitions, streamMetadataCache = streamMetadataCache, - storeBaseDir = defaultStoreBaseDir, + nonLoggedStoreBaseDir = nonLoggedStorageBaseDir, loggedStoreBaseDir = loggedStorageBaseDir, partition = taskModel.getChangelogPartition, systemAdmins = systemAdmins, http://git-wip-us.apache.org/repos/asf/samza/blob/164fa5f0/samza-core/src/main/scala/org/apache/samza/storage/TaskStorageManager.scala ---------------------------------------------------------------------- diff --git a/samza-core/src/main/scala/org/apache/samza/storage/TaskStorageManager.scala b/samza-core/src/main/scala/org/apache/samza/storage/TaskStorageManager.scala index 00dc20f..09744cf 100644 --- 
a/samza-core/src/main/scala/org/apache/samza/storage/TaskStorageManager.scala +++ b/samza-core/src/main/scala/org/apache/samza/storage/TaskStorageManager.scala @@ -51,7 +51,7 @@ class TaskStorageManager( changeLogSystemStreams: Map[String, SystemStream] = Map(), changeLogStreamPartitions: Int, streamMetadataCache: StreamMetadataCache, - storeBaseDir: File = new File(System.getProperty("user.dir"), "state"), + nonLoggedStoreBaseDir: File = new File(System.getProperty("user.dir"), "state"), loggedStoreBaseDir: File = new File(System.getProperty("user.dir"), "state"), partition: Partition, systemAdmins: SystemAdmins, @@ -84,12 +84,12 @@ class TaskStorageManager( debug("Cleaning base directories for stores.") taskStores.keys.foreach(storeName => { - val storePartitionDir = TaskStorageManager.getStorePartitionDir(storeBaseDir, storeName, taskName) - info("Got default storage partition directory as %s" format storePartitionDir.toPath.toString) + val nonLoggedStorePartitionDir = TaskStorageManager.getStorePartitionDir(nonLoggedStoreBaseDir, storeName, taskName) + info("Got non logged storage partition directory as %s" format nonLoggedStorePartitionDir.toPath.toString) - if(storePartitionDir.exists()) { - info("Deleting default storage partition directory %s" format storePartitionDir.toPath.toString) - FileUtil.rm(storePartitionDir) + if(nonLoggedStorePartitionDir.exists()) { + info("Deleting non logged storage partition directory %s" format nonLoggedStorePartitionDir.toPath.toString) + FileUtil.rm(nonLoggedStorePartitionDir) } val loggedStorePartitionDir = TaskStorageManager.getStorePartitionDir(loggedStoreBaseDir, storeName, taskName) @@ -179,9 +179,9 @@ class TaskStorageManager( info("Using logged storage partition directory: %s for store: %s." 
format(loggedStorePartitionDir.toPath.toString, storeName)) if (!loggedStorePartitionDir.exists()) loggedStorePartitionDir.mkdirs() } else { - val storePartitionDir = TaskStorageManager.getStorePartitionDir(storeBaseDir, storeName, taskName) - info("Using storage partition directory: %s for store: %s." format(storePartitionDir.toPath.toString, storeName)) - storePartitionDir.mkdirs() + val nonLoggedStorePartitionDir = TaskStorageManager.getStorePartitionDir(nonLoggedStoreBaseDir, storeName, taskName) + info("Using non logged storage partition directory: %s for store: %s." format(nonLoggedStorePartitionDir.toPath.toString, storeName)) + nonLoggedStorePartitionDir.mkdirs() } } } http://git-wip-us.apache.org/repos/asf/samza/blob/164fa5f0/samza-core/src/test/scala/org/apache/samza/storage/TestTaskStorageManager.scala ---------------------------------------------------------------------- diff --git a/samza-core/src/test/scala/org/apache/samza/storage/TestTaskStorageManager.scala b/samza-core/src/test/scala/org/apache/samza/storage/TestTaskStorageManager.scala index bbdb819..31d3ef6 100644 --- a/samza-core/src/test/scala/org/apache/samza/storage/TestTaskStorageManager.scala +++ b/samza-core/src/test/scala/org/apache/samza/storage/TestTaskStorageManager.scala @@ -715,7 +715,7 @@ class TaskStorageManagerBuilder extends MockitoSugar { changeLogSystemStreams = changeLogSystemStreams, changeLogStreamPartitions = changeLogStreamPartitions, streamMetadataCache = streamMetadataCache, - storeBaseDir = storeBaseDir, + nonLoggedStoreBaseDir = storeBaseDir, loggedStoreBaseDir = loggedStoreBaseDir, partition = partition, systemAdmins = new SystemAdmins(systemAdmins.asJava), http://git-wip-us.apache.org/repos/asf/samza/blob/164fa5f0/samza-rest/src/main/java/org/apache/samza/monitor/LocalStoreMonitorConfig.java ---------------------------------------------------------------------- diff --git a/samza-rest/src/main/java/org/apache/samza/monitor/LocalStoreMonitorConfig.java 
b/samza-rest/src/main/java/org/apache/samza/monitor/LocalStoreMonitorConfig.java index 8413194..11806f3 100644 --- a/samza-rest/src/main/java/org/apache/samza/monitor/LocalStoreMonitorConfig.java +++ b/samza-rest/src/main/java/org/apache/samza/monitor/LocalStoreMonitorConfig.java @@ -22,6 +22,7 @@ import java.util.Arrays; import java.util.List; import org.apache.commons.lang.StringUtils; import org.apache.samza.config.Config; +import org.apache.samza.config.JobConfig; import org.apache.samza.config.MapConfig; @@ -32,6 +33,7 @@ public class LocalStoreMonitorConfig extends MapConfig { /** * Defines the local store directory of the job. + * @deprecated in favor of {@link org.apache.samza.config.JobConfig#JOB_LOGGED_STORE_BASE_DIR} */ static final String CONFIG_LOCAL_STORE_DIR = "job.local.store.dir"; @@ -68,7 +70,7 @@ public class LocalStoreMonitorConfig extends MapConfig { * @return the location of the job's local directory. */ public String getLocalStoreBaseDir() { - return get(CONFIG_LOCAL_STORE_DIR); + return get(JobConfig.JOB_LOGGED_STORE_BASE_DIR(), get(CONFIG_LOCAL_STORE_DIR)); } /**
