Repository: samza Updated Branches: refs/heads/master e312bb552 -> 07199cb07
Fix side input stores to use logged store directory Side input stores are non-changelog stores that still need to use the logged store directory to guarantee durability. Author: bharathkk <[email protected]> Reviewers: Prateek Maheshwari <[email protected]> Closes #701 from bharathkk/side-input-fix Project: http://git-wip-us.apache.org/repos/asf/samza/repo Commit: http://git-wip-us.apache.org/repos/asf/samza/commit/07199cb0 Tree: http://git-wip-us.apache.org/repos/asf/samza/tree/07199cb0 Diff: http://git-wip-us.apache.org/repos/asf/samza/diff/07199cb0 Branch: refs/heads/master Commit: 07199cb074db3e2773940e58887efb40d0771278 Parents: e312bb5 Author: bharathkk <[email protected]> Authored: Tue Oct 9 13:05:43 2018 -0700 Committer: Prateek Maheshwari <[email protected]> Committed: Tue Oct 9 13:05:43 2018 -0700 ---------------------------------------------------------------------- .../scala/org/apache/samza/container/SamzaContainer.scala | 4 +++- .../apache/samza/test/table/TestLocalTableWithSideInputs.java | 7 +++++++ 2 files changed, 10 insertions(+), 1 deletion(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/samza/blob/07199cb0/samza-core/src/main/scala/org/apache/samza/container/SamzaContainer.scala ---------------------------------------------------------------------- diff --git a/samza-core/src/main/scala/org/apache/samza/container/SamzaContainer.scala b/samza-core/src/main/scala/org/apache/samza/container/SamzaContainer.scala index 5c4723b..3c10aae 100644 --- a/samza-core/src/main/scala/org/apache/samza/container/SamzaContainer.scala +++ b/samza-core/src/main/scala/org/apache/samza/container/SamzaContainer.scala @@ -553,7 +553,9 @@ object SamzaContainer extends Logging { case _ => null } - val storeDir = if (changeLogSystemStreamPartition != null) { + // We use the logged storage base directory for change logged and side input stores since side input stores + // dont have changelog configured. 
+ val storeDir = if (changeLogSystemStreamPartition != null || sideInputStoresToSystemStreams.contains(storeName)) { TaskStorageManager.getStorePartitionDir(loggedStorageBaseDir, storeName, taskName) } else { TaskStorageManager.getStorePartitionDir(nonLoggedStorageBaseDir, storeName, taskName) http://git-wip-us.apache.org/repos/asf/samza/blob/07199cb0/samza-test/src/test/java/org/apache/samza/test/table/TestLocalTableWithSideInputs.java ---------------------------------------------------------------------- diff --git a/samza-test/src/test/java/org/apache/samza/test/table/TestLocalTableWithSideInputs.java b/samza-test/src/test/java/org/apache/samza/test/table/TestLocalTableWithSideInputs.java index c1657ff..c31052d 100644 --- a/samza-test/src/test/java/org/apache/samza/test/table/TestLocalTableWithSideInputs.java +++ b/samza-test/src/test/java/org/apache/samza/test/table/TestLocalTableWithSideInputs.java @@ -21,6 +21,7 @@ package org.apache.samza.test.table; import com.google.common.collect.ImmutableList; +import java.nio.file.FileSystems; import java.time.Duration; import java.util.Arrays; import java.util.HashMap; @@ -30,6 +31,7 @@ import java.util.stream.Collectors; import org.apache.samza.SamzaException; import org.apache.samza.application.StreamApplicationDescriptor; import org.apache.samza.application.StreamApplication; +import org.apache.samza.config.JobConfig; import org.apache.samza.config.MapConfig; import org.apache.samza.config.StreamConfig; import org.apache.samza.operators.KV; @@ -82,6 +84,11 @@ public class TestLocalTableWithSideInputs extends AbstractIntegrationTestHarness Map<String, String> configs = new HashMap<>(); configs.put(String.format(StreamConfig.SYSTEM_FOR_STREAM_ID(), PAGEVIEW_STREAM), systemName); configs.put(String.format(StreamConfig.SYSTEM_FOR_STREAM_ID(), PROFILE_STREAM), systemName); + configs.put(JobConfig.JOB_NON_LOGGED_STORE_BASE_DIR(), + FileSystems.getDefault().getPath("non-logged").toAbsolutePath().toString()); + // SideInput 
Tables needs this to be configured for persisting data + configs.put(JobConfig.JOB_LOGGED_STORE_BASE_DIR(), + FileSystems.getDefault().getPath("logged").toAbsolutePath().toString()); configs.put(String.format(StreamConfig.SYSTEM_FOR_STREAM_ID(), ENRICHED_PAGEVIEW_STREAM), systemName); InMemorySystemDescriptor isd = new InMemorySystemDescriptor(systemName);
