Repository: samza
Updated Branches:
  refs/heads/master e312bb552 -> 07199cb07


Fix side input stores to use logged store directory

Side input stores are non-changelog stores that still need to use the logged
store directory to guarantee durability.

Author: bharathkk <[email protected]>

Reviewers: Prateek Maheshwari <[email protected]>

Closes #701 from bharathkk/side-input-fix


Project: http://git-wip-us.apache.org/repos/asf/samza/repo
Commit: http://git-wip-us.apache.org/repos/asf/samza/commit/07199cb0
Tree: http://git-wip-us.apache.org/repos/asf/samza/tree/07199cb0
Diff: http://git-wip-us.apache.org/repos/asf/samza/diff/07199cb0

Branch: refs/heads/master
Commit: 07199cb074db3e2773940e58887efb40d0771278
Parents: e312bb5
Author: bharathkk <[email protected]>
Authored: Tue Oct 9 13:05:43 2018 -0700
Committer: Prateek Maheshwari <[email protected]>
Committed: Tue Oct 9 13:05:43 2018 -0700

----------------------------------------------------------------------
 .../scala/org/apache/samza/container/SamzaContainer.scala     | 4 +++-
 .../apache/samza/test/table/TestLocalTableWithSideInputs.java | 7 +++++++
 2 files changed, 10 insertions(+), 1 deletion(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/samza/blob/07199cb0/samza-core/src/main/scala/org/apache/samza/container/SamzaContainer.scala
----------------------------------------------------------------------
diff --git 
a/samza-core/src/main/scala/org/apache/samza/container/SamzaContainer.scala 
b/samza-core/src/main/scala/org/apache/samza/container/SamzaContainer.scala
index 5c4723b..3c10aae 100644
--- a/samza-core/src/main/scala/org/apache/samza/container/SamzaContainer.scala
+++ b/samza-core/src/main/scala/org/apache/samza/container/SamzaContainer.scala
@@ -553,7 +553,9 @@ object SamzaContainer extends Logging {
               case _ => null
             }
 
-            val storeDir = if (changeLogSystemStreamPartition != null) {
+            // We use the logged storage base directory for change logged and 
side input stores since side input stores
+            // dont have changelog configured.
+            val storeDir = if (changeLogSystemStreamPartition != null || 
sideInputStoresToSystemStreams.contains(storeName)) {
               TaskStorageManager.getStorePartitionDir(loggedStorageBaseDir, 
storeName, taskName)
             } else {
               TaskStorageManager.getStorePartitionDir(nonLoggedStorageBaseDir, 
storeName, taskName)

http://git-wip-us.apache.org/repos/asf/samza/blob/07199cb0/samza-test/src/test/java/org/apache/samza/test/table/TestLocalTableWithSideInputs.java
----------------------------------------------------------------------
diff --git 
a/samza-test/src/test/java/org/apache/samza/test/table/TestLocalTableWithSideInputs.java
 
b/samza-test/src/test/java/org/apache/samza/test/table/TestLocalTableWithSideInputs.java
index c1657ff..c31052d 100644
--- 
a/samza-test/src/test/java/org/apache/samza/test/table/TestLocalTableWithSideInputs.java
+++ 
b/samza-test/src/test/java/org/apache/samza/test/table/TestLocalTableWithSideInputs.java
@@ -21,6 +21,7 @@ package org.apache.samza.test.table;
 
 import com.google.common.collect.ImmutableList;
 
+import java.nio.file.FileSystems;
 import java.time.Duration;
 import java.util.Arrays;
 import java.util.HashMap;
@@ -30,6 +31,7 @@ import java.util.stream.Collectors;
 import org.apache.samza.SamzaException;
 import org.apache.samza.application.StreamApplicationDescriptor;
 import org.apache.samza.application.StreamApplication;
+import org.apache.samza.config.JobConfig;
 import org.apache.samza.config.MapConfig;
 import org.apache.samza.config.StreamConfig;
 import org.apache.samza.operators.KV;
@@ -82,6 +84,11 @@ public class TestLocalTableWithSideInputs extends 
AbstractIntegrationTestHarness
     Map<String, String> configs = new HashMap<>();
     configs.put(String.format(StreamConfig.SYSTEM_FOR_STREAM_ID(), 
PAGEVIEW_STREAM), systemName);
     configs.put(String.format(StreamConfig.SYSTEM_FOR_STREAM_ID(), 
PROFILE_STREAM), systemName);
+    configs.put(JobConfig.JOB_NON_LOGGED_STORE_BASE_DIR(),
+        
FileSystems.getDefault().getPath("non-logged").toAbsolutePath().toString());
+    // SideInput Tables needs this to be configured for persisting data
+    configs.put(JobConfig.JOB_LOGGED_STORE_BASE_DIR(),
+        
FileSystems.getDefault().getPath("logged").toAbsolutePath().toString());
     configs.put(String.format(StreamConfig.SYSTEM_FOR_STREAM_ID(), 
ENRICHED_PAGEVIEW_STREAM), systemName);
 
     InMemorySystemDescriptor isd = new InMemorySystemDescriptor(systemName);

Reply via email to