[ 
https://issues.apache.org/jira/browse/SAMZA-1199?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Prateek Maheshwari updated SAMZA-1199:
--------------------------------------
    Fix Version/s:     (was: 0.13.0)
                   0.14.0

> ClassCastException in partitionBy if input type M does not equal msg.serde 
> type
> -------------------------------------------------------------------------------
>
>                 Key: SAMZA-1199
>                 URL: https://issues.apache.org/jira/browse/SAMZA-1199
>             Project: Samza
>          Issue Type: Bug
>    Affects Versions: 0.13.0
>            Reporter: Prateek Maheshwari
>            Assignee: Prateek Maheshwari
>             Fix For: 0.14.0
>
>
> {code}
> 2017-04-09 19:05:53.269 [main] SamzaContainer [ERROR] Caught exception/error 
> in process loop.
> org.apache.samza.SamzaException: java.lang.ClassCastException: 
> org.apache.avro.generic.GenericData$Record cannot be cast to 
> com.company.events.PageViewEvent
>       at org.apache.samza.task.AsyncRunLoop.run(AsyncRunLoop.java:147)
>       at 
> org.apache.samza.container.SamzaContainer.run(SamzaContainer.scala:667)
>       at 
> org.apache.samza.runtime.LocalContainerRunner.run(LocalContainerRunner.java:81)
>       at 
> org.apache.samza.runtime.LocalContainerRunner.main(LocalContainerRunner.java:125)
> Caused by: java.lang.ClassCastException: 
> org.apache.avro.generic.GenericData$Record cannot be cast to 
> com.linkedin.events.PageViewEvent
>       at 
> org.apache.samza.operators.spec.OperatorSpecs$1$1.<init>(OperatorSpecs.java:62)
>       at 
> org.apache.samza.operators.spec.OperatorSpecs$1.apply(OperatorSpecs.java:60)
>       at 
> org.apache.samza.operators.impl.StreamOperatorImpl.onNext(StreamOperatorImpl.java:47)
>       at 
> org.apache.samza.operators.impl.OperatorImpl.lambda$propagateResult$1(OperatorImpl.java:86)
>       at java.lang.Iterable.forEach(Iterable.java:75)
>       at 
> org.apache.samza.operators.impl.OperatorImpl.propagateResult(OperatorImpl.java:86)
>       at 
> org.apache.samza.operators.impl.RootOperatorImpl.onNext(RootOperatorImpl.java:33)
>       at 
> org.apache.samza.task.StreamOperatorTask.process(StreamOperatorTask.java:122)
>       at 
> org.apache.samza.task.AsyncStreamTaskAdapter.process(AsyncStreamTaskAdapter.java:72)
>       at 
> org.apache.samza.task.AsyncStreamTaskAdapter.processAsync(AsyncStreamTaskAdapter.java:63)
>       at 
> org.apache.samza.container.TaskInstance$$anonfun$process$1.apply$mcV$sp(TaskInstance.scala:162)
>       at 
> org.apache.samza.container.TaskInstanceExceptionHandler.maybeHandle(TaskInstanceExceptionHandler.scala:54)
>       at 
> org.apache.samza.container.TaskInstance.process(TaskInstance.scala:160)
>       at 
> org.apache.samza.task.AsyncRunLoop$AsyncTaskWorker.process(AsyncRunLoop.java:428)
>       at 
> org.apache.samza.task.AsyncRunLoop$AsyncTaskWorker.run(AsyncRunLoop.java:373)
>       at 
> org.apache.samza.task.AsyncRunLoop$AsyncTaskWorker.access$300(AsyncRunLoop.java:314)
>       at org.apache.samza.task.AsyncRunLoop.runTasks(AsyncRunLoop.java:228)
>       at org.apache.samza.task.AsyncRunLoop.run(AsyncRunLoop.java:157)
>       ... 3 more
> {code}
> For example, code similar to the following will compile correctly but fail at 
> runtime:
> {code}
>   public class TestStreamApp implements StreamApplication {

>     @Override

>     public void init(StreamGraph streamGraph, Config config) {

>       MessageStream<PageViewEvent> pageViewEvents =
>           streamGraph.getInputStream("PageViewEvent",

>             (k, m) -> AvroUtils.genericRecordToSpecificRecord(new 
> PageViewEvent(), (GenericData.Record) m));


>       
>       pageViewEvents
>           .
partitionBy(this::getMemberId)
>           .map(this::setTrackingCode) // throws ClassCastException since 
> incoming GenericData.Record cannot be cast to PageViewEvent
>           .sendTo(streamGraph.getOutputStream("OutputEvent", m -> 
> String.valueOf(getMemberId(m)), m -> m));
> 
}


>     private String getMemberId(PageViewEvent pve) {

>       return pve.memberId.toString();

>     }
>     private PageViewEvent setTrackingCode(PageViewEvent pve) {

>       pve.trackingCode = "1";

>       return pve;

>     }

>   }
> {code}



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)

Reply via email to