This is an automated email from the ASF dual-hosted git repository.

bharathkk pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/samza.git


The following commit(s) were added to refs/heads/master by this push:
     new e0ea235  SAMZA-2274: Cleanup store directories on startup for non-logged store only
e0ea235 is described below

commit e0ea235155ac5b619f0c5c67d6e86dc45363bda5
Author: mynameborat <[email protected]>
AuthorDate: Fri Jul 19 15:13:56 2019 -0700

    SAMZA-2274: Cleanup store directories on startup for non-logged store only
    
    During startup, the ContainerStorageManager (CSM) cleans up the base directories for stores. Ideally, this cleanup should only be done for non-logged stores. In the current setup, if an application configures the logged and non-logged store base directories to be the same path, we delete the logged store directories as well, which results in unnecessary bootstrap.
    
    In this PR,
     * We modify the cleanup behavior on startup to impact only non-logged stores.
     * We update the configuration documentation to reflect the potential implications of not configuring `job.logged.store.base.dir` and `job.non-logged.store.base.dir` for stateful applications.
     * We add a warning if both the logged and non-logged store base directories are configured to use the same path.
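
    As an illustration, a stateful job could keep the two base directories apart in its job properties; the paths below are hypothetical examples, not defaults:

    ```properties
    # Durable (changelog-backed) stores survive container restarts,
    # so keep them on a stable volume outside any scratch directory.
    job.logged.store.base.dir=/export/samza/logged-state
    # Non-logged stores are safe to delete on startup and can live in scratch space.
    job.non-logged.store.base.dir=/tmp/samza/non-logged-state
    ```

    With distinct paths, the startup cleanup of non-logged store directories cannot touch logged store data.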
    
    Author: mynameborat <[email protected]>
    
    Reviewers: Hai Lu <[email protected]>
    
    Closes #1107 from mynameborat/SAMZA-2274 and squashes the following commits:
    
    7d47a90b [mynameborat] Fix the logged store condition check
    35b1bb3d [mynameborat] Cleanup store directories on startup for non-logged store only
---
 .../versioned/jobs/samza-configurations.md         |  6 +--
 .../samza/storage/ContainerStorageManager.java     | 54 +++++++++++++---------
 2 files changed, 34 insertions(+), 26 deletions(-)

diff --git a/docs/learn/documentation/versioned/jobs/samza-configurations.md b/docs/learn/documentation/versioned/jobs/samza-configurations.md
index 3f12594..d24efc2 100644
--- a/docs/learn/documentation/versioned/jobs/samza-configurations.md
+++ b/docs/learn/documentation/versioned/jobs/samza-configurations.md
@@ -245,7 +245,7 @@ Configs for producing to [ElasticSearch](https://www.elastic.co/products/elastic
 |systems.**_system-name_**.<br>bulk.flush.interval.ms|never|How often buffered messages should be flushed.|
 
 ### <a name="state-storage"></a>[4. State Storage](#state-storage)
-These properties define Samza's storage mechanism for efficient [stateful stream processing](../container/state-management.html).
+These properties define Samza's storage mechanism for efficient [stateful stream processing](../container/state-management.html). Stateful applications should configure base directories for durable and non-durable stores using `job.logged.store.base.dir` and `job.non-logged.store.base.dir` respectively.
 
 |Name|Default|Description|
 |--- |--- |--- |
@@ -254,12 +254,12 @@ These properties define Samza's storage mechanism for efficient [stateful stream
 |stores.**_store-name_**.msg.serde| |If the storage engine expects values in the store to be simple byte arrays, this [serde](../container/serialization.html) allows the stream task to access the store using another object type as value. The value of this property must be a serde-name that is registered with serializers.registry.*.class. If this property is not set, values are passed unmodified to the storage engine (and the changelog stream, if appropriate).|
 |stores.**_store-name_**.changelog| |Samza stores are local to a container. If the container fails, the contents of the store are lost. To prevent loss of data, you need to set this property to configure a changelog stream: Samza then ensures that writes to the store are replicated to this stream, and the store is restored from this stream after a failure. The value of this property is given in the form system-name.stream-name. The "system-name" part is optional. If it is omitted you mus [...]
 |stores.**_store-name_**.rocksdb.ttl.ms| |__For RocksDB:__ The time-to-live of the store. Please note it's not a strict TTL limit (removed only after compaction). Please use caution opening a database with and without TTL, as it might corrupt the database. Please make sure to read the [constraints](https://github.com/facebook/rocksdb/wiki/Time-to-Live) before using.|
+|job.logged.store.base.dir|_user.dir_ environment property if set, else current working directory of the process|The base directory for changelog stores used by Samza application. Another way to configure the base directory is by setting environment variable `LOGGED_STORE_BASE_DIR`. __Note:__ The environment variable takes precedence over `job.logged.store.base.dir`. <br>By opting in, users are responsible for cleaning up the store directories if necessary. Jobs using host affinity shoul [...]
+|job.non-logged.store.base.dir|_user.dir_ environment property if set, else current working directory of the process|The base directory for non-changelog stores used by Samza application. <br>In YARN, the default behaviour without the configuration is to create non-changelog store directories in CWD which happens to be the YARN container directory. This gets cleaned up periodically as part of NodeManager's deletion service, which is controlled by the YARN config `yarn.nodemanager.delete. [...]
 
 ##### <a name="advanced-storage-configurations"></a>[4.1 Advanced Storage Configurations](#advanced-storage-configurations)
 |Name|Default|Description|
 |--- |--- |--- |
-|job.logged.store.base.dir|_user.dir_ environment property if set, else current working directory of the process|The base directory for changelog stores used by Samza application. Another way to configure the base directory is by setting environment variable `LOGGED_STORE_BASE_DIR`. __Note:__ The environment variable takes precedence over `job.logged.store.base.dir`. <br>By opting in, users are responsible for cleaning up the store directories if necessary. Jobs using host affinity shoul [...]
-|job.non-logged.store.base.dir|_user.dir_ environment property if set, else current working directory of the process|The base directory for non-changelog stores used by Samza application. <br>In YARN, the default behaviour without the configuration is to create non-changelog store directories in CWD which happens to be the YARN container directory. This gets cleaned up periodically as part of NodeManager's deletion service, which is controlled by the YARN config `yarn.nodemanager.delete. [...]
 |stores.default.changelog.<br>replication.factor|2|This property defines the default number of replicas to use for the change log stream.|
 
 |stores.**_store-name_**.changelog.<br>replication.factor|stores.default.changelog.<br>replication.factor|The property defines the number of replicas to use for the change log stream.|
 |stores.**_store-name_**.changelog.<br>kafka.topic-level-property| |The property allows you to specify topic level settings for the changelog topic to be created. For e.g., you can specify the clean up policy as "stores.mystore.changelog.cleanup.policy=delete". Please refer to the [Kafka documentation](http://kafka.apache.org/documentation.html#configuration) for more topic level configurations.|
diff --git a/samza-core/src/main/scala/org/apache/samza/storage/ContainerStorageManager.java b/samza-core/src/main/scala/org/apache/samza/storage/ContainerStorageManager.java
index d3fa6ec..a1a2f05 100644
--- a/samza-core/src/main/scala/org/apache/samza/storage/ContainerStorageManager.java
+++ b/samza-core/src/main/scala/org/apache/samza/storage/ContainerStorageManager.java
@@ -203,6 +203,12 @@ public class ContainerStorageManager {
     this.loggedStoreBaseDirectory = loggedStoreBaseDirectory;
     this.nonLoggedStoreBaseDirectory = nonLoggedStoreBaseDirectory;
 
+    if (loggedStoreBaseDirectory != null && loggedStoreBaseDirectory.equals(nonLoggedStoreBaseDirectory)) {
+      LOG.warn("Logged and non-logged store base directories are configured to the same path: {}. It is recommended to configure "
+          + "them separately to ensure clean up of non-logged store data doesn't accidentally impact logged store data.",
+          loggedStoreBaseDirectory);
+    }
+
+
     // set the config
     this.config = config;
 
@@ -955,33 +961,35 @@ public class ContainerStorageManager {
     private void cleanBaseDirsAndReadOffsetFiles() {
       LOG.debug("Cleaning base directories for stores.");
 
-      taskStores.keySet().forEach(storeName -> {
-          File nonLoggedStorePartitionDir =
-              StorageManagerUtil.getStorePartitionDir(nonLoggedStoreBaseDirectory, storeName, taskModel.getTaskName(), taskModel.getTaskMode());
-          LOG.info("Got non logged storage partition directory as " + nonLoggedStorePartitionDir.toPath().toString());
-
-          if (nonLoggedStorePartitionDir.exists()) {
-            LOG.info("Deleting non logged storage partition directory " + nonLoggedStorePartitionDir.toPath().toString());
-            FileUtil.rm(nonLoggedStorePartitionDir);
-          }
-
-          File loggedStorePartitionDir =
-              StorageManagerUtil.getStorePartitionDir(loggedStoreBaseDirectory, storeName, taskModel.getTaskName(), taskModel.getTaskMode());
-          LOG.info("Got logged storage partition directory as " + loggedStorePartitionDir.toPath().toString());
+      taskStores.forEach((storeName, storageEngine) -> {
+          if (!storageEngine.getStoreProperties().isLoggedStore()) {
+            File nonLoggedStorePartitionDir =
+                StorageManagerUtil.getStorePartitionDir(nonLoggedStoreBaseDirectory, storeName, taskModel.getTaskName(), taskModel.getTaskMode());
+            LOG.info("Got non logged storage partition directory as " + nonLoggedStorePartitionDir.toPath().toString());
 
-          // Delete the logged store if it is not valid.
-          if (!isLoggedStoreValid(storeName, loggedStorePartitionDir)) {
-            LOG.info("Deleting logged storage partition directory " + loggedStorePartitionDir.toPath().toString());
-            FileUtil.rm(loggedStorePartitionDir);
+            if (nonLoggedStorePartitionDir.exists()) {
+              LOG.info("Deleting non logged storage partition directory " + nonLoggedStorePartitionDir.toPath().toString());
+              FileUtil.rm(nonLoggedStorePartitionDir);
+            }
           } else {
+            File loggedStorePartitionDir =
+                StorageManagerUtil.getStorePartitionDir(loggedStoreBaseDirectory, storeName, taskModel.getTaskName(), taskModel.getTaskMode());
+            LOG.info("Got logged storage partition directory as " + loggedStorePartitionDir.toPath().toString());
+
+            // Delete the logged store if it is not valid.
+            if (!isLoggedStoreValid(storeName, loggedStorePartitionDir)) {
+              LOG.info("Deleting logged storage partition directory " + loggedStorePartitionDir.toPath().toString());
+              FileUtil.rm(loggedStorePartitionDir);
+            } else {
 
-            SystemStreamPartition changelogSSP = new SystemStreamPartition(changelogSystemStreams.get(storeName), taskModel.getChangelogPartition());
-            Map<SystemStreamPartition, String> offset =
-                StorageManagerUtil.readOffsetFile(loggedStorePartitionDir, Collections.singleton(changelogSSP), false);
-            LOG.info("Read offset {} for the store {} from logged storage partition directory {}", offset, storeName, loggedStorePartitionDir);
+              SystemStreamPartition changelogSSP = new SystemStreamPartition(changelogSystemStreams.get(storeName), taskModel.getChangelogPartition());
+              Map<SystemStreamPartition, String> offset =
+                  StorageManagerUtil.readOffsetFile(loggedStorePartitionDir, Collections.singleton(changelogSSP), false);
+              LOG.info("Read offset {} for the store {} from logged storage partition directory {}", offset, storeName, loggedStorePartitionDir);
 
-            if (offset.containsKey(changelogSSP)) {
-              fileOffsets.put(changelogSSP, offset.get(changelogSSP));
+              if (offset.containsKey(changelogSSP)) {
+                fileOffsets.put(changelogSSP, offset.get(changelogSSP));
+              }
             }
           }
         });
