[ 
https://issues.apache.org/jira/browse/SAMZA-1990?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Daniel Nishimura updated SAMZA-1990:
------------------------------------
    Description: 
Consider the following scenario: A StreamApplication containing two pipelines 
with the output StreamDescriptor of the first pipeline being the input 
StreamDescriptor for the second pipeline. With Samza 1.0 changes, this results 
in the "Serde for streamId: testavro-SIMPLE1 is already defined. Cannot change 
it to different serdes." exception. This is an important use-case for 
Samza-sql. We need to fix this.

We have an end-to-end test case which is disabled because of this issue: 
testEndToEndMultiSqlStmtsWithSameSystemStreamAsInputAndOutput

org.apache.samza.config.ConfigException: Could not load ApplicationRunner class 
org.apache.samza.runtime.LocalApplicationRunner

 

at 
org.apache.samza.runtime.ApplicationRunners.getApplicationRunner(ApplicationRunners.java:74)

at 
org.apache.samza.sql.runner.SamzaSqlApplicationRunner.<init>(SamzaSqlApplicationRunner.java:73)

at 
org.apache.samza.sql.runner.SamzaSqlApplicationRunner.<init>(SamzaSqlApplicationRunner.java:69)

at 
org.apache.samza.test.samzasql.TestSamzaSqlEndToEnd.testEndToEndMultiSqlStmtsWithSameSystemStreamAsInputAndOutput(TestSamzaSqlEndToEnd.java:142)

at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)

at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)

at 
sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)

at java.lang.reflect.Method.invoke(Method.java:498)

at 
org.junit.runners.model.FrameworkMethod$1.runReflectiveCall(FrameworkMethod.java:50)

at 
org.junit.internal.runners.model.ReflectiveCallable.run(ReflectiveCallable.java:12)

at 
org.junit.runners.model.FrameworkMethod.invokeExplosively(FrameworkMethod.java:47)

at 
org.junit.internal.runners.statements.InvokeMethod.evaluate(InvokeMethod.java:17)

at org.junit.internal.runners.statements.RunBefores.evaluate(RunBefores.java:26)

at org.junit.internal.runners.statements.RunAfters.evaluate(RunAfters.java:27)

at org.junit.runners.ParentRunner.runLeaf(ParentRunner.java:325)

at 
org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:78)

at 
org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:57)

at org.junit.runners.ParentRunner$3.run(ParentRunner.java:290)

at org.junit.runners.ParentRunner$1.schedule(ParentRunner.java:71)

at org.junit.runners.ParentRunner.runChildren(ParentRunner.java:288)

at org.junit.runners.ParentRunner.access$000(ParentRunner.java:58)

at org.junit.runners.ParentRunner$2.evaluate(ParentRunner.java:268)

at org.junit.runners.ParentRunner.run(ParentRunner.java:363)

at org.junit.runner.JUnitCore.run(JUnitCore.java:137)

at 
com.intellij.junit4.JUnit4IdeaTestRunner.startRunnerWithArgs(JUnit4IdeaTestRunner.java:68)

at 
com.intellij.rt.execution.junit.IdeaTestRunner$Repeater.startRunnerWithArgs(IdeaTestRunner.java:47)

at 
com.intellij.rt.execution.junit.JUnitStarter.prepareStreamsAndStart(JUnitStarter.java:242)

at com.intellij.rt.execution.junit.JUnitStarter.main(JUnitStarter.java:70)

Caused by: java.lang.reflect.InvocationTargetException

at sun.reflect.NativeConstructorAccessorImpl.newInstance0(Native Method)

at 
sun.reflect.NativeConstructorAccessorImpl.newInstance(NativeConstructorAccessorImpl.java:62)

at 
sun.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:45)

at java.lang.reflect.Constructor.newInstance(Constructor.java:423)

at 
org.apache.samza.runtime.ApplicationRunners.getApplicationRunner(ApplicationRunners.java:68)

... 27 more

Caused by: java.lang.IllegalArgumentException: Serde for streamId: 
testavro-SIMPLE1 is already defined. Cannot change it to different serdes.

at 
org.apache.samza.application.descriptors.ApplicationDescriptorImpl.getOrCreateStreamSerdes(ApplicationDescriptorImpl.java:315)

