[ 
https://issues.apache.org/jira/browse/SAMZA-1990?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16913370#comment-16913370
 ] 

Aditya commented on SAMZA-1990:
-------------------------------

Unfortunately, this is not completely resolved. The stream which is both the 
input and output is being considered as an intermediate stream during planning 
phase and there is an attempt to create the stream while the stream already 
exists. We did not run into this issue with the UT mentioned in this ticket as 
it uses SystemAdmin (SimpleSystemAdmin used for tests) which always returns 
true for createStream(). Changing that code to throw an exception results in 
the following stack trace.

Caused by: org.apache.samza.SamzaExceptionCaused by: 
org.apache.samza.SamzaException at 
org.apache.samza.sql.system.SimpleSystemAdmin.createStream(SimpleSystemAdmin.java:68)
 at 
org.apache.samza.execution.StreamManager.createStreams(StreamManager.java:72) 
at 
org.apache.samza.execution.LocalJobPlanner.createStreams(LocalJobPlanner.java:139)
 at 
org.apache.samza.execution.LocalJobPlanner.prepareJobs(LocalJobPlanner.java:109)
 at 
org.apache.samza.runtime.LocalApplicationRunner.run(LocalApplicationRunner.java:167)

This needs to be fixed.

cc: [~dnishimura], [~pmaheshwari], [~psrinivasulu]

> Samza framework should let using the same system stream as both input and 
> output.
> ---------------------------------------------------------------------------------
>
>                 Key: SAMZA-1990
>                 URL: https://issues.apache.org/jira/browse/SAMZA-1990
>             Project: Samza
>          Issue Type: Bug
>            Reporter: Aditya
>            Assignee: Daniel Nishimura
>            Priority: Major
>          Time Spent: 40m
>  Remaining Estimate: 0h
>
> Consider the following scenario: A StreamApplication containing two pipelines 
> with the output StreamDescriptor of the first pipeline being the input 
> StreamDescriptor for the second pipeline. With Samza 1.0 changes, this 
> results in the "Serde for streamId: testavro-SIMPLE1 is already defined. 
> Cannot change it to different serdes." exception. This is an important 
> use-case for Samza-sql. We need to fix this.
> We have an end-to-end test case which is disabled because of this issue: 
> testEndToEndMultiSqlStmtsWithSameSystemStreamAsInputAndOutput
> https://jarvis.corp.linkedin.com/codesearch/result/?name=TestSamzaSqlEndToEnd.java&path=samza-li%2Fsamza%2Fsamza-test%2Fsrc%2Ftest%2Fjava%2Forg%2Fapache%2Fsamza%2Ftest%2Fsamzasql&reponame=samza%2Fsamza-li#181
> org.apache.samza.config.ConfigException: Could not load ApplicationRunner 
> class org.apache.samza.runtime.LocalApplicationRunner
>  
> at 
> org.apache.samza.runtime.ApplicationRunners.getApplicationRunner(ApplicationRunners.java:74)
> at 
> org.apache.samza.sql.runner.SamzaSqlApplicationRunner.<init>(SamzaSqlApplicationRunner.java:73)
> at 
> org.apache.samza.sql.runner.SamzaSqlApplicationRunner.<init>(SamzaSqlApplicationRunner.java:69)
> at 
> org.apache.samza.test.samzasql.TestSamzaSqlEndToEnd.testEndToEndMultiSqlStmtsWithSameSystemStreamAsInputAndOutput(TestSamzaSqlEndToEnd.java:142)
> at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
> at 
> sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
> at 
> sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
> at java.lang.reflect.Method.invoke(Method.java:498)
> at 
> org.junit.runners.model.FrameworkMethod$1.runReflectiveCall(FrameworkMethod.java:50)
> at 
> org.junit.internal.runners.model.ReflectiveCallable.run(ReflectiveCallable.java:12)
> at 
> org.junit.runners.model.FrameworkMethod.invokeExplosively(FrameworkMethod.java:47)
> at 
> org.junit.internal.runners.statements.InvokeMethod.evaluate(InvokeMethod.java:17)
> at 
> org.junit.internal.runners.statements.RunBefores.evaluate(RunBefores.java:26)
> at org.junit.internal.runners.statements.RunAfters.evaluate(RunAfters.java:27)
> at org.junit.runners.ParentRunner.runLeaf(ParentRunner.java:325)
> at 
> org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:78)
> at 
> org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:57)
> at org.junit.runners.ParentRunner$3.run(ParentRunner.java:290)
> at org.junit.runners.ParentRunner$1.schedule(ParentRunner.java:71)
> at org.junit.runners.ParentRunner.runChildren(ParentRunner.java:288)
> at org.junit.runners.ParentRunner.access$000(ParentRunner.java:58)
> at org.junit.runners.ParentRunner$2.evaluate(ParentRunner.java:268)
> at org.junit.runners.ParentRunner.run(ParentRunner.java:363)
> at org.junit.runner.JUnitCore.run(JUnitCore.java:137)
> at 
> com.intellij.junit4.JUnit4IdeaTestRunner.startRunnerWithArgs(JUnit4IdeaTestRunner.java:68)
> at 
> com.intellij.rt.execution.junit.IdeaTestRunner$Repeater.startRunnerWithArgs(IdeaTestRunner.java:47)
> at 
> com.intellij.rt.execution.junit.JUnitStarter.prepareStreamsAndStart(JUnitStarter.java:242)
> at com.intellij.rt.execution.junit.JUnitStarter.main(JUnitStarter.java:70)
> Caused by: java.lang.reflect.InvocationTargetException
> at sun.reflect.NativeConstructorAccessorImpl.newInstance0(Native Method)
> at 
> sun.reflect.NativeConstructorAccessorImpl.newInstance(NativeConstructorAccessorImpl.java:62)
> at 
> sun.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:45)
> at java.lang.reflect.Constructor.newInstance(Constructor.java:423)
> at 
> org.apache.samza.runtime.ApplicationRunners.getApplicationRunner(ApplicationRunners.java:68)
> ... 27 more
> Caused by: java.lang.IllegalArgumentException: Serde for streamId: 
> testavro-SIMPLE1 is already defined. Cannot change it to different serdes.
> at 
> org.apache.samza.application.descriptors.ApplicationDescriptorImpl.getOrCreateStreamSerdes(ApplicationDescriptorImpl.java:315)
> at 
> org.apache.samza.application.descriptors.StreamApplicationDescriptorImpl.getInputStream(StreamApplicationDescriptorImpl.java:97)
> at 
> org.apache.samza.sql.translator.ScanTranslator.lambda$translate$0(ScanTranslator.java:112)
> at java.util.HashMap.computeIfAbsent(HashMap.java:1118)
> at 
> org.apache.samza.sql.translator.ScanTranslator.translate(ScanTranslator.java:112)
> at 
> org.apache.samza.sql.translator.QueryTranslator$1.visit(QueryTranslator.java:154)
> at org.apache.calcite.rel.core.TableScan.accept(TableScan.java:167)
> at org.apache.calcite.rel.RelShuttleImpl.visitChild(RelShuttleImpl.java:55)
> at org.apache.calcite.rel.RelShuttleImpl.visit(RelShuttleImpl.java:99)
> at 
> org.apache.samza.sql.translator.QueryTranslator$1.visit(QueryTranslator.java:167)
> at 
> org.apache.calcite.rel.logical.LogicalProject.accept(LogicalProject.java:129)
> at org.apache.calcite.rel.RelShuttleImpl.visitChild(RelShuttleImpl.java:55)
> at org.apache.calcite.rel.RelShuttleImpl.visitChildren(RelShuttleImpl.java:69)
> at org.apache.calcite.rel.RelShuttleImpl.visit(RelShuttleImpl.java:131)
> at 
> org.apache.samza.sql.translator.QueryTranslator$1.visit(QueryTranslator.java:146)
> at 
> org.apache.samza.sql.translator.QueryTranslator$1.visit(QueryTranslator.java:137)
> at org.apache.calcite.rel.AbstractRelNode.accept(AbstractRelNode.java:279)
> at 
> org.apache.samza.sql.translator.QueryTranslator.translate(QueryTranslator.java:130)
> at 
> org.apache.samza.sql.runner.SamzaSqlApplication.describe(SamzaSqlApplication.java:74)
> at 
> org.apache.samza.sql.runner.SamzaSqlApplication.describe(SamzaSqlApplication.java:42)
> at 
> org.apache.samza.application.descriptors.StreamApplicationDescriptorImpl.<init>(StreamApplicationDescriptorImpl.java:81)
> at 
> org.apache.samza.application.descriptors.ApplicationDescriptorUtil.getAppDescriptor(ApplicationDescriptorUtil.java:45)
> at 
> org.apache.samza.runtime.LocalApplicationRunner.<init>(LocalApplicationRunner.java:74)



--
This message was sent by Atlassian Jira
(v8.3.2#803003)

Reply via email to