much appreciated

On Friday, 13 June 2014 01:27:11 UTC+8, Ian Hummel wrote:
>
> So it's been almost 1 year, but funny enough I had to go back and work on 
> this code a little more and finally found an answer.  Updating the thread 
> in the hopes that this is useful for future Google searches...
>
> Whenever you create an Actor that extends akka.camel.Consumer, it 
> registers itself (including the URI it wants to listen on) with 
> the ConsumerRegistrar.  This class hooks into the Camel Context and adds a 
> route from the Camel side of things to the actor.  The route gets created 
> by ConsumerActorRouteBuilder, which applies any route customizations via 
> the following code:
>
>   def configure(): Unit =
>     applyUserRouteCustomization(
>       settings.Conversions.apply(
>         endpointUri take endpointUri.indexOf(":"), // e.g. "http" from "http://whatever/..."
>         from(endpointUri).routeId(consumer.path.toString))).to(targetActorUri)
>
> settings.Conversions.apply will end up calling (in Camel.scala):
>
> conversions.get(s).fold(r)(r.convertBodyTo)
>
> So basically it reads any "statically" configured type conversions from 
> the settings file and hardcodes a conversion BEFORE your actor will receive 
> the CamelMessage.
>
> The issue I was running into was the default reference.conf file includes 
> this snippet:
>
> akka {
>   camel {
>     ...
>     conversions {
>       "file" = "java.io.InputStream"
>     }
>   }
> }
>
> Effectively this means that the message body has been converted to a 
> java.io.InputStream before you even have a crack at it.  You can't recover 
> the java.io.File from the java.io.InputStream.  The solution is to just 
> override that key, using some dummy conversion like:
>
>   val confString = """
>     akka.camel.streamingCache = false
>     akka.camel.conversions {
>       "file" = "java.io.File"
>     }
>   """
>   val conf = ConfigFactory.parseString(confString)
>
> Then your actors can do this:
>
>   def receive = {
>     case msg: CamelMessage ⇒ {
>       val file = msg.bodyAs[File]
>     }
>   }
>
> And everything just works.
>
>
> Cheers!
>
>
> On Wednesday, August 28, 2013 6:55:27 PM UTC-4, Ian Hummel wrote:
>>
>> Hello everyone,
>>
>> I'm using the file component in my Consumer actor and am trying to get 
>> the message body as a java.io.File object.  Reading 
>> http://people.apache.org/~dkulp/camel/file2.html leads me to believe 
>> that should be as simple as:
>>
>> import akka.actor.ActorSystem
>> import akka.actor.Props
>> import akka.actor.ActorLogging
>> import akka.camel.Consumer
>> import akka.camel.CamelMessage
>> import java.io.File
>>
>> class ExampleActor extends Consumer with ActorLogging {
>>   def endpointUri = "file:///tmp/inbox?noop=true"
>>
>>   def receive = {
>>     case msg: CamelMessage => {
>>       log.info("Received message {}", msg.bodyAs[File])
>>     }
>>   }
>> }
>>
>> object ExampleActor extends App {
>>   val system = ActorSystem()
>>   system.actorOf(Props[ExampleActor])
>>   system.awaitTermination
>> }
>>
>> But whenever I do that I get the following exception:
>>
>> [ERROR] [08/28/2013 18:04:48.630] 
>> [default-akka.actor.default-dispatcher-2] [akka://default/user/$a] No type 
>> converter available to convert from type: 
>> org.apache.camel.converter.stream.InputStreamCache to the required type: 
>> java.io.File with value 
>> org.apache.camel.converter.stream.InputStreamCache@5620f476
>> org.apache.camel.NoTypeConversionAvailableException: No type converter 
>> available to convert from type: 
>> org.apache.camel.converter.stream.InputStreamCache to the required type: 
>> java.io.File with value 
>> org.apache.camel.converter.stream.InputStreamCache@5620f476
>> at 
>> org.apache.camel.impl.converter.BaseTypeConverterRegistry.mandatoryConvertTo(BaseTypeConverterRegistry.java:169)
>> at 
>> org.apache.camel.impl.converter.BaseTypeConverterRegistry.mandatoryConvertTo(BaseTypeConverterRegistry.java:142)
>> at akka.camel.CamelMessage.getBodyAs(CamelMessage.scala:98)
>> at akka.camel.CamelMessage.bodyAs(CamelMessage.scala:87)
>> at ExampleActor$$anonfun$receive$1.applyOrElse(ExampleActor.scala:13)
>> at akka.actor.ActorCell.receiveMessage(ActorCell.scala:498)
>> at akka.actor.ActorCell.invoke(ActorCell.scala:456)
>> at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:237)
>> at akka.dispatch.Mailbox.run(Mailbox.scala:219)
>> at 
>> akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(AbstractDispatcher.scala:386)
>> at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
>> at 
>> scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
>> at 
>> scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
>> at 
>> scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
>>
>> I have tried the same, simple code with stream caching disabled, but it 
>> doesn't make any difference.  Here's what I get in that case:
>>
>> [ERROR] [08/28/2013 18:37:56.970] 
>> [default-akka.actor.default-dispatcher-2] [akka://default/user/$a] No type 
>> converter available to convert from type: java.io.BufferedInputStream to 
>> the required type: java.io.File with value 
>> java.io.BufferedInputStream@53b00227
>> org.apache.camel.NoTypeConversionAvailableException: No type converter 
>> available to convert from type: java.io.BufferedInputStream to the required 
>> type: java.io.File with value java.io.BufferedInputStream@53b00227
>> at 
>> org.apache.camel.impl.converter.BaseTypeConverterRegistry.mandatoryConvertTo(BaseTypeConverterRegistry.java:169)
>> at 
>> org.apache.camel.impl.converter.BaseTypeConverterRegistry.mandatoryConvertTo(BaseTypeConverterRegistry.java:142)
>> at akka.camel.CamelMessage.getBodyAs(CamelMessage.scala:98)
>> at akka.camel.CamelMessage.bodyAs(CamelMessage.scala:87)
>> at ExampleActor$$anonfun$receive$1.applyOrElse(ExampleActor.scala:13)
>> at akka.actor.ActorCell.receiveMessage(ActorCell.scala:498)
>> at akka.actor.ActorCell.invoke(ActorCell.scala:456)
>> at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:237)
>> at akka.dispatch.Mailbox.run(Mailbox.scala:219)
>> at 
>> akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(AbstractDispatcher.scala:386)
>> at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
>> at 
>> scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
>> at 
>> scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
>> at 
>> scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
>>
>> I guess I am trying to figure out where in the chain the conversion from 
>> File to InputStream is occurring (since that's a one-way process)...  When 
>> I set logging to TRACE you can see below that for some reason it's getting 
>> automatically converted before my Actor has a chance to process the message:
>>
>> 2013-08-28 18:47:59,764 DEBUG o.a.c.component.file.FileConsumer  - Total 
>> 1 files to consume
>> 2013-08-28 18:47:59,764 TRACE o.a.c.component.file.FileConsumer  - 
>> Processing file: GenericFile[/tmp/inbox/example]
>> 2013-08-28 18:47:59,764 TRACE 
>> o.a.c.c.f.s.MarkerFileExclusiveReadLockStrategy  - Locking the file: 
>> GenericFile[/tmp/inbox/example] using the lock file name: 
>> /tmp/inbox/example.camelLock
>> 2013-08-28 18:47:59,764 TRACE o.a.c.component.file.FileConsumer  - 
>> Retrieving file: /tmp/inbox/example from: 
>> Endpoint[file:///tmp/inbox?noop=true]
>> 2013-08-28 18:47:59,765 TRACE o.a.c.component.file.FileConsumer  - 
>> Retrieved file: /tmp/inbox/example from: 
>> Endpoint[file:///tmp/inbox?noop=true]
>> 2013-08-28 18:47:59,766 DEBUG o.a.c.component.file.FileConsumer  - About 
>> to process file: GenericFile[/tmp/inbox/example] using exchange: 
>> Exchange[example]
>> 2013-08-28 18:47:59,792 TRACE o.a.camel.impl.DefaultUnitOfWork  - 
>> UnitOfWork created for ExchangeId: 
>> ID-Ians-MediaMath-MacBook-Pro-local-56093-1377730059172-0-1 with 
>> Exchange[example]
>> 2013-08-28 18:47:59,794 TRACE o.a.camel.impl.DefaultUnitOfWork  - Adding 
>> synchronization GenericFileOnCompletion
>> 2013-08-28 18:47:59,794 TRACE o.a.c.processor.UnitOfWorkProcessor  - 
>> Processing exchange asynchronously: Exchange[example]
>> 2013-08-28 18:47:59,795 TRACE org.apache.camel.processor.Pipeline  - 
>> ExchangeId: ID-Ians-MediaMath-MacBook-Pro-local-56093-1377730059172-0-1 
>> should continue routing: true
>> 2013-08-28 18:47:59,795 TRACE org.apache.camel.processor.Pipeline  - 
>> Processing exchangeId: 
>> ID-Ians-MediaMath-MacBook-Pro-local-56093-1377730059172-0-1 >>> 
>> Exchange[example]
>> 2013-08-28 18:47:59,822 TRACE o.a.c.i.c.DefaultTypeConverter  - 
>> Converting org.apache.camel.component.file.GenericFile -> 
>> java.io.InputStream with value: GenericFile[/tmp/inbox/example]
>> 2013-08-28 18:47:59,822 TRACE o.a.c.i.c.DefaultTypeConverter  - Using 
>> converter: StaticMethodTypeConverter: public static java.io.InputStream 
>> org.apache.camel.component.file.GenericFileConverter.genericFileToInputStream(org.apache.camel.component.file.GenericFile,org.apache.camel.Exchange)
>>  
>> throws 
>> java.io.IOException,org.apache.camel.NoTypeConversionAvailableException to 
>> convert [class org.apache.camel.component.file.GenericFile=>class 
>> java.io.InputStream]
>> 2013-08-28 18:47:59,822 DEBUG o.a.c.c.file.GenericFileConverter  - Read 
>> file /tmp/inbox/example (no charset)
>> 2013-08-28 18:47:59,836 TRACE o.a.camel.util.AsyncProcessorHelper  - 
>> Exchange processed and is continued routed synchronously for exchangeId: 
>> ID-Ians-MediaMath-MacBook-Pro-local-56093-1377730059172-0-1 -> 
>> Exchange[Message: [Body is instance of java.io.InputStream]]
>> 2013-08-28 18:47:59,836 TRACE o.a.camel.util.AsyncProcessorHelper  - 
>> Exchange processed and is continued routed synchronously for exchangeId: 
>> ID-Ians-MediaMath-MacBook-Pro-local-56093-1377730059172-0-1 -> 
>> Exchange[Message: [Body is instance of java.io.InputStream]]
>> 2013-08-28 18:47:59,841 TRACE o.a.c.processor.DefaultErrorHandler  - Is 
>> exchangeId: ID-Ians-MediaMath-MacBook-Pro-local-56093-1377730059172-0-1 
>> interrupted? false
>> 2013-08-28 18:47:59,841 TRACE o.a.c.processor.DefaultErrorHandler  - Is 
>> exchangeId: ID-Ians-MediaMath-MacBook-Pro-local-56093-1377730059172-0-1 
>> done? true
>> 2013-08-28 18:47:59,841 TRACE o.a.camel.util.AsyncProcessorHelper  - 
>> Exchange processed and is continued routed synchronously for exchangeId: 
>> ID-Ians-MediaMath-MacBook-Pro-local-56093-1377730059172-0-1 -> 
>> Exchange[Message: [Body is instance of java.io.InputStream]]
>> 2013-08-28 18:47:59,841 TRACE org.apache.camel.processor.Pipeline  - 
>> Processing exchangeId: 
>> ID-Ians-MediaMath-MacBook-Pro-local-56093-1377730059172-0-1 is continued 
>> being processed synchronously
>> 2013-08-28 18:47:59,842 TRACE org.apache.camel.processor.Pipeline  - 
>> ExchangeId: ID-Ians-MediaMath-MacBook-Pro-local-56093-1377730059172-0-1 
>> should continue routing: true
>> 2013-08-28 18:47:59,842 TRACE org.apache.camel.processor.Pipeline  - 
>> Processing exchangeId: 
>> ID-Ians-MediaMath-MacBook-Pro-local-56093-1377730059172-0-1 >>> 
>> Exchange[Message: [Body is instance of java.io.InputStream]]
>> 2013-08-28 18:47:59,844 DEBUG o.a.camel.processor.SendProcessor  - >>>> 
>> Endpoint[akka://default/user/$a?autoAck=true&replyTimeout=60000+milliseconds]
>>  
>> Exchange[Message: [Body is instance of java.io.InputStream]]
>>
>> I realize this is probably more of a Camel question; I only ask here in 
>> case anyone has run into this before, or because maybe that type converter is 
>> being triggered because of how Akka creates the Camel routes internally?
>>
>> Any tips are appreciated!
>>
>> Cheers,
>>
>> - Ian.
>>
>
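
For anyone else landing here, the lookup Ian describes 
(conversions.get(s).fold(r)(r.convertBodyTo)) can be modelled in plain Scala 
without Akka or Camel on the classpath.  This is only a sketch: Route, 
ConversionSketch, applyConversion and scheme are hypothetical names made up 
for illustration, and class names are kept as plain strings rather than 
whatever akka-camel stores internally.

```scala
// Hypothetical stand-in for a Camel route definition. It only records
// which convertBodyTo steps were added, nothing more.
final case class Route(steps: List[String]) {
  def convertBodyTo(className: String): Route =
    Route(steps :+ s"convertBodyTo($className)")
}

object ConversionSketch {
  // Mirrors the "conversions" block from the reference.conf quoted above.
  val defaults: Map[String, String] = Map("file" -> "java.io.InputStream")

  // Same expression as in the thread: "http" from "http://whatever/...".
  def scheme(endpointUri: String): String =
    endpointUri take endpointUri.indexOf(":")

  // Same shape as conversions.get(s).fold(r)(r.convertBodyTo):
  // no entry for the scheme leaves the route untouched, an entry
  // appends a conversion before the message reaches the actor.
  def applyConversion(conversions: Map[String, String], s: String, r: Route): Route =
    conversions.get(s).fold(r)(r.convertBodyTo)
}
```

With the defaults, applyConversion(defaults, scheme("file:///tmp/inbox?noop=true"), Route(Nil)) 
adds a java.io.InputStream conversion.  Passing defaults + ("file" -> "java.io.File") 
instead swaps that step for a java.io.File conversion, which is the effect 
of the config override in the quoted reply.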