at 
org.apache.samza.application.descriptors.StreamApplicationDescriptorImpl.getInputStream(StreamApplicationDescriptorImpl.java:97)

at 
org.apache.samza.sql.translator.ScanTranslator.lambda$translate$0(ScanTranslator.java:112)

at java.util.HashMap.computeIfAbsent(HashMap.java:1118)

at 
org.apache.samza.sql.translator.ScanTranslator.translate(ScanTranslator.java:112)

at 
org.apache.samza.sql.translator.QueryTranslator$1.visit(QueryTranslator.java:154)

at org.apache.calcite.rel.core.TableScan.accept(TableScan.java:167)

at org.apache.calcite.rel.RelShuttleImpl.visitChild(RelShuttleImpl.java:55)

at org.apache.calcite.rel.RelShuttleImpl.visit(RelShuttleImpl.java:99)

at 
org.apache.samza.sql.translator.QueryTranslator$1.visit(QueryTranslator.java:167)

at org.apache.calcite.rel.logical.LogicalProject.accept(LogicalProject.java:129)

at org.apache.calcite.rel.RelShuttleImpl.visitChild(RelShuttleImpl.java:55)

at org.apache.calcite.rel.RelShuttleImpl.visitChildren(RelShuttleImpl.java:69)

at org.apache.calcite.rel.RelShuttleImpl.visit(RelShuttleImpl.java:131)

at 
org.apache.samza.sql.translator.QueryTranslator$1.visit(QueryTranslator.java:146)

at 
org.apache.samza.sql.translator.QueryTranslator$1.visit(QueryTranslator.java:137)

at org.apache.calcite.rel.AbstractRelNode.accept(AbstractRelNode.java:279)

at 
org.apache.samza.sql.translator.QueryTranslator.translate(QueryTranslator.java:130)

at 
org.apache.samza.sql.runner.SamzaSqlApplication.describe(SamzaSqlApplication.java:74)

at 
org.apache.samza.sql.runner.SamzaSqlApplication.describe(SamzaSqlApplication.java:42)

at 
org.apache.samza.application.descriptors.StreamApplicationDescriptorImpl.<init>(StreamApplicationDescriptorImpl.java:81)

at 
org.apache.samza.application.descriptors.ApplicationDescriptorUtil.getAppDescriptor(ApplicationDescriptorUtil.java:45)

at 
org.apache.samza.runtime.LocalApplicationRunner.<init>(LocalApplicationRunner.java:74)

  was:
Consider the following scenario: A StreamApplication containing two pipelines 
with the output StreamDescriptor of the first pipeline being the input 
StreamDescriptor for the second pipeline. With Samza 1.0 changes, this results 
in the "Serde for streamId: testavro-SIMPLE1 is already defined. Cannot change 
it to different serdes." exception. This is an important use-case for 
Samza-sql. We need to fix this.

We have an end-to-end test case which is disabled because of this issue: 
testEndToEndMultiSqlStmtsWithSameSystemStreamAsInputAndOutput

https://jarvis.corp.linkedin.com/codesearch/result/?name=TestSamzaSqlEndToEnd.java&path=samza-li%2Fsamza%2Fsamza-test%2Fsrc%2Ftest%2Fjava%2Forg%2Fapache%2Fsamza%2Ftest%2Fsamzasql&reponame=samza%2Fsamza-li#181

org.apache.samza.config.ConfigException: Could not load ApplicationRunner class 
org.apache.samza.runtime.LocalApplicationRunner

 

at 
org.apache.samza.runtime.ApplicationRunners.getApplicationRunner(ApplicationRunners.java:74)

at 
org.apache.samza.sql.runner.SamzaSqlApplicationRunner.<init>(SamzaSqlApplicationRunner.java:73)

at 
org.apache.samza.sql.runner.SamzaSqlApplicationRunner.<init>(SamzaSqlApplicationRunner.java:69)

at 
org.apache.samza.test.samzasql.TestSamzaSqlEndToEnd.testEndToEndMultiSqlStmtsWithSameSystemStreamAsInputAndOutput(TestSamzaSqlEndToEnd.java:142)

at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)

at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)

