Repository: samza Updated Branches: refs/heads/master 334d24e68 -> 094ff1641
SAMZA-1852: Adding default job system in TestRunner, disabling host affinity to support TableDescriptors and refining addConfig method for TestRunner API - The default system is a required config for intermediate streams, and since no user will write assertions against them, defaulting it makes it easier for the user to write tests - To support stateful jobs using Table API descriptors we need to disable host affinity, which the Table API enables by default - vjagadish pointed out that having both addConfigs and addOverrideConfig made for a confusing user-facing API. We now support only addConfig, with different signatures; configs added this way take precedence over any descriptor- or TestRunner-generated configs Author: Sanil15 <[email protected]> Reviewers: Prateek Maheshwari <[email protected]>, Yi Pan <[email protected]> Closes #651 from Sanil15/SAMZA-1852 Project: http://git-wip-us.apache.org/repos/asf/samza/repo Commit: http://git-wip-us.apache.org/repos/asf/samza/commit/094ff164 Tree: http://git-wip-us.apache.org/repos/asf/samza/tree/094ff164 Diff: http://git-wip-us.apache.org/repos/asf/samza/diff/094ff164 Branch: refs/heads/master Commit: 094ff1641c330e87f081d650b9066a0842f77963 Parents: 334d24e Author: Sanil15 <[email protected]> Authored: Mon Oct 1 15:11:54 2018 -0700 Committer: Prateek Maheshwari <[email protected]> Committed: Mon Oct 1 15:11:54 2018 -0700 ---------------------------------------------------------------------- .../apache/samza/test/framework/TestRunner.java | 64 ++++++++++---------- .../AsyncStreamTaskIntegrationTest.java | 2 +- .../StreamApplicationIntegrationTest.java | 23 ------- .../framework/StreamTaskIntegrationTest.java | 4 +- .../table/TestLocalTableWithSideInputs.java | 14 ++--- 5 files changed, 40 insertions(+), 67 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/samza/blob/094ff164/samza-test/src/main/java/org/apache/samza/test/framework/TestRunner.java 
---------------------------------------------------------------------- diff --git a/samza-test/src/main/java/org/apache/samza/test/framework/TestRunner.java b/samza-test/src/main/java/org/apache/samza/test/framework/TestRunner.java index fe8581b..a1103dd 100644 --- a/samza-test/src/main/java/org/apache/samza/test/framework/TestRunner.java +++ b/samza-test/src/main/java/org/apache/samza/test/framework/TestRunner.java @@ -33,7 +33,9 @@ import org.apache.commons.lang.exception.ExceptionUtils; import org.apache.samza.SamzaException; import org.apache.samza.application.LegacyTaskApplication; import org.apache.samza.application.SamzaApplication; +import org.apache.samza.config.ClusterManagerConfig; import org.apache.samza.config.Config; +import org.apache.samza.config.InMemorySystemConfig; import org.apache.samza.config.JobConfig; import org.apache.samza.config.JobCoordinatorConfig; import org.apache.samza.config.MapConfig; @@ -74,11 +76,14 @@ import org.junit.Assert; * <li>"task.name.grouper.factory" = {@link SingleContainerGrouperFactory}</li> * <li>"job.name" = "test-samza"</li> * <li>"processor.id" = "1"</li> + * <li>"job.default.system" = {@code JOB_DEFAULT_SYSTEM}</li> + * <li>"job.host-affinity.enabled" = "false"</li> * </ol> * */ public class TestRunner { - public static final String JOB_NAME = "samza-test"; + private static final String JOB_DEFAULT_SYSTEM = "default-samza-system"; + private static final String JOB_NAME = "samza-test"; private Map<String, String> configs; private SamzaApplication app; @@ -96,6 +101,11 @@ public class TestRunner { configs.put(JobCoordinatorConfig.JOB_COORDINATION_UTILS_FACTORY, PassthroughCoordinationUtilsFactory.class.getName()); configs.put(JobCoordinatorConfig.JOB_COORDINATOR_FACTORY, PassthroughJobCoordinatorFactory.class.getName()); configs.put(TaskConfig.GROUPER_FACTORY(), SingleContainerGrouperFactory.class.getName()); + addConfig(JobConfig.JOB_DEFAULT_SYSTEM(), JOB_DEFAULT_SYSTEM); + // This is important because Table Api 
enables host affinity by default for RocksDb + addConfig(ClusterManagerConfig.CLUSTER_MANAGER_HOST_AFFINITY_ENABLED, Boolean.FALSE.toString()); + addConfig(InMemorySystemConfig.INMEMORY_SCOPE, inMemoryScope); + addConfig(new InMemorySystemDescriptor(JOB_DEFAULT_SYSTEM).withInMemoryScope(inMemoryScope).toConfig()); } /** @@ -142,13 +152,17 @@ public class TestRunner { } /** - * Only adds a config from {@code config} to samza job {@code configs} if they dont exist in it. - * @param config configs for the application + * Adds a config to Samza application. This config takes precedence over default configs and descriptor generated configs + * + * @param key of the config + * @param value of the config * @return this {@link TestRunner} */ - public TestRunner addConfigs(Map<String, String> config) { - Preconditions.checkNotNull(config); - config.forEach(this.configs::putIfAbsent); + public TestRunner addConfig(String key, String value) { + Preconditions.checkNotNull(key); + Preconditions.checkNotNull(value); + String configPrefix = String.format(JobConfig.CONFIG_OVERRIDE_JOBS_PREFIX(), getJobNameAndId()); + configs.put(String.format("%s%s", configPrefix, key), value); return this; } @@ -157,24 +171,10 @@ public class TestRunner { * @param config configs for the application * @return this {@link TestRunner} */ - public TestRunner addConfigs(Map<String, String> config, String configPrefix) { + public TestRunner addConfig(Map<String, String> config) { Preconditions.checkNotNull(config); - config.forEach((key, value) -> this.configs.putIfAbsent(String.format("%s%s", configPrefix, key), value)); - return this; - } - - /** - * Adds a config to {@code configs} if its not already present. 
Overrides a config value for which key is already - * exisiting in {@code configs} - * @param key key of the config - * @param value value of the config - * @return this {@link TestRunner} - */ - public TestRunner addOverrideConfig(String key, String value) { - Preconditions.checkNotNull(key); - Preconditions.checkNotNull(value); - String configKeyPrefix = String.format(JobConfig.CONFIG_OVERRIDE_JOBS_PREFIX(), getJobNameAndId()); - configs.put(String.format("%s%s", configKeyPrefix, key), value); + String configPrefix = String.format(JobConfig.CONFIG_OVERRIDE_JOBS_PREFIX(), getJobNameAndId()); + config.forEach((key, value) -> this.configs.put(String.format("%s%s", configPrefix, key), value)); return this; } @@ -202,7 +202,8 @@ public class TestRunner { } /** - * Adds the provided input stream with mock data to the test application. + * Adds the provided input stream with mock data to the test application. Default configs and user added configs have + * a higher precedence over system and stream descriptor generated configs. * @param descriptor describes the stream that is supposed to be input to Samza application * @param messages map whose key is partitionId and value is messages in the partition * @param <StreamMessageType> message with null key or a KV {@link org.apache.samza.operators.KV}. @@ -220,12 +221,13 @@ public class TestRunner { } /** - * Adds the provided output stream to the test application. + * Adds the provided output stream to the test application. Default configs and user added configs have a higher + * precedence over system and stream descriptor generated configs. 
* @param streamDescriptor describes the stream that is supposed to be output for the Samza application * @param partitionCount partition count of output stream * @return this {@link TestRunner} */ - public TestRunner addOutputStream(InMemoryOutputDescriptor streamDescriptor, int partitionCount) { + public TestRunner addOutputStream(InMemoryOutputDescriptor<?> streamDescriptor, int partitionCount) { Preconditions.checkNotNull(streamDescriptor); Preconditions.checkState(partitionCount >= 1); InMemorySystemDescriptor imsd = (InMemorySystemDescriptor) streamDescriptor.getSystemDescriptor(); @@ -238,8 +240,8 @@ public class TestRunner { factory .getAdmin(streamDescriptor.getSystemName(), config) .createStream(spec); - addConfigs(streamDescriptor.toConfig()); - addConfigs(streamDescriptor.getSystemDescriptor().toConfig()); + addConfig(streamDescriptor.toConfig()); + addConfig(streamDescriptor.getSystemDescriptor().toConfig()); return this; } @@ -340,7 +342,7 @@ public class TestRunner { * messages in the partition * @param descriptor describes a stream to initialize with the in memory system */ - private <StreamMessageType> void initializeInMemoryInputStream(InMemoryInputDescriptor descriptor, + private <StreamMessageType> void initializeInMemoryInputStream(InMemoryInputDescriptor<?> descriptor, Map<Integer, Iterable<StreamMessageType>> partitonData) { String systemName = descriptor.getSystemName(); String streamName = (String) descriptor.getPhysicalName().orElse(descriptor.getStreamId()); @@ -352,8 +354,8 @@ public class TestRunner { } InMemorySystemDescriptor imsd = (InMemorySystemDescriptor) descriptor.getSystemDescriptor(); imsd.withInMemoryScope(this.inMemoryScope); - addConfigs(descriptor.toConfig()); - addConfigs(descriptor.getSystemDescriptor().toConfig(), String.format(JobConfig.CONFIG_OVERRIDE_JOBS_PREFIX(), getJobNameAndId())); + addConfig(descriptor.toConfig()); + addConfig(descriptor.getSystemDescriptor().toConfig()); StreamSpec spec = new 
StreamSpec(descriptor.getStreamId(), streamName, systemName, partitonData.size()); SystemFactory factory = new InMemorySystemFactory(); Config config = new MapConfig(descriptor.toConfig(), descriptor.getSystemDescriptor().toConfig()); http://git-wip-us.apache.org/repos/asf/samza/blob/094ff164/samza-test/src/test/java/org/apache/samza/test/framework/AsyncStreamTaskIntegrationTest.java ---------------------------------------------------------------------- diff --git a/samza-test/src/test/java/org/apache/samza/test/framework/AsyncStreamTaskIntegrationTest.java b/samza-test/src/test/java/org/apache/samza/test/framework/AsyncStreamTaskIntegrationTest.java index ef9508a..f1757ab 100644 --- a/samza-test/src/test/java/org/apache/samza/test/framework/AsyncStreamTaskIntegrationTest.java +++ b/samza-test/src/test/java/org/apache/samza/test/framework/AsyncStreamTaskIntegrationTest.java @@ -123,7 +123,7 @@ public class AsyncStreamTaskIntegrationTest { .of(MyAsyncStreamTask.class) .addInputStream(imid, inputPartitionData) .addOutputStream(imod, 5) - .addOverrideConfig("task.max.concurrency", "4") + .addConfig("task.max.concurrency", "4") .run(Duration.ofSeconds(2)); StreamAssert.containsInAnyOrder(expectedOutputPartitionData, imod, Duration.ofMillis(1000)); http://git-wip-us.apache.org/repos/asf/samza/blob/094ff164/samza-test/src/test/java/org/apache/samza/test/framework/StreamApplicationIntegrationTest.java ---------------------------------------------------------------------- diff --git a/samza-test/src/test/java/org/apache/samza/test/framework/StreamApplicationIntegrationTest.java b/samza-test/src/test/java/org/apache/samza/test/framework/StreamApplicationIntegrationTest.java index 6dd9159..4ebe95f 100644 --- a/samza-test/src/test/java/org/apache/samza/test/framework/StreamApplicationIntegrationTest.java +++ b/samza-test/src/test/java/org/apache/samza/test/framework/StreamApplicationIntegrationTest.java @@ -96,7 +96,6 @@ public class StreamApplicationIntegrationTest { 
.of(pageViewRepartition) .addInputStream(imid, pageviews) .addOutputStream(imod, 10) - .addOverrideConfig("job.default.system", "test") .run(Duration.ofMillis(1500)); Assert.assertEquals(TestRunner.consumeStream(imod, Duration.ofMillis(1000)).get(random.nextInt(count)).size(), 1); @@ -109,27 +108,6 @@ public class StreamApplicationIntegrationTest { } /** - * Job should fail since it is missing config "job.default.system" for partitionBy Operator - */ - @Test(expected = SamzaException.class) - public void testSamzaJobStartMissingConfigFailureForStreamApplication() { - - InMemorySystemDescriptor isd = new InMemorySystemDescriptor("test"); - - InMemoryInputDescriptor<PageView> imid = isd - .getInputDescriptor("PageView", new NoOpSerde<PageView>()); - - InMemoryOutputDescriptor<PageView> imod = isd - .getOutputDescriptor("Output", new NoOpSerde<PageView>()); - - TestRunner - .of(pageViewRepartition) - .addInputStream(imid, new ArrayList<>()) - .addOutputStream(imod, 10) - .run(Duration.ofMillis(1000)); - } - - /** * Null page key is passed in input data which should fail filter logic */ @Test(expected = SamzaException.class) @@ -154,7 +132,6 @@ public class StreamApplicationIntegrationTest { TestRunner.of(pageViewFilter) .addInputStream(imid, pageviews) .addOutputStream(imod, 10) - .addOverrideConfig("job.default.system", "test") .run(Duration.ofMillis(1000)); } http://git-wip-us.apache.org/repos/asf/samza/blob/094ff164/samza-test/src/test/java/org/apache/samza/test/framework/StreamTaskIntegrationTest.java ---------------------------------------------------------------------- diff --git a/samza-test/src/test/java/org/apache/samza/test/framework/StreamTaskIntegrationTest.java b/samza-test/src/test/java/org/apache/samza/test/framework/StreamTaskIntegrationTest.java index bc5cba7..55021d3 100644 --- a/samza-test/src/test/java/org/apache/samza/test/framework/StreamTaskIntegrationTest.java +++ 
b/samza-test/src/test/java/org/apache/samza/test/framework/StreamTaskIntegrationTest.java @@ -102,7 +102,7 @@ public class StreamTaskIntegrationTest { .of(MyStreamTestTask.class) .addInputStream(imid, inputList) .addOutputStream(imod, 1) - .addOverrideConfig("job.container.thread.pool.size", "4") + .addConfig("job.container.thread.pool.size", "4") .run(Duration.ofSeconds(1)); StreamAssert.containsInOrder(outputList, imod, Duration.ofMillis(1000)); @@ -149,7 +149,7 @@ public class StreamTaskIntegrationTest { .of(MyStreamTestTask.class) .addInputStream(imid, inputPartitionData) .addOutputStream(imod, 5) - .addOverrideConfig("job.container.thread.pool.size", "4") + .addConfig("job.container.thread.pool.size", "4") .run(Duration.ofSeconds(2)); StreamAssert.containsInOrder(expectedOutputPartitionData, imod, Duration.ofMillis(1000)); http://git-wip-us.apache.org/repos/asf/samza/blob/094ff164/samza-test/src/test/java/org/apache/samza/test/table/TestLocalTableWithSideInputs.java ---------------------------------------------------------------------- diff --git a/samza-test/src/test/java/org/apache/samza/test/table/TestLocalTableWithSideInputs.java b/samza-test/src/test/java/org/apache/samza/test/table/TestLocalTableWithSideInputs.java index 3c22818..814ad92 100644 --- a/samza-test/src/test/java/org/apache/samza/test/table/TestLocalTableWithSideInputs.java +++ b/samza-test/src/test/java/org/apache/samza/test/table/TestLocalTableWithSideInputs.java @@ -30,10 +30,8 @@ import java.util.stream.Collectors; import org.apache.samza.SamzaException; import org.apache.samza.application.StreamApplicationDescriptor; import org.apache.samza.application.StreamApplication; -import org.apache.samza.config.JobConfig; import org.apache.samza.config.MapConfig; import org.apache.samza.config.StreamConfig; -import org.apache.samza.config.ClusterManagerConfig; import org.apache.samza.operators.KV; import org.apache.samza.operators.TableDescriptor; import org.apache.samza.serializers.IntegerSerde; 
@@ -64,7 +62,7 @@ public class TestLocalTableWithSideInputs extends AbstractIntegrationTestHarness @Test public void testJoinWithSideInputsTable() { runTest( - "side-input-join", + "test", new PageViewProfileJoin(), Arrays.asList(TestTableData.generatePageViews(10)), Arrays.asList(TestTableData.generateProfiles(10))); @@ -73,7 +71,7 @@ public class TestLocalTableWithSideInputs extends AbstractIntegrationTestHarness @Test public void testJoinWithDurableSideInputTable() { runTest( - "durable-side-input", + "test", new DurablePageViewProfileJoin(), Arrays.asList(TestTableData.generatePageViews(5)), Arrays.asList(TestTableData.generateProfiles(5))); @@ -85,7 +83,6 @@ public class TestLocalTableWithSideInputs extends AbstractIntegrationTestHarness configs.put(String.format(StreamConfig.SYSTEM_FOR_STREAM_ID(), PAGEVIEW_STREAM), systemName); configs.put(String.format(StreamConfig.SYSTEM_FOR_STREAM_ID(), PROFILE_STREAM), systemName); configs.put(String.format(StreamConfig.SYSTEM_FOR_STREAM_ID(), ENRICHED_PAGEVIEW_STREAM), systemName); - configs.put(JobConfig.JOB_DEFAULT_SYSTEM(), systemName); InMemorySystemDescriptor isd = new InMemorySystemDescriptor(systemName); @@ -103,8 +100,7 @@ public class TestLocalTableWithSideInputs extends AbstractIntegrationTestHarness .addInputStream(pageViewStreamDesc, pageViews) .addInputStream(profileStreamDesc, profiles) .addOutputStream(outputStreamDesc, 1) - .addConfigs(new MapConfig(configs)) - .addOverrideConfig(ClusterManagerConfig.CLUSTER_MANAGER_HOST_AFFINITY_ENABLED, Boolean.FALSE.toString()) + .addConfig(new MapConfig(configs)) .run(Duration.ofMillis(100000)); try { @@ -135,7 +131,7 @@ public class TestLocalTableWithSideInputs extends AbstractIntegrationTestHarness public void describe(StreamApplicationDescriptor appDesc) { Table<KV<Integer, TestTableData.Profile>> table = appDesc.getTable(getTableDescriptor()); KafkaSystemDescriptor sd = - new 
KafkaSystemDescriptor(appDesc.getConfig().get(String.format(StreamConfig.SYSTEM_FOR_STREAM_ID(), PAGEVIEW_STREAM))); + new KafkaSystemDescriptor("test"); appDesc.getInputStream(sd.getInputDescriptor(PAGEVIEW_STREAM, new NoOpSerde<TestTableData.PageView>())) .partitionBy(TestTableData.PageView::getMemberId, v -> v, "partition-page-view") .join(table, new PageViewToProfileJoinFunction()) @@ -148,7 +144,6 @@ public class TestLocalTableWithSideInputs extends AbstractIntegrationTestHarness .withSideInputsProcessor((msg, store) -> { Profile profile = (Profile) msg.getMessage(); int key = profile.getMemberId(); - return ImmutableList.of(new Entry<>(key, profile)); }); } @@ -162,7 +157,6 @@ public class TestLocalTableWithSideInputs extends AbstractIntegrationTestHarness .withSideInputsProcessor((msg, store) -> { TestTableData.Profile profile = (TestTableData.Profile) msg.getMessage(); int key = profile.getMemberId(); - return ImmutableList.of(new Entry<>(key, profile)); }); }
