When I change the code to the following I still get the same exception,
Success cannot be cast to Message.
val (actorRef, futureThreadedResult) = Source.actorRef[Message](1000,
OverflowStrategy.fail)
//XXX debug to remove
.map{text => println(" Flow Pre Parse: " + text); text}
// add our custom flow
.via(parseFlow)
//XXX debug to remove
.map{text => println("Flow Post Parse: " + text); text}
// this takes the output of our threadSink and materializes it as a
Future[Result]
.toMat(threadSink)(Keep.both).run()
futureThreadedResult.onComplete{ maybeResult =>
val string = maybeResult.get.texts.mkString(" , ")
println("\n\n\nfin: " + string)
as.shutdown()
}
The output is the same (and can be provided) I can also provide the code
for this revision as well.
Thanks,
--Brandon
On Tuesday, June 30, 2015 at 4:27:11 AM UTC-4, Patrik Nordwall wrote:
>
> What happens if you use
>
> val (actorRef, futureThreadedResult) = Source.actorRef[Message](1000,
> OverflowStrategy.fail)...
>
> instead of the
>
> val (actorRef, publisherActor) = Source.actorRef[Message](1000,
> OverflowStrategy.fail).toMat(Sink.publisher)(Keep.both).run()
> val sourceActor = Source(publisherActor)
>
> /Patrik
>
>
> On Mon, Jun 29, 2015 at 10:53 PM, Brandon Gauthier <[email protected]
> <javascript:>> wrote:
>
>> Hello,
>>
>> I'm using Source.actorRef[...] for a unknown size but bounded stream (I
>> don't know how many elements but I do know it will end). The high level
>> outline of the stream is below and sample code can be found in this gist
>> <https://gist.github.com/gauthierbl/9cf40135619f4c99c102>.
>> Client1->ActorRef/Source[Message]->parseFlow[Message, String]->threadSink
>> [String]->"materialize[Result]"->Client2
>>
>> My goal is to have the stream be populated via messages to an actor and
>> then when the stream is "done" have a way to materialize a result to a
>> different part of the application. Input is one client's concern, output is
>> another client's concern.
>>
>> Per the documention
>> <http://doc.akka.io/docs/akka-stream-and-http-experimental/1.0-RC4/scala/stream-integrations.html>
>> for
>> Source.actorRef[...]:
>>
>>> The stream can be completed successfully by sending
>>> akka.actor.PoisonPill or akka.actor.Status.Success to the actor
>>> reference.
>>>
>>> The stream can be completed with failure by sending
>>> akka.actor.Status.Failure to the actor reference.
>>>
>>
>> In both cases it pretty clearly states that akka.actor.Status.Success
>> will end the stream with success. However when I send in Status.Success I
>> get a ClassCastException from the parseFlow. The parsFlow is expecting a
>> Message but its getting a Status.Success.
>>
>> I also get ClassCastException for Status.Failure. The PoisonPill stops
>> the stream immediately and no results are gathered.
>>
>> I'm not sure why this exception is being thrown. I'm very new to
>> akka-streams so I can't tell if is an issue with my usage or I
>> am misinterpreting the documentation. I would appreciate any help.
>>
>> Exception and program output is below, code is here
>> <https://gist.github.com/gauthierbl/9cf40135619f4c99c102>.
>> Flow Pre Parse: Message(hello)
>> Flow Pre Parse: Message(world)
>> Flow Post Parse: HELLO
>> Flow Pre Parse: Message(this)
>> Flow Post Parse: WORLD
>> Flow Pre Parse: Message(is)
>> Flow Post Parse: THIS
>> Flow Pre Parse: Message(a)
>> Flow Post Parse: IS
>> Sink: HELLO
>> Flow Pre Parse: Message(test)
>> Sink: HELLO , WORLD
>> Flow Post Parse: A
>> Sink: HELLO , WORLD , THIS
>> Flow Post Parse: TEST
>> Sink: HELLO , WORLD , THIS , IS
>> Sink: HELLO , WORLD , THIS , IS , A
>> Sink: HELLO , WORLD , THIS , IS , A , TEST
>> [ERROR] [06/29/2015 16:17:49.101]
>> [default-akka.actor.default-dispatcher-9] [akka.dispatch.Dispatcher]
>> akka.actor.Status$Success$ cannot be cast to com.example.Test2$Message
>> java.lang.ClassCastException: akka.actor.Status$Success$ cannot be cast
>> to com.example.Test2$Message
>> at com.example.Test2$$anonfun$3.apply(Test2.scala:39)
>> at akka.stream.impl.fusing.Map.onPush(Ops.scala:23)
>> at akka.stream.impl.fusing.Map.onPush(Ops.scala:22)
>> at
>> akka.stream.impl.fusing.OneBoundedInterpreter$$anon$1.run(Interpreter.scala:436)
>> at
>> akka.stream.impl.fusing.OneBoundedInterpreter$State$class.progress(Interpreter.scala:245)
>> at
>> akka.stream.impl.fusing.OneBoundedInterpreter$$anon$1.progress(Interpreter.scala:434)
>> at
>> akka.stream.impl.fusing.OneBoundedInterpreter.akka$stream$impl$fusing$OneBoundedInterpreter$$execute(Interpreter.scala:580)
>> at
>> akka.stream.impl.fusing.OneBoundedInterpreter$State$class.execute(Interpreter.scala:241)
>> at
>> akka.stream.impl.fusing.OneBoundedInterpreter$EntryState.execute(Interpreter.scala:666)
>> at akka.stream.stage.AbstractStage.enterAndPull(Stage.scala:74)
>> at
>> akka.stream.impl.fusing.ActorOutputBoundary.akka$stream$impl$fusing$ActorOutputBoundary$$tryPutBallIn(ActorInterpreter.scala:217)
>> at
>> akka.stream.impl.fusing.ActorOutputBoundary$$anonfun$downstreamRunning$1.applyOrElse(ActorInterpreter.scala:305)
>> at
>> scala.runtime.AbstractPartialFunction.apply(AbstractPartialFunction.scala:36)
>> at akka.stream.impl.SubReceive.apply(Transfer.scala:16)
>> at akka.stream.impl.SubReceive.apply(Transfer.scala:12)
>> at scala.PartialFunction$class.applyOrElse(PartialFunction.scala:123)
>> at akka.stream.impl.SubReceive.applyOrElse(Transfer.scala:12)
>> at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:170)
>> at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:171)
>> at akka.actor.Actor$class.aroundReceive(Actor.scala:467)
>> at
>> akka.stream.impl.fusing.ActorInterpreter.aroundReceive(ActorInterpreter.scala:366)
>> at akka.actor.ActorCell.receiveMessage(ActorCell.scala:516)
>> at akka.actor.ActorCell.invoke(ActorCell.scala:487)
>> at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:238)
>> at akka.dispatch.Mailbox.run(Mailbox.scala:220)
>> at
>> akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(AbstractDispatcher.scala:397)
>> at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
>> at
>> scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
>> at
>> scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
>> at
>> scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
>>
>> Thanks for your help,
>> --Brandon
>>
>> --
>> >>>>>>>>>> Read the docs: http://akka.io/docs/
>> >>>>>>>>>> Check the FAQ:
>> http://doc.akka.io/docs/akka/current/additional/faq.html
>> >>>>>>>>>> Search the archives: https://groups.google.com/group/akka-user
>> ---
>> You received this message because you are subscribed to the Google Groups
>> "Akka User List" group.
>> To unsubscribe from this group and stop receiving emails from it, send an
>> email to [email protected] <javascript:>.
>> To post to this group, send email to [email protected]
>> <javascript:>.
>> Visit this group at http://groups.google.com/group/akka-user.
>> For more options, visit https://groups.google.com/d/optout.
>>
>
>
>
> --
>
> Patrik Nordwall
> Typesafe <http://typesafe.com/> - Reactive apps on the JVM
> Twitter: @patriknw
>
>
--
>>>>>>>>>> Read the docs: http://akka.io/docs/
>>>>>>>>>> Check the FAQ:
>>>>>>>>>> http://doc.akka.io/docs/akka/current/additional/faq.html
>>>>>>>>>> Search the archives: https://groups.google.com/group/akka-user
---
You received this message because you are subscribed to the Google Groups "Akka
User List" group.
To unsubscribe from this group and stop receiving emails from it, send an email
to [email protected].
To post to this group, send email to [email protected].
Visit this group at http://groups.google.com/group/akka-user.
For more options, visit https://groups.google.com/d/optout.