at 
sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)

at java.lang.reflect.Method.invoke(Method.java:498)

at 
org.junit.runners.model.FrameworkMethod$1.runReflectiveCall(FrameworkMethod.java:50)

at 
org.junit.internal.runners.model.ReflectiveCallable.run(ReflectiveCallable.java:12)

at 
org.junit.runners.model.FrameworkMethod.invokeExplosively(FrameworkMethod.java:47)

at 
org.junit.internal.runners.statements.InvokeMethod.evaluate(InvokeMethod.java:17)

at org.junit.internal.runners.statements.RunBefores.evaluate(RunBefores.java:26)

at org.junit.internal.runners.statements.RunAfters.evaluate(RunAfters.java:27)

at org.junit.runners.ParentRunner.runLeaf(ParentRunner.java:325)

at 
org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:78)

at 
org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:57)

at org.junit.runners.ParentRunner$3.run(ParentRunner.java:290)

at org.junit.runners.ParentRunner$1.schedule(ParentRunner.java:71)

at org.junit.runners.ParentRunner.runChildren(ParentRunner.java:288)

at org.junit.runners.ParentRunner.access$000(ParentRunner.java:58)

at org.junit.runners.ParentRunner$2.evaluate(ParentRunner.java:268)

at org.junit.runners.ParentRunner.run(ParentRunner.java:363)

at org.junit.runner.JUnitCore.run(JUnitCore.java:137)

at 
com.intellij.junit4.JUnit4IdeaTestRunner.startRunnerWithArgs(JUnit4IdeaTestRunner.java:68)

at 
com.intellij.rt.execution.junit.IdeaTestRunner$Repeater.startRunnerWithArgs(IdeaTestRunner.java:47)

at 
com.intellij.rt.execution.junit.JUnitStarter.prepareStreamsAndStart(JUnitStarter.java:242)

at com.intellij.rt.execution.junit.JUnitStarter.main(JUnitStarter.java:70)

Caused by: java.lang.reflect.InvocationTargetException

at sun.reflect.NativeConstructorAccessorImpl.newInstance0(Native Method)

at 
sun.reflect.NativeConstructorAccessorImpl.newInstance(NativeConstructorAccessorImpl.java:62)

at 
sun.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:45)

at java.lang.reflect.Constructor.newInstance(Constructor.java:423)

at 
org.apache.samza.runtime.ApplicationRunners.getApplicationRunner(ApplicationRunners.java:68)

... 27 more

Caused by: java.lang.IllegalArgumentException: Serde for streamId: 
testavro-SIMPLE1 is already defined. Cannot change it to different serdes.

at 
org.apache.samza.application.descriptors.ApplicationDescriptorImpl.getOrCreateStreamSerdes(ApplicationDescriptorImpl.java:315)

at 
org.apache.samza.application.descriptors.StreamApplicationDescriptorImpl.getInputStream(StreamApplicationDescriptorImpl.java:97)

at 
org.apache.samza.sql.translator.ScanTranslator.lambda$translate$0(ScanTranslator.java:112)

at java.util.HashMap.computeIfAbsent(HashMap.java:1118)

at 
org.apache.samza.sql.translator.ScanTranslator.translate(ScanTranslator.java:112)

at 
org.apache.samza.sql.translator.QueryTranslator$1.visit(QueryTranslator.java:154)

at org.apache.calcite.rel.core.TableScan.accept(TableScan.java:167)

at org.apache.calcite.rel.RelShuttleImpl.visitChild(RelShuttleImpl.java:55)

at org.apache.calcite.rel.RelShuttleImpl.visit(RelShuttleImpl.java:99)

at 
org.apache.samza.sql.translator.QueryTranslator$1.visit(QueryTranslator.java:167)

at org.apache.calcite.rel.logical.LogicalProject.accept(LogicalProject.java:129)

at org.apache.calcite.rel.RelShuttleImpl.visitChild(RelShuttleImpl.java:55)

at org.apache.calcite.rel.RelShuttleImpl.visitChildren(RelShuttleImpl.java:69)

at org.apache.calcite.rel.RelShuttleImpl.visit(RelShuttleImpl.java:131)

at 
org.apache.samza.sql.translator.QueryTranslator$1.visit(QueryTranslator.java:146)

at 
org.apache.samza.sql.translator.QueryTranslator$1.visit(QueryTranslator.java:137)

at org.apache.calcite.rel.AbstractRelNode.accept(AbstractRelNode.java:279)

at 
org.apache.samza.sql.translator.QueryTranslator.translate(QueryTranslator.java:130)

at 
org.apache.samza.sql.runner.SamzaSqlApplication.describe(SamzaSqlApplication.java:74)

at 
org.apache.samza.sql.runner.SamzaSqlApplication.describe(SamzaSqlApplication.java:42)

at 
org.apache.samza.application.descriptors.StreamApplicationDescriptorImpl.<init>(StreamApplicationDescriptorImpl.java:81)

at 
org.apache.samza.application.descriptors.ApplicationDescriptorUtil.getAppDescriptor(ApplicationDescriptorUtil.java:45)

at 
org.apache.samza.runtime.LocalApplicationRunner.<init>(LocalApplicationRunner.java:74)


> Samza framework should let using the same system stream as both input and 
> output.
> ---------------------------------------------------------------------------------
>
>                 Key: SAMZA-1990
>                 URL: https://issues.apache.org/jira/browse/SAMZA-1990
>             Project: Samza
>          Issue Type: Bug
>            Reporter: Aditya
>            Assignee: Daniel Nishimura
>            Priority: Major
>          Time Spent: 40m
>  Remaining Estimate: 0h
>
> Consider the following scenario: A StreamApplication containing two pipelines 
> with the output StreamDescriptor of the first pipeline being the input 
> StreamDescriptor for the second pipeline. With Samza 1.0 changes, this 
> results in the "Serde for streamId: testavro-SIMPLE1 is already defined. 
> Cannot change it to different serdes." exception. This is an important 
> use-case for Samza-sql. We need to fix this.
> We have an end-to-end test case which is disabled because of this issue: 
> testEndToEndMultiSqlStmtsWithSameSystemStreamAsInputAndOutput
> org.apache.samza.config.ConfigException: Could not load ApplicationRunner 
> class org.apache.samza.runtime.LocalApplicationRunner
>  
> at 
> org.apache.samza.runtime.ApplicationRunners.getApplicationRunner(ApplicationRunners.java:74)
> at 
> org.apache.samza.sql.runner.SamzaSqlApplicationRunner.<init>(SamzaSqlApplicationRunner.java:73)
> at 
> org.apache.samza.sql.runner.SamzaSqlApplicationRunner.<init>(SamzaSqlApplicationRunner.java:69)
> at 
> org.apache.samza.test.samzasql.TestSamzaSqlEndToEnd.testEndToEndMultiSqlStmtsWithSameSystemStreamAsInputAndOutput(TestSamzaSqlEndToEnd.java:142)
> at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
> at 
> sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
> at 
> sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
> at java.lang.reflect.Method.invoke(Method.java:498)
> at 
> org.junit.runners.model.FrameworkMethod$1.runReflectiveCall(FrameworkMethod.java:50)
> at 
> org.junit.internal.runners.model.ReflectiveCallable.run(ReflectiveCallable.java:12)
> at 
> org.junit.runners.model.FrameworkMethod.invokeExplosively(FrameworkMethod.java:47)
> at 
> org.junit.internal.runners.statements.InvokeMethod.evaluate(InvokeMethod.java:17)
> at 
> org.junit.internal.runners.statements.RunBefores.evaluate(RunBefores.java:26)
> at org.junit.internal.runners.statements.RunAfters.evaluate(RunAfters.java:27)
> at org.junit.runners.ParentRunner.runLeaf(ParentRunner.java:325)
> at 
> org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:78)
> at 
> org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:57)
> at org.junit.runners.ParentRunner$3.run(ParentRunner.java:290)
> at org.junit.runners.ParentRunner$1.schedule(ParentRunner.java:71)
> at org.junit.runners.ParentRunner.runChildren(ParentRunner.java:288)
> at org.junit.runners.ParentRunner.access$000(ParentRunner.java:58)
> at org.junit.runners.ParentRunner$2.evaluate(ParentRunner.java:268)
> at org.junit.runners.ParentRunner.run(ParentRunner.java:363)
> at org.junit.runner.JUnitCore.run(JUnitCore.java:137)
> at 
> com.intellij.junit4.JUnit4IdeaTestRunner.startRunnerWithArgs(JUnit4IdeaTestRunner.java:68)
> at 
> com.intellij.rt.execution.junit.IdeaTestRunner$Repeater.startRunnerWithArgs(IdeaTestRunner.java:47)
> at 
> com.intellij.rt.execution.junit.JUnitStarter.prepareStreamsAndStart(JUnitStarter.java:242)
> at com.intellij.rt.execution.junit.JUnitStarter.main(JUnitStarter.java:70)
> Caused by: java.lang.reflect.InvocationTargetException
> at sun.reflect.NativeConstructorAccessorImpl.newInstance0(Native Method)
> at 
> sun.reflect.NativeConstructorAccessorImpl.newInstance(NativeConstructorAccessorImpl.java:62)
> at 
> sun.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:45)
> at java.lang.reflect.Constructor.newInstance(Constructor.java:423)
> at 
> org.apache.samza.runtime.ApplicationRunners.getApplicationRunner(ApplicationRunners.java:68)
> ... 27 more
> Caused by: java.lang.IllegalArgumentException: Serde for streamId: 
> testavro-SIMPLE1 is already defined. Cannot change it to different serdes.
> at 
> org.apache.samza.application.descriptors.ApplicationDescriptorImpl.getOrCreateStreamSerdes(ApplicationDescriptorImpl.java:315)
> at 
> org.apache.samza.application.descriptors.StreamApplicationDescriptorImpl.getInputStream(StreamApplicationDescriptorImpl.java:97)
> at 
> org.apache.samza.sql.translator.ScanTranslator.lambda$translate$0(ScanTranslator.java:112)
> at java.util.HashMap.computeIfAbsent(HashMap.java:1118)
> at 
> org.apache.samza.sql.translator.ScanTranslator.translate(ScanTranslator.java:112)
> at 
> org.apache.samza.sql.translator.QueryTranslator$1.visit(QueryTranslator.java:154)
> at org.apache.calcite.rel.core.TableScan.accept(TableScan.java:167)
> at org.apache.calcite.rel.RelShuttleImpl.visitChild(RelShuttleImpl.java:55)
> at org.apache.calcite.rel.RelShuttleImpl.visit(RelShuttleImpl.java:99)
> at 
> org.apache.samza.sql.translator.QueryTranslator$1.visit(QueryTranslator.java:167)
> at 
> org.apache.calcite.rel.logical.LogicalProject.accept(LogicalProject.java:129)
> at org.apache.calcite.rel.RelShuttleImpl.visitChild(RelShuttleImpl.java:55)
> at org.apache.calcite.rel.RelShuttleImpl.visitChildren(RelShuttleImpl.java:69)
> at org.apache.calcite.rel.RelShuttleImpl.visit(RelShuttleImpl.java:131)
> at 
> org.apache.samza.sql.translator.QueryTranslator$1.visit(QueryTranslator.java:146)
> at 
> org.apache.samza.sql.translator.QueryTranslator$1.visit(QueryTranslator.java:137)
> at org.apache.calcite.rel.AbstractRelNode.accept(AbstractRelNode.java:279)
> at 
> org.apache.samza.sql.translator.QueryTranslator.translate(QueryTranslator.java:130)
> at 
> org.apache.samza.sql.runner.SamzaSqlApplication.describe(SamzaSqlApplication.java:74)
> at 
> org.apache.samza.sql.runner.SamzaSqlApplication.describe(SamzaSqlApplication.java:42)
> at 
> org.apache.samza.application.descriptors.StreamApplicationDescriptorImpl.<init>(StreamApplicationDescriptorImpl.java:81)
> at 
> org.apache.samza.application.descriptors.ApplicationDescriptorUtil.getAppDescriptor(ApplicationDescriptorUtil.java:45)
> at 
> org.apache.samza.runtime.LocalApplicationRunner.<init>(LocalApplicationRunner.java:74)



--
This message was sent by Atlassian Jira
(v8.3.4#803005)

Reply via email to