Repository: samza Updated Branches: refs/heads/master 1c2d6effb -> 08496c96d
SAMZA-1967: Tests failing when Job uses any serde other than NoOp. Context: Serdes are configured in JobNodeConfigurationGenerator, and StreamDescriptor#toConfig does not generate key and msg serde configs. Problem: Tests fail when a Job uses any serde other than NoOp, since ApplicationDescriptor serdes take precedence in the absence of any user-supplied configs. Solution: Passing null msg and key serde configs in userConfigs for StreamDescriptors ensures that ApplicationDescriptor-generated serde configs don't take precedence. prateekm rmatharu please take a look Author: Sanil15 <[email protected]> Reviewers: Prateek M<[email protected]> Closes #764 from Sanil15/SAMZA-1967 Project: http://git-wip-us.apache.org/repos/asf/samza/repo Commit: http://git-wip-us.apache.org/repos/asf/samza/commit/08496c96 Tree: http://git-wip-us.apache.org/repos/asf/samza/tree/08496c96 Diff: http://git-wip-us.apache.org/repos/asf/samza/diff/08496c96 Branch: refs/heads/master Commit: 08496c96d229582456d5fe48f442876745c11112 Parents: 1c2d6ef Author: Sanil15 <[email protected]> Authored: Fri Oct 26 17:51:58 2018 -0700 Committer: Jagadish <[email protected]> Committed: Fri Oct 26 17:51:58 2018 -0700 ---------------------------------------------------------------------- .../apache/samza/test/framework/TestRunner.java | 23 ++++++++++- .../StreamApplicationIntegrationTest.java | 43 ++++++++++++++------ .../framework/StreamTaskIntegrationTest.java | 5 ++- 3 files changed, 54 insertions(+), 17 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/samza/blob/08496c96/samza-test/src/main/java/org/apache/samza/test/framework/TestRunner.java ---------------------------------------------------------------------- diff --git a/samza-test/src/main/java/org/apache/samza/test/framework/TestRunner.java b/samza-test/src/main/java/org/apache/samza/test/framework/TestRunner.java index 531b0ef..c80ce1b 100644 --- 
a/samza-test/src/main/java/org/apache/samza/test/framework/TestRunner.java +++ b/samza-test/src/main/java/org/apache/samza/test/framework/TestRunner.java @@ -39,6 +39,7 @@ import org.apache.samza.config.InMemorySystemConfig; import org.apache.samza.config.JobConfig; import org.apache.samza.config.JobCoordinatorConfig; import org.apache.samza.config.MapConfig; +import org.apache.samza.config.StreamConfig; import org.apache.samza.config.TaskConfig; import org.apache.samza.container.grouper.task.SingleContainerGrouperFactory; import org.apache.samza.job.ApplicationStatus; @@ -55,6 +56,7 @@ import org.apache.samza.system.SystemProducer; import org.apache.samza.system.SystemStream; import org.apache.samza.system.SystemStreamMetadata; import org.apache.samza.system.SystemStreamPartition; +import org.apache.samza.system.descriptors.StreamDescriptor; import org.apache.samza.system.inmemory.InMemorySystemFactory; import org.apache.samza.task.AsyncStreamTask; import org.apache.samza.task.StreamTask; @@ -81,6 +83,7 @@ import org.slf4j.LoggerFactory; * <li>"job.host-affinity.enabled" = "false"</li> * </ol> * + * TestRunner only supports NoOpSerde, i.e. inputs to the Test Framework should be deserialized */ public class TestRunner { private static final Logger LOG = LoggerFactory.getLogger(TestRunner.class); @@ -187,7 +190,7 @@ public class TestRunner { * Adds the provided input stream with mock data to the test application. * * @param descriptor describes the stream that is supposed to be input to Samza application - * @param messages messages used to initialize the single partition stream + * @param messages messages used to initialize the single partition stream. These messages should always be deserialized * @param <StreamMessageType> a message with null key or a KV {@link org.apache.samza.operators.KV}. 
* key of KV represents key of {@link org.apache.samza.system.IncomingMessageEnvelope} or * {@link org.apache.samza.system.OutgoingMessageEnvelope} and value is message @@ -206,7 +209,8 @@ public class TestRunner { * Adds the provided input stream with mock data to the test application. Default configs and user added configs have * a higher precedence over system and stream descriptor generated configs. * @param descriptor describes the stream that is supposed to be input to Samza application - * @param messages map whose key is partitionId and value is messages in the partition + * @param messages map whose key is partitionId and value is messages in the partition. These messages should always + * be deserialized * @param <StreamMessageType> message with null key or a KV {@link org.apache.samza.operators.KV}. * A key of which represents key of {@link org.apache.samza.system.IncomingMessageEnvelope} or * {@link org.apache.samza.system.OutgoingMessageEnvelope} and value is message @@ -243,6 +247,7 @@ public class TestRunner { .createStream(spec); addConfig(streamDescriptor.toConfig()); addConfig(streamDescriptor.getSystemDescriptor().toConfig()); + addSerdeConfigs(streamDescriptor); return this; } @@ -359,6 +364,7 @@ public class TestRunner { imsd.withInMemoryScope(this.inMemoryScope); addConfig(descriptor.toConfig()); addConfig(descriptor.getSystemDescriptor().toConfig()); + addSerdeConfigs(descriptor); StreamSpec spec = new StreamSpec(descriptor.getStreamId(), streamName, systemName, partitionData.size()); SystemFactory factory = new InMemorySystemFactory(); Config config = new MapConfig(descriptor.toConfig(), descriptor.getSystemDescriptor().toConfig()); @@ -391,4 +397,17 @@ public class TestRunner { LOG.warn("Could not delete the directory " + path); } } + + /** + * Test Framework only supports NoOpSerde. 
This method ensures that the null key and msg serde configs for input and output streams + * take precedence when configs are merged in {@link org.apache.samza.execution.JobPlanner#getExecutionPlan} + * over {@link org.apache.samza.application.descriptors.ApplicationDescriptor} generated configs + */ + private void addSerdeConfigs(StreamDescriptor descriptor) { + String streamIdPrefix = String.format(StreamConfig.STREAM_ID_PREFIX(), descriptor.getStreamId()); + String keySerdeConfigKey = streamIdPrefix + StreamConfig.KEY_SERDE(); + String msgSerdeConfigKey = streamIdPrefix + StreamConfig.MSG_SERDE(); + this.configs.put(keySerdeConfigKey, null); + this.configs.put(msgSerdeConfigKey, null); + } } http://git-wip-us.apache.org/repos/asf/samza/blob/08496c96/samza-test/src/test/java/org/apache/samza/test/framework/StreamApplicationIntegrationTest.java ---------------------------------------------------------------------- diff --git a/samza-test/src/test/java/org/apache/samza/test/framework/StreamApplicationIntegrationTest.java b/samza-test/src/test/java/org/apache/samza/test/framework/StreamApplicationIntegrationTest.java index 476c0dc..b629317 100644 --- a/samza-test/src/test/java/org/apache/samza/test/framework/StreamApplicationIntegrationTest.java +++ b/samza-test/src/test/java/org/apache/samza/test/framework/StreamApplicationIntegrationTest.java @@ -23,6 +23,7 @@ import java.util.ArrayList; import java.util.Arrays; import java.util.List; import java.util.Random; +import java.util.stream.Collectors; import org.apache.samza.SamzaException; import org.apache.samza.application.StreamApplication; import org.apache.samza.application.descriptors.StreamApplicationDescriptor; @@ -33,6 +34,7 @@ import org.apache.samza.serializers.IntegerSerde; import org.apache.samza.serializers.JsonSerdeV2; import org.apache.samza.serializers.KVSerde; import org.apache.samza.serializers.NoOpSerde; +import org.apache.samza.serializers.StringSerde; import 
org.apache.samza.storage.kv.descriptors.RocksDbTableDescriptor; import org.apache.samza.system.OutgoingMessageEnvelope; import org.apache.samza.system.SystemStream; @@ -57,16 +59,23 @@ public class StreamApplicationIntegrationTest { @Test public void testStatefulJoinWithLocalTable() { - List<TestTableData.PageView> pageViews = Arrays.asList(TestTableData.generatePageViews(10)); - List<TestTableData.Profile> profiles = Arrays.asList(TestTableData.generateProfiles(10)); + Random random = new Random(); + List<KV<String, TestTableData.PageView>> pageViews = Arrays.asList(TestTableData.generatePageViews(10)) + .stream() + .map(x -> KV.of(PAGEKEYS[random.nextInt(PAGEKEYS.length)], x)) + .collect(Collectors.toList()); + List<KV<String, TestTableData.Profile>> profiles = Arrays.asList(TestTableData.generateProfiles(10)) + .stream() + .map(x -> KV.of(PAGEKEYS[random.nextInt(PAGEKEYS.length)], x)) + .collect(Collectors.toList()); InMemorySystemDescriptor isd = new InMemorySystemDescriptor("test"); - InMemoryInputDescriptor<TestTableData.PageView> pageViewStreamDesc = isd - .getInputDescriptor("PageView", new NoOpSerde<TestTableData.PageView>()); + InMemoryInputDescriptor<KV<String, TestTableData.PageView>> pageViewStreamDesc = isd + .getInputDescriptor("PageView", new NoOpSerde<KV<String, TestTableData.PageView>>()); - InMemoryInputDescriptor<TestTableData.Profile> profileStreamDesc = isd - .getInputDescriptor("Profile", new NoOpSerde<TestTableData.Profile>()) + InMemoryInputDescriptor<KV<String, TestTableData.Profile>> profileStreamDesc = isd + .getInputDescriptor("Profile", new NoOpSerde<KV<String, TestTableData.Profile>>()) .shouldBootstrap(); InMemoryOutputDescriptor<TestTableData.EnrichedPageView> outputStreamDesc = isd @@ -127,6 +136,7 @@ public class StreamApplicationIntegrationTest { .run(Duration.ofMillis(1000)); } + private static class PageViewProfileViewJoinApplication implements StreamApplication { @Override public void describe(StreamApplicationDescriptor 
appDescriptor) { @@ -135,16 +145,23 @@ public class StreamApplicationIntegrationTest { KVSerde.of(new IntegerSerde(), new TestTableData.ProfileJsonSerde()))); KafkaSystemDescriptor ksd = new KafkaSystemDescriptor("test"); - KafkaInputDescriptor<TestTableData.Profile> profileISD = ksd.getInputDescriptor("Profile", new NoOpSerde<>()); - appDescriptor.getInputStream(profileISD).map(m -> new KV(m.getMemberId(), m)).sendTo(table); - KafkaInputDescriptor<TestTableData.PageView> pageViewISD = ksd.getInputDescriptor("PageView", new NoOpSerde<>()); + KafkaInputDescriptor<KV<String, TestTableData.Profile>> profileISD = + ksd.getInputDescriptor("Profile", KVSerde.of(new StringSerde(), new JsonSerdeV2<>())); + + appDescriptor + .getInputStream(profileISD) + .map(m -> new KV(m.getValue().getMemberId(), m.getValue())) + .sendTo(table); + + KafkaInputDescriptor<KV<String, TestTableData.PageView>> pageViewISD = + ksd.getInputDescriptor("PageView", KVSerde.of(new StringSerde(), new JsonSerdeV2<>())); KafkaOutputDescriptor<TestTableData.EnrichedPageView> enrichedPageViewOSD = - ksd.getOutputDescriptor("EnrichedPageView", new NoOpSerde<>()); + ksd.getOutputDescriptor("EnrichedPageView", new JsonSerdeV2<>()); + OutputStream<TestTableData.EnrichedPageView> outputStream = appDescriptor.getOutputStream(enrichedPageViewOSD); appDescriptor.getInputStream(pageViewISD) - .partitionBy(TestTableData.PageView::getMemberId, pv -> pv, KVSerde.of(new IntegerSerde(), new JsonSerdeV2<>( - TestTableData.PageView.class)), "p1") + .partitionBy(pv -> pv.getValue().getMemberId() , pv -> pv.getValue(), KVSerde.of(new IntegerSerde(), new JsonSerdeV2<>(TestTableData.PageView.class)), "p1") .join(table, new PageViewToProfileJoinFunction()) .sendTo(outputStream); } @@ -155,7 +172,7 @@ public class StreamApplicationIntegrationTest { public void describe(StreamApplicationDescriptor appDescriptor) { KafkaSystemDescriptor ksd = new KafkaSystemDescriptor("test"); KafkaInputDescriptor<KV<String, PageView>> isd = - 
ksd.getInputDescriptor("PageView", KVSerde.of(new NoOpSerde<>(), new NoOpSerde<>())); + ksd.getInputDescriptor("PageView", KVSerde.of(new StringSerde(), new JsonSerdeV2<>())); MessageStream<KV<String, TestData.PageView>> inputStream = appDescriptor.getInputStream(isd); inputStream.map(KV::getValue).filter(pv -> pv.getPageKey().equals("inbox")); } http://git-wip-us.apache.org/repos/asf/samza/blob/08496c96/samza-test/src/test/java/org/apache/samza/test/framework/StreamTaskIntegrationTest.java ---------------------------------------------------------------------- diff --git a/samza-test/src/test/java/org/apache/samza/test/framework/StreamTaskIntegrationTest.java b/samza-test/src/test/java/org/apache/samza/test/framework/StreamTaskIntegrationTest.java index 5fee762..2137a46 100644 --- a/samza-test/src/test/java/org/apache/samza/test/framework/StreamTaskIntegrationTest.java +++ b/samza-test/src/test/java/org/apache/samza/test/framework/StreamTaskIntegrationTest.java @@ -32,6 +32,7 @@ import org.apache.samza.context.Context; import org.apache.samza.application.descriptors.TaskApplicationDescriptor; import org.apache.samza.operators.KV; import org.apache.samza.serializers.IntegerSerde; +import org.apache.samza.serializers.JsonSerdeV2; import org.apache.samza.serializers.KVSerde; import org.apache.samza.serializers.NoOpSerde; import org.apache.samza.storage.kv.inmemory.descriptors.InMemoryTableDescriptor; @@ -209,8 +210,8 @@ public class StreamTaskIntegrationTest { @Override public void describe(TaskApplicationDescriptor appDescriptor) { KafkaSystemDescriptor ksd = new KafkaSystemDescriptor("test"); - KafkaInputDescriptor<Profile> profileISD = ksd.getInputDescriptor("Profile", new NoOpSerde<>()); - KafkaInputDescriptor<PageView> pageViewISD = ksd.getInputDescriptor("PageView", new NoOpSerde<>()); + KafkaInputDescriptor<Profile> profileISD = ksd.getInputDescriptor("Profile", new JsonSerdeV2<>()); + KafkaInputDescriptor<PageView> pageViewISD = 
ksd.getInputDescriptor("PageView", new JsonSerdeV2<>()); KafkaOutputDescriptor<EnrichedPageView> enrichedPageViewOSD = ksd.getOutputDescriptor("EnrichedPageView", new NoOpSerde<>()); appDescriptor
