[
https://issues.apache.org/jira/browse/SAMZA-1836?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16599224#comment-16599224
]
Yi Pan (Data Infrastructure) commented on SAMZA-1836:
-----------------------------------------------------
Chatted more with [~pmaheshwari]; here is the issue:
# User code should still use KafkaSystemDescriptor, since the test framework
should not require users to change their application code
# The test framework creates a set of configurations in the overriding scope,
and ExecutionPlanner uses them to override the original configuration when
generating JobConfigs
# StreamManager is created and started before the ExecutionPlanner runs, so it
does not see the configuration that the test framework overrides, which causes
this test failure.
So the right fix is to also apply the test framework's overriding configuration
when creating and starting the StreamManager, before ExecutionPlanner generates
the JobConfigs.
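To illustrate the ordering problem, here is a minimal, self-contained sketch. It does not use the real Samza classes: OverrideConfigSketch, applyOverrides, and resolveSystemFactory are hypothetical names, and plain maps stand in for Config. The InMemorySystemFactory class name in the sample values is assumed for illustration. The point is only that whatever builds the StreamManager should see the merged configuration, not the original one.

```java
import java.util.HashMap;
import java.util.Map;

public class OverrideConfigSketch {

    /** Returns a copy of the original config with the overrides layered on top. */
    static Map<String, String> applyOverrides(Map<String, String> original,
                                              Map<String, String> overrides) {
        Map<String, String> merged = new HashMap<>(original);
        merged.putAll(overrides); // override entries win on key collisions
        return merged;
    }

    /** Hypothetical stand-in for what StreamManager would look up per system. */
    static String resolveSystemFactory(Map<String, String> config, String systemName) {
        return config.get("systems." + systemName + ".samza.factory");
    }

    public static void main(String[] args) {
        // Config as produced from the user's KafkaSystemDescriptor("test").
        Map<String, String> userConfig = new HashMap<>();
        userConfig.put("systems.test.samza.factory",
            "org.apache.samza.system.kafka.KafkaSystemFactory");

        // Overrides supplied by the test framework (class name assumed for illustration).
        Map<String, String> testOverrides = new HashMap<>();
        testOverrides.put("systems.test.samza.factory",
            "org.apache.samza.system.inmemory.InMemorySystemFactory");

        // Without the overrides, the Kafka factory is resolved for system "test".
        System.out.println("without overrides: " + resolveSystemFactory(userConfig, "test"));

        // With the overrides applied first, the in-memory factory is resolved instead.
        Map<String, String> merged = applyOverrides(userConfig, testOverrides);
        System.out.println("with overrides:    " + resolveSystemFactory(merged, "test"));
    }
}
```

In the real fix, the equivalent of the merged map would be what gets passed when the StreamManager is built and started, so that the user's descriptor stays untouched while the test framework's system factory takes effect.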
> StreamApplicationIntegrationTest and TestLocalTableWithSideInput fails due to
> wrong system configured
> -----------------------------------------------------------------------------------------------------
>
> Key: SAMZA-1836
> URL: https://issues.apache.org/jira/browse/SAMZA-1836
> Project: Samza
> Issue Type: Improvement
> Reporter: Yi Pan (Data Infrastructure)
> Priority: Major
> Fix For: 1.0
>
>
> The integration tests StreamApplicationIntegrationTest and
> TestLocalTableWithSideInput use the new test framework and are supposed to
> use InMemorySystemFactory. The recent check-in with PR 603 uses
> KafkaSystemDescriptor in the test, which causes the following failure if
> the generated configuration is actually used in AbstractApplicationRunner:
> {code}
> org.apache.samza.SamzaException: Failed to start application: org.apache.samza.test.framework.StreamApplicationIntegrationTest$$Lambda$2/1844169442@4b0b0854.
>     at org.apache.samza.runtime.LocalApplicationRunner.run(LocalApplicationRunner.java:201)
>     at org.apache.samza.test.framework.TestRunner.run(TestRunner.java:293)
>     at org.apache.samza.test.framework.StreamApplicationIntegrationTest.testHighLevelApi(StreamApplicationIntegrationTest.java:94)
>     at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
>     at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
>     at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>     at java.lang.reflect.Method.invoke(Method.java:498)
>     at org.junit.runners.model.FrameworkMethod$1.runReflectiveCall(FrameworkMethod.java:50)
>     at org.junit.internal.runners.model.ReflectiveCallable.run(ReflectiveCallable.java:12)
>     at org.junit.runners.model.FrameworkMethod.invokeExplosively(FrameworkMethod.java:47)
>     at org.junit.internal.runners.statements.InvokeMethod.evaluate(InvokeMethod.java:17)
>     at org.junit.runners.ParentRunner.runLeaf(ParentRunner.java:325)
>     at org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:78)
>     at org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:57)
>     at org.junit.runners.ParentRunner$3.run(ParentRunner.java:290)
>     at org.junit.runners.ParentRunner$1.schedule(ParentRunner.java:71)
>     at org.junit.runners.ParentRunner.runChildren(ParentRunner.java:288)
>     at org.junit.runners.ParentRunner.access$000(ParentRunner.java:58)
>     at org.junit.runners.ParentRunner$2.evaluate(ParentRunner.java:268)
>     at org.junit.runners.ParentRunner.run(ParentRunner.java:363)
>     at org.junit.runner.JUnitCore.run(JUnitCore.java:137)
>     at com.intellij.junit4.JUnit4IdeaTestRunner.startRunnerWithArgs(JUnit4IdeaTestRunner.java:68)
>     at com.intellij.rt.execution.junit.IdeaTestRunner$Repeater.startRunnerWithArgs(IdeaTestRunner.java:51)
>     at com.intellij.rt.execution.junit.JUnitStarter.prepareStreamsAndStart(JUnitStarter.java:242)
>     at com.intellij.rt.execution.junit.JUnitStarter.main(JUnitStarter.java:70)
> Caused by: org.apache.samza.SamzaException: No bootstrap servers defined in config for test.
>     at org.apache.samza.config.KafkaProducerConfig$$anonfun$4.apply(KafkaConfig.scala:379)
>     at org.apache.samza.config.KafkaProducerConfig$$anonfun$4.apply(KafkaConfig.scala:379)
>     at scala.Option.getOrElse(Option.scala:121)
>     at org.apache.samza.config.KafkaProducerConfig.<init>(KafkaConfig.scala:379)
>     at org.apache.samza.config.KafkaConfig.getKafkaSystemProducerConfig(KafkaConfig.scala:316)
>     at org.apache.samza.system.kafka.KafkaSystemFactory.getAdmin(KafkaSystemFactory.scala:108)
>     at org.apache.samza.config.JavaSystemConfig.lambda$getSystemAdmins$1(JavaSystemConfig.java:83)
>     at java.util.stream.Collectors.lambda$toMap$58(Collectors.java:1321)
>     at java.util.stream.ReduceOps$3ReducingSink.accept(ReduceOps.java:169)
>     at java.util.HashMap$EntrySpliterator.forEachRemaining(HashMap.java:1691)
>     at java.util.stream.AbstractPipeline.copyInto(AbstractPipeline.java:481)
>     at java.util.stream.AbstractPipeline.wrapAndCopyInto(AbstractPipeline.java:471)
>     at java.util.stream.ReduceOps$ReduceOp.evaluateSequential(ReduceOps.java:708)
>     at java.util.stream.AbstractPipeline.evaluate(AbstractPipeline.java:234)
>     at java.util.stream.ReferencePipeline.collect(ReferencePipeline.java:499)
>     at org.apache.samza.config.JavaSystemConfig.getSystemAdmins(JavaSystemConfig.java:81)
>     at org.apache.samza.system.SystemAdmins.<init>(SystemAdmins.java:38)
>     at org.apache.samza.execution.StreamManager.<init>(StreamManager.java:55)
>     at org.apache.samza.runtime.AbstractApplicationRunner.buildAndStartStreamManager(AbstractApplicationRunner.java:130)
>     at org.apache.samza.runtime.AbstractApplicationRunner.getExecutionPlan(AbstractApplicationRunner.java:98)
>     at org.apache.samza.runtime.AbstractApplicationRunner.getExecutionPlan(AbstractApplicationRunner.java:63)
>     at org.apache.samza.runtime.LocalApplicationRunner.run(LocalApplicationRunner.java:164)
>     ... 24 more
> {code}
> The tests in master were passing due to the following two issues:
> # In AbstractApplicationRunner, the expanded systemStreamConfigs are not used
> to create the StreamManager:
> {code}
> Map<String, String> appConfigs = new HashMap<>(cfg);
> appConfigs.putAll(systemStreamConfigs);
> // create the physical execution plan
> Config generatedConfig = new MapConfig(cfg);
> StreamManager streamManager = buildAndStartStreamManager(generatedConfig);
> {code}
> # In integration tests, we used KafkaSystemDescriptor for the test system,
> which should be using InMemorySystemFactory. Example in
> StreamApplicationIntegrationTest:
> {code}
> final StreamApplication pageViewFilter = (streamGraph, cfg) -> {
>   KafkaSystemDescriptor ksd = new KafkaSystemDescriptor("test");
>   KafkaInputDescriptor<KV<String, PageView>> isd =
>       ksd.getInputDescriptor("PageView", KVSerde.of(new NoOpSerde<>(), new NoOpSerde<>()));
>   MessageStream<KV<String, TestData.PageView>> inputStream = streamGraph.getInputStream(isd);
>
>   inputStream.map(StreamApplicationIntegrationTest.Values.create())
>       .filter(pv -> pv.getPageKey().equals("inbox"));
> };
> {code}
> The test failure was not triggered because it was masked by the first bug.