Repository: samza Updated Branches: refs/heads/master cc490ea89 -> 5c5afb82a
SAMZA-1771: Test Framework support for stateful testing and assertions over state - Gives StateAssert for stateful assertions - Gives examples of Testing stateful jobs for TaskApplication and StreamApplication Author: Sanil15 <[email protected]> Author: Sanil Jain <[email protected]> Author: Sanil Jain <[email protected]> Reviewers: Shanthoosh Venkataraman <[email protected]>, Prateek Maheshwari <[email protected]> Closes #683 from Sanil15/SAMZA-1771 Project: http://git-wip-us.apache.org/repos/asf/samza/repo Commit: http://git-wip-us.apache.org/repos/asf/samza/commit/5c5afb82 Tree: http://git-wip-us.apache.org/repos/asf/samza/tree/5c5afb82 Diff: http://git-wip-us.apache.org/repos/asf/samza/diff/5c5afb82 Branch: refs/heads/master Commit: 5c5afb82ab12c72a4b114b0858f77779285b86cf Parents: cc490ea Author: Sanil15 <[email protected]> Authored: Thu Oct 11 16:13:45 2018 -0700 Committer: Prateek Maheshwari <[email protected]> Committed: Thu Oct 11 16:13:45 2018 -0700 ---------------------------------------------------------------------- .../apache/samza/test/framework/TestRunner.java | 45 ++++++++-- .../system/InMemorySystemDescriptor.java | 1 - .../AsyncStreamTaskIntegrationTest.java | 3 +- .../StreamApplicationIntegrationTest.java | 61 ++++++++++++- .../framework/StreamTaskIntegrationTest.java | 93 ++++++++++++++++++++ .../table/PageViewToProfileJoinFunction.java | 2 +- .../table/TestLocalTableWithSideInputs.java | 20 ++--- 7 files changed, 198 insertions(+), 27 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/samza/blob/5c5afb82/samza-test/src/main/java/org/apache/samza/test/framework/TestRunner.java ---------------------------------------------------------------------- diff --git a/samza-test/src/main/java/org/apache/samza/test/framework/TestRunner.java b/samza-test/src/main/java/org/apache/samza/test/framework/TestRunner.java index add3bf6..a3d8a0e 100644 --- a/samza-test/src/main/java/org/apache/samza/test/framework/TestRunner.java +++ b/samza-test/src/main/java/org/apache/samza/test/framework/TestRunner.java @@ -20,6 +20,7 @@ package org.apache.samza.test.framework; import com.google.common.base.Preconditions; +import java.io.File; import java.time.Duration; import java.util.HashMap; import java.util.HashSet; @@ -29,7 +30,6 @@ import java.util.Map; import java.util.Set; import java.util.stream.Collectors; import org.apache.commons.lang.RandomStringUtils; -import org.apache.commons.lang.exception.ExceptionUtils; import org.apache.samza.SamzaException; import org.apache.samza.application.LegacyTaskApplication; import org.apache.samza.application.SamzaApplication; @@ -61,7 +61,10 @@ import org.apache.samza.task.StreamTask; import org.apache.samza.test.framework.system.InMemoryInputDescriptor; import org.apache.samza.test.framework.system.InMemoryOutputDescriptor; import org.apache.samza.test.framework.system.InMemorySystemDescriptor; -import org.junit.Assert; +import org.apache.samza.util.FileUtil; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + /** * TestRunner provides APIs to set up integration tests for a Samza application. @@ -80,6 +83,7 @@ import org.junit.Assert; * */ public class TestRunner { + private static final Logger LOG = LoggerFactory.getLogger(TestRunner.class); private static final String JOB_DEFAULT_SYSTEM = "default-samza-system"; private static final String JOB_NAME = "samza-test"; @@ -98,8 +102,14 @@ public class TestRunner { configs.put(JobConfig.PROCESSOR_ID(), "1"); configs.put(JobCoordinatorConfig.JOB_COORDINATOR_FACTORY, PassthroughJobCoordinatorFactory.class.getName()); configs.put(TaskConfig.GROUPER_FACTORY(), SingleContainerGrouperFactory.class.getName()); + // Changing the base directory for non-changelog stores used by Samza application to separate the + // on-disk store locations for concurrently executing tests + configs.put(JobConfig.JOB_NON_LOGGED_STORE_BASE_DIR(), + new File(System.getProperty("java.io.tmpdir"), this.inMemoryScope).getAbsolutePath()); + configs.put(JobConfig.JOB_LOGGED_STORE_BASE_DIR(), + new File(System.getProperty("java.io.tmpdir"), this.inMemoryScope).getAbsolutePath()); addConfig(JobConfig.JOB_DEFAULT_SYSTEM(), JOB_DEFAULT_SYSTEM); - // This is important because Table Api enables host affinity by default for RocksDb + // Disabling host affinity since it requires reading locality information from a Kafka coordinator stream addConfig(ClusterManagerConfig.CLUSTER_MANAGER_HOST_AFFINITY_ENABLED, Boolean.FALSE.toString()); addConfig(InMemorySystemConfig.INMEMORY_SCOPE, inMemoryScope); addConfig(new InMemorySystemDescriptor(JOB_DEFAULT_SYSTEM).withInMemoryScope(inMemoryScope).toConfig()); @@ -251,16 +261,19 @@ public class TestRunner { * @throws SamzaException if Samza job fails with exception and returns UnsuccessfulFinish as the statuscode */ public void run(Duration timeout) { - Preconditions.checkState(app != null, - "TestRunner should run for Low Level Task api or High Level Application Api"); + Preconditions.checkNotNull(app); Preconditions.checkState(!timeout.isZero() || !timeout.isNegative(), "Timeouts should be positive"); + // Cleaning store directories to ensure current run does not pick up state from previous run + deleteStoreDirectories(); final LocalApplicationRunner runner = new LocalApplicationRunner(app, new MapConfig(configs)); runner.run(); - boolean timedOut = !runner.waitForFinish(timeout); - Assert.assertFalse("Timed out waiting for application to finish", timedOut); + if (!runner.waitForFinish(timeout)) { + throw new SamzaException("Timed out waiting for application to finish"); + } ApplicationStatus status = runner.status(); + deleteStoreDirectories(); if (status.getStatusCode() == ApplicationStatus.StatusCode.UnsuccessfulFinish) { - throw new SamzaException(ExceptionUtils.getStackTrace(status.getThrowable())); + throw new SamzaException("Application could not finish successfully", status.getThrowable()); } } @@ -369,4 +382,20 @@ public class TestRunner { new EndOfStreamMessage(null))); }); } + + private void deleteStoreDirectories() { + Preconditions.checkNotNull(configs.get(JobConfig.JOB_LOGGED_STORE_BASE_DIR())); + Preconditions.checkNotNull(configs.get(JobConfig.JOB_NON_LOGGED_STORE_BASE_DIR())); + deleteDirectory(configs.get(JobConfig.JOB_NON_LOGGED_STORE_BASE_DIR())); + deleteDirectory(configs.get(JobConfig.JOB_LOGGED_STORE_BASE_DIR())); + } + + private void deleteDirectory(String path) { + File dir = new File(path); + LOG.info("Deleting the directory " + path); + FileUtil.rm(dir); + if (dir.exists()) { + LOG.warn("Could not delete the directory " + path); + } + } } http://git-wip-us.apache.org/repos/asf/samza/blob/5c5afb82/samza-test/src/main/java/org/apache/samza/test/framework/system/InMemorySystemDescriptor.java ---------------------------------------------------------------------- diff --git a/samza-test/src/main/java/org/apache/samza/test/framework/system/InMemorySystemDescriptor.java b/samza-test/src/main/java/org/apache/samza/test/framework/system/InMemorySystemDescriptor.java index e6e423f..77948f6 100644 --- a/samza-test/src/main/java/org/apache/samza/test/framework/system/InMemorySystemDescriptor.java +++ b/samza-test/src/main/java/org/apache/samza/test/framework/system/InMemorySystemDescriptor.java @@ -30,7 +30,6 @@ import org.apache.samza.system.SystemStreamMetadata; import org.apache.samza.system.inmemory.InMemorySystemFactory; import org.apache.samza.config.JavaSystemConfig; - /** * A descriptor for InMemorySystem. * System properties configured using a descriptor override corresponding properties provided in configuration. http://git-wip-us.apache.org/repos/asf/samza/blob/5c5afb82/samza-test/src/test/java/org/apache/samza/test/framework/AsyncStreamTaskIntegrationTest.java ---------------------------------------------------------------------- diff --git a/samza-test/src/test/java/org/apache/samza/test/framework/AsyncStreamTaskIntegrationTest.java b/samza-test/src/test/java/org/apache/samza/test/framework/AsyncStreamTaskIntegrationTest.java index f1757ab..7696b62 100644 --- a/samza-test/src/test/java/org/apache/samza/test/framework/AsyncStreamTaskIntegrationTest.java +++ b/samza-test/src/test/java/org/apache/samza/test/framework/AsyncStreamTaskIntegrationTest.java @@ -26,6 +26,7 @@ import java.util.HashMap; import java.util.List; import java.util.Map; import java.util.stream.Collectors; +import org.apache.samza.SamzaException; import org.apache.samza.operators.KV; import org.apache.samza.serializers.NoOpSerde; import org.apache.samza.test.framework.system.InMemoryInputDescriptor; @@ -145,7 +146,7 @@ public class AsyncStreamTaskIntegrationTest { /** * Job should fail because it times out too soon */ - @Test(expected = AssertionError.class) + @Test(expected = SamzaException.class) public void testSamzaJobTimeoutFailureForAsyncTask() { InMemorySystemDescriptor isd = new InMemorySystemDescriptor("async-test"); http://git-wip-us.apache.org/repos/asf/samza/blob/5c5afb82/samza-test/src/test/java/org/apache/samza/test/framework/StreamApplicationIntegrationTest.java ---------------------------------------------------------------------- diff --git a/samza-test/src/test/java/org/apache/samza/test/framework/StreamApplicationIntegrationTest.java b/samza-test/src/test/java/org/apache/samza/test/framework/StreamApplicationIntegrationTest.java index 7b9bad7..1dda302 100644 --- a/samza-test/src/test/java/org/apache/samza/test/framework/StreamApplicationIntegrationTest.java +++ b/samza-test/src/test/java/org/apache/samza/test/framework/StreamApplicationIntegrationTest.java @@ -20,6 +20,7 @@ package org.apache.samza.test.framework; import java.time.Duration; import java.util.ArrayList; +import java.util.Arrays; import java.util.List; import java.util.Random; import org.apache.samza.SamzaException; @@ -27,27 +28,60 @@ import org.apache.samza.application.StreamApplication; import org.apache.samza.application.StreamApplicationDescriptor; import org.apache.samza.operators.KV; import org.apache.samza.operators.MessageStream; +import org.apache.samza.operators.OutputStream; import org.apache.samza.serializers.IntegerSerde; import org.apache.samza.serializers.JsonSerdeV2; import org.apache.samza.serializers.KVSerde; import org.apache.samza.serializers.NoOpSerde; +import org.apache.samza.storage.kv.RocksDbTableDescriptor; import org.apache.samza.system.OutgoingMessageEnvelope; import org.apache.samza.system.SystemStream; import org.apache.samza.system.kafka.KafkaInputDescriptor; +import org.apache.samza.system.kafka.KafkaOutputDescriptor; import org.apache.samza.system.kafka.KafkaSystemDescriptor; +import org.apache.samza.table.Table; import org.apache.samza.test.controlmessages.TestData; import org.apache.samza.test.framework.system.InMemoryInputDescriptor; import org.apache.samza.test.framework.system.InMemoryOutputDescriptor; import org.apache.samza.test.framework.system.InMemorySystemDescriptor; +import org.apache.samza.test.table.PageViewToProfileJoinFunction; +import org.apache.samza.test.table.TestTableData; import org.junit.Assert; import org.junit.Test; - -import static org.apache.samza.test.controlmessages.TestData.PageView; +import static org.apache.samza.test.controlmessages.TestData.*; public class StreamApplicationIntegrationTest { + private static final String[] PAGEKEYS = {"inbox", "home", "search", "pymk", "group", "job"}; @Test + public void testStatefulJoinWithLocalTable() { + List<TestTableData.PageView> pageViews = Arrays.asList(TestTableData.generatePageViews(10)); + List<TestTableData.Profile> profiles = Arrays.asList(TestTableData.generateProfiles(10)); + + InMemorySystemDescriptor isd = new InMemorySystemDescriptor("test"); + + InMemoryInputDescriptor<TestTableData.PageView> pageViewStreamDesc = isd + .getInputDescriptor("PageView", new NoOpSerde<TestTableData.PageView>()); + + InMemoryInputDescriptor<TestTableData.Profile> profileStreamDesc = isd + .getInputDescriptor("Profile", new NoOpSerde<TestTableData.Profile>()) + .withBootstrap(true); + + InMemoryOutputDescriptor<TestTableData.EnrichedPageView> outputStreamDesc = isd + .getOutputDescriptor("EnrichedPageView", new NoOpSerde<>()); + + TestRunner + .of(new PageViewProfileViewJoinApplication()) + .addInputStream(pageViewStreamDesc, pageViews) + .addInputStream(profileStreamDesc, profiles) + .addOutputStream(outputStreamDesc, 1) + .run(Duration.ofSeconds(2)); + + Assert.assertEquals(10, TestRunner.consumeStream(outputStreamDesc, Duration.ofSeconds(1)).get(0).size()); + } + + @Test public void testHighLevelApi() throws Exception { Random random = new Random(); int count = 10; @@ -92,6 +126,29 @@ public class StreamApplicationIntegrationTest { .run(Duration.ofMillis(1000)); } + private static class PageViewProfileViewJoinApplication implements StreamApplication { + @Override + public void describe(StreamApplicationDescriptor appDesc) { + Table<KV<Integer, TestTableData.Profile>> table = appDesc.getTable( + new RocksDbTableDescriptor<Integer, TestTableData.Profile>("profile-view-store", + KVSerde.of(new IntegerSerde(), new TestTableData.ProfileJsonSerde()))); + + KafkaSystemDescriptor ksd = new KafkaSystemDescriptor("test"); + KafkaInputDescriptor<TestTableData.Profile> profileISD = ksd.getInputDescriptor("Profile", new NoOpSerde<>()); + appDesc.getInputStream(profileISD).map(m -> new KV(m.getMemberId(), m)).sendTo(table); + + KafkaInputDescriptor<TestTableData.PageView> pageViewISD = ksd.getInputDescriptor("PageView", new NoOpSerde<>()); + KafkaOutputDescriptor<TestTableData.EnrichedPageView> enrichedPageViewOSD = + ksd.getOutputDescriptor("EnrichedPageView", new NoOpSerde<>()); + OutputStream<TestTableData.EnrichedPageView> outputStream = appDesc.getOutputStream(enrichedPageViewOSD); + appDesc.getInputStream(pageViewISD) + .partitionBy(TestTableData.PageView::getMemberId, pv -> pv, KVSerde.of(new IntegerSerde(), new JsonSerdeV2<>( + TestTableData.PageView.class)), "p1") + .join(table, new PageViewToProfileJoinFunction()) + .sendTo(outputStream); + } + } + private static class PageViewFilterApplication implements StreamApplication { @Override public void describe(StreamApplicationDescriptor appDesc) { http://git-wip-us.apache.org/repos/asf/samza/blob/5c5afb82/samza-test/src/test/java/org/apache/samza/test/framework/StreamTaskIntegrationTest.java ---------------------------------------------------------------------- diff --git a/samza-test/src/test/java/org/apache/samza/test/framework/StreamTaskIntegrationTest.java b/samza-test/src/test/java/org/apache/samza/test/framework/StreamTaskIntegrationTest.java index 55021d3..f778704 100644 --- a/samza-test/src/test/java/org/apache/samza/test/framework/StreamTaskIntegrationTest.java +++ b/samza-test/src/test/java/org/apache/samza/test/framework/StreamTaskIntegrationTest.java @@ -27,19 +27,68 @@ import java.util.List; import java.util.Map; import java.util.stream.Collectors; import org.apache.samza.SamzaException; +import org.apache.samza.application.TaskApplication; +import org.apache.samza.application.TaskApplicationDescriptor; +import org.apache.samza.context.Context; import org.apache.samza.operators.KV; +import org.apache.samza.serializers.IntegerSerde; +import org.apache.samza.serializers.KVSerde; import org.apache.samza.serializers.NoOpSerde; +import org.apache.samza.storage.kv.inmemory.InMemoryTableDescriptor; +import org.apache.samza.system.IncomingMessageEnvelope; +import org.apache.samza.system.OutgoingMessageEnvelope; +import org.apache.samza.system.SystemStream; +import org.apache.samza.system.kafka.KafkaInputDescriptor; +import org.apache.samza.system.kafka.KafkaOutputDescriptor; +import org.apache.samza.system.kafka.KafkaSystemDescriptor; +import org.apache.samza.table.ReadWriteTable; +import org.apache.samza.task.InitableTask; +import org.apache.samza.task.MessageCollector; +import org.apache.samza.task.StreamTask; +import org.apache.samza.task.StreamTaskFactory; +import org.apache.samza.task.TaskCoordinator; import org.apache.samza.test.framework.system.InMemoryInputDescriptor; import org.apache.samza.test.framework.system.InMemoryOutputDescriptor; import org.apache.samza.test.framework.system.InMemorySystemDescriptor; +import org.apache.samza.test.table.TestTableData; import org.hamcrest.collection.IsIterableContainingInOrder; import org.junit.Assert; import org.junit.Test; +import static org.apache.samza.test.table.TestTableData.Profile; +import static org.apache.samza.test.table.TestTableData.PageView; +import static org.apache.samza.test.table.TestTableData.EnrichedPageView; public class StreamTaskIntegrationTest { @Test + public void testStatefulTaskWithLocalTable() { + List<PageView> pageViews = Arrays.asList(TestTableData.generatePageViews(10)); + List<Profile> profiles = Arrays.asList(TestTableData.generateProfiles(10)); + + InMemorySystemDescriptor isd = new InMemorySystemDescriptor("test"); + + InMemoryInputDescriptor<TestTableData.PageView> pageViewStreamDesc = isd + .getInputDescriptor("PageView", new NoOpSerde<TestTableData.PageView>()); + + InMemoryInputDescriptor<TestTableData.Profile> profileStreamDesc = isd + .getInputDescriptor("Profile", new NoOpSerde<TestTableData.Profile>()) + .withBootstrap(true); + + InMemoryOutputDescriptor<TestTableData.EnrichedPageView> outputStreamDesc = isd + .getOutputDescriptor("EnrichedPageView", new NoOpSerde<>()); + + TestRunner + .of(new JoinTaskApplication()) + .addInputStream(pageViewStreamDesc, pageViews) + .addInputStream(profileStreamDesc, profiles) + .addOutputStream(outputStreamDesc, 1) + .run(Duration.ofSeconds(2)); + + Assert.assertEquals(10, TestRunner.consumeStream(outputStreamDesc, Duration.ofSeconds(1)).get(0).size()); + } + + @Test public void testSyncTaskWithSinglePartition() throws Exception { List<Integer> inputList = Arrays.asList(1, 2, 3, 4, 5); List<Integer> outputList = Arrays.asList(10, 20, 30, 40, 50); @@ -155,6 +204,49 @@ public class StreamTaskIntegrationTest { StreamAssert.containsInOrder(expectedOutputPartitionData, imod, Duration.ofMillis(1000)); } + static public class JoinTaskApplication implements TaskApplication { + @Override + public void describe(TaskApplicationDescriptor appDesc) { + appDesc.setTaskFactory((StreamTaskFactory) () -> new StatefulStreamTask()); + appDesc.addTable(new InMemoryTableDescriptor("profile-view-store", + KVSerde.of(new IntegerSerde(), new TestTableData.ProfileJsonSerde()))); + KafkaSystemDescriptor ksd = new KafkaSystemDescriptor("test"); + KafkaInputDescriptor<Profile> profileISD = ksd.getInputDescriptor("Profile", new NoOpSerde<>()); + appDesc.addInputStream(profileISD); + KafkaInputDescriptor<PageView> pageViewISD = ksd.getInputDescriptor("PageView", new NoOpSerde<>()); + appDesc.addInputStream(pageViewISD); + KafkaOutputDescriptor<EnrichedPageView> enrichedPageViewOSD = + ksd.getOutputDescriptor("EnrichedPageView", new NoOpSerde<>()); + appDesc.addOutputStream(enrichedPageViewOSD); + } + } + + static public class StatefulStreamTask implements StreamTask, InitableTask { + private ReadWriteTable<Integer, Profile> profileViewTable; + + @Override + public void init(Context context) throws Exception { + profileViewTable = (ReadWriteTable<Integer, Profile>) context.getTaskContext().getTable("profile-view-store"); + } + + @Override + public void process(IncomingMessageEnvelope message, MessageCollector collector, TaskCoordinator coordinator) { + if (message.getMessage() instanceof Profile) { + Profile profile = (Profile) message.getMessage(); + profileViewTable.put(profile.getMemberId(), profile); + } else if (message.getMessage() instanceof PageView) { + PageView pageView = (PageView) message.getMessage(); + Profile profile = profileViewTable.get(pageView.getMemberId()); + if (profile != null) { + System.out.println("Joining Page View ArticleView by " + profile.getMemberId()); + collector.send(new OutgoingMessageEnvelope(new SystemStream("test", "EnrichedPageView"), null, null, + new TestTableData.EnrichedPageView(pageView.getPageKey(), pageView.getMemberId(), profile.getCompany()))); + } + } + } + + } + public void genData(Map<Integer, List<KV>> inputPartitionData, Map<Integer, List<Integer>> expectedOutputPartitionData) { List<Integer> partition = Arrays.asList(1, 2, 3, 4, 5); List<Integer> outputPartition = partition.stream().map(x -> x * 10).collect(Collectors.toList()); @@ -167,4 +259,5 @@ public class StreamTaskIntegrationTest { expectedOutputPartitionData.put(i, new ArrayList<Integer>(outputPartition)); } } + } \ No newline at end of file http://git-wip-us.apache.org/repos/asf/samza/blob/5c5afb82/samza-test/src/test/java/org/apache/samza/test/table/PageViewToProfileJoinFunction.java ---------------------------------------------------------------------- diff --git a/samza-test/src/test/java/org/apache/samza/test/table/PageViewToProfileJoinFunction.java b/samza-test/src/test/java/org/apache/samza/test/table/PageViewToProfileJoinFunction.java index d253284..6b7bded 100644 --- a/samza-test/src/test/java/org/apache/samza/test/table/PageViewToProfileJoinFunction.java +++ b/samza-test/src/test/java/org/apache/samza/test/table/PageViewToProfileJoinFunction.java @@ -27,7 +27,7 @@ import org.apache.samza.test.table.TestTableData.Profile; /** * A {@link StreamTableJoinFunction} used by unit tests in this package */ -class PageViewToProfileJoinFunction implements StreamTableJoinFunction +public class PageViewToProfileJoinFunction implements StreamTableJoinFunction <Integer, KV<Integer, PageView>, KV<Integer, Profile>, EnrichedPageView> { @Override http://git-wip-us.apache.org/repos/asf/samza/blob/5c5afb82/samza-test/src/test/java/org/apache/samza/test/table/TestLocalTableWithSideInputs.java ---------------------------------------------------------------------- diff --git a/samza-test/src/test/java/org/apache/samza/test/table/TestLocalTableWithSideInputs.java b/samza-test/src/test/java/org/apache/samza/test/table/TestLocalTableWithSideInputs.java index 2fa00fe..4410b87 100644 --- a/samza-test/src/test/java/org/apache/samza/test/table/TestLocalTableWithSideInputs.java +++ b/samza-test/src/test/java/org/apache/samza/test/table/TestLocalTableWithSideInputs.java @@ -20,10 +20,15 @@ package org.apache.samza.test.table; import com.google.common.collect.ImmutableList; +import java.time.Duration; +import java.util.Arrays; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.stream.Collectors; import org.apache.samza.SamzaException; import org.apache.samza.application.StreamApplication; import org.apache.samza.application.StreamApplicationDescriptor; -import org.apache.samza.config.JobConfig; import org.apache.samza.config.MapConfig; import org.apache.samza.config.StreamConfig; import org.apache.samza.operators.KV; @@ -43,14 +48,6 @@ import org.apache.samza.test.framework.system.InMemorySystemDescriptor; import org.apache.samza.test.harness.AbstractIntegrationTestHarness; import org.junit.Test; -import java.nio.file.FileSystems; -import java.time.Duration; -import java.util.Arrays; -import java.util.HashMap; -import java.util.List; -import java.util.Map; -import java.util.stream.Collectors; - import static org.apache.samza.test.table.TestTableData.EnrichedPageView; import static org.apache.samza.test.table.TestTableData.PageView; import static org.apache.samza.test.table.TestTableData.Profile; @@ -87,11 +84,6 @@ public class TestLocalTableWithSideInputs extends AbstractIntegrationTestHarness Map<String, String> configs = new HashMap<>(); configs.put(String.format(StreamConfig.SYSTEM_FOR_STREAM_ID(), PAGEVIEW_STREAM), systemName); configs.put(String.format(StreamConfig.SYSTEM_FOR_STREAM_ID(), PROFILE_STREAM), systemName); - configs.put(JobConfig.JOB_NON_LOGGED_STORE_BASE_DIR(), - FileSystems.getDefault().getPath("non-logged").toAbsolutePath().toString()); - // SideInput Tables needs this to be configured for persisting data - configs.put(JobConfig.JOB_LOGGED_STORE_BASE_DIR(), - FileSystems.getDefault().getPath("logged").toAbsolutePath().toString()); configs.put(String.format(StreamConfig.SYSTEM_FOR_STREAM_ID(), ENRICHED_PAGEVIEW_STREAM), systemName); InMemorySystemDescriptor isd = new InMemorySystemDescriptor(systemName);
