[ 
https://issues.apache.org/jira/browse/CAMEL-6294?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=13634858#comment-13634858
 ] 

Ulrich Kramer edited comment on CAMEL-6294 at 4/18/13 9:33 AM:
---------------------------------------------------------------

h3. Multiple convertions

If you call msg.getBody in your Processor twice you don't know if the exchange 
was created using a StreamCache. To be sure, you always have to call 
MessageHelper.resetStreamCache before you call Message.getBody.


Wouldn't it be easier to call MessageHelper.resetStreamCache during the 
convertion of StreamCache to something else?

Additionally there should be no automatic convertion from StreamCache to 
InputStream (FileInputStreamCache extends InputStream). I think it would be 
better to extend the StreamCache interface with a method returning an 
InputStream. This method could be used to convert StreamCache instances to 
InputStreams. Today two concurrent readers get the same InputStream object.

If you are using normal Apache Camel routes, the StreamCachingInterceptor is 
calling StreamCache.reset between two steps and everthing is fine. But if you 
write components using other components (e.g. implement a protocol based on 
HTTP using the jetty component) there is no interceptor inserted between. So 
you have always take care when you call Message.getBody.

h3. Closing InputStream

I think it should be possible to call FileInputStreamCache.close without 
getting an exception on a the following convertion. This could be also achieved 
by calling MessageHelper.resetStreamCache during each convertion or by setting 
stream to null in FileInputStreamCache.close.

h3. Closing CachedOutputStream

Many source code analyzers want a stream to be closed before leaving the 
method. I think it should be easy to hand over the ownership of the temporary 
file from CachedOutputStream to FileInputStreamCache during the call to 
getStreamCache

h3. Delay Endpoint

I implemented a DelayEndpoint showing the problems with stream caching. This 
endpoint immediately acknowledges incomming messages and delays them for a 
short time before forwarding them to the next processor.

Stream caching is not used explicitly. The jetty component uses stream caching 
under the hood. 

If I send small messages (less than 64 kB) everthing works fine. For messages 
larger than 64 kB sometimes a "Stream close" exception is thrown and the 
received message is not complete. 

The reason is that the jetty component (for the HTTP response) and the 
processor at the end of the route are reading concurrently from 
FileInputStreamCache.

{code}
package com.sap.camel.util;

import java.util.Timer;
import java.util.TimerTask;
import java.util.concurrent.Semaphore;

import junit.framework.Assert;

import org.apache.camel.AsyncCallback;
import org.apache.camel.AsyncProcessor;
import org.apache.camel.CamelContext;
import org.apache.camel.Consumer;
import org.apache.camel.Endpoint;
import org.apache.camel.Exchange;
import org.apache.camel.Processor;
import org.apache.camel.Producer;
import org.apache.camel.ProducerTemplate;
import org.apache.camel.builder.RouteBuilder;
import org.apache.camel.impl.DefaultCamelContext;
import org.apache.camel.impl.DefaultConsumer;
import org.apache.camel.impl.DefaultEndpoint;
import org.apache.camel.impl.DefaultProducer;
import org.apache.camel.util.AsyncProcessorConverterHelper;
import org.apache.camel.util.AsyncProcessorHelper;
import org.apache.camel.util.ExchangeHelper;
import org.testng.annotations.BeforeClass;
import org.testng.annotations.Test;

class DelayProducer extends DefaultProducer implements AsyncProcessor {
        private Timer timer = new Timer();
        private Processor delegate;
        
        public DelayProducer(Endpoint endpoint, Processor delegate) {
                super(endpoint);
                this.delegate = delegate;
        }
        @Override
        public void process(Exchange exchange) throws Exception {
                AsyncProcessorHelper.process(this, exchange);
        }

        @Override
        public boolean process(final Exchange exchange, final AsyncCallback 
callback) {
                final Exchange copy = 
ExchangeHelper.createCorrelatedCopy(exchange, true);
                timer.schedule(new TimerTask() {

                        @Override
                        public void run() {
                                try {
                                        delegate.process(copy);
                                } catch (Exception e) {
                                        e.printStackTrace();
                                }
                        }}, 0);
                try {
                        Thread.sleep(10);
                } catch (InterruptedException e) {
                        e.printStackTrace();
                }
                callback.done(true);
                return true;
        }
        
}

class DelayEndpoint extends DefaultEndpoint {

        private AsyncProcessor consumer = null;
        public DelayEndpoint(String uri, CamelContext context) {
                super(uri,context);
        }
        @Override
        public Producer createProducer() throws Exception {
                return new DelayProducer(this,consumer);
        }

        @Override
        public Consumer createConsumer(Processor processor) throws Exception {
                consumer = AsyncProcessorConverterHelper.convert(processor);
                return new DefaultConsumer(this,processor);
        }

        @Override
        public boolean isSingleton() {
                return true;
        }
        
}

public class StreamCacheBugs {
        
        private CamelContext context;
        
        @BeforeClass
        public void setUp() {
                
                context = new DefaultCamelContext();
        }

        @Test
        public void delay() throws Exception {
        
        final Endpoint timer = new DelayEndpoint("xxx", context);
        final StringBuffer actual = new StringBuffer();
        final Semaphore sema = new Semaphore(0);
        context.addRoutes( new RouteBuilder() {
            @Override
            public void configure() throws Exception {
                from(timer).process(new Processor() {
                                        @Override
                                        public void process(Exchange exchange) 
throws Exception {
                                                
actual.append(exchange.getIn().getBody(String.class));
                                                sema.release();
                                        }});
                
from("jetty:http://localhost:4444/streamcache";).setHeader("dummy",constant("xxx")).to(timer);
            }});
        context.start();
        ProducerTemplate p = context.createProducerTemplate();
                StringBuffer sb = new StringBuffer();
                for ( int i = 0 ; i < 10000; i++) sb.append("0123456789");
                for ( int i = 0 ; i < 10;i++) {
                        actual.delete(0, actual.length());
                        
p.sendBody("jetty:http://localhost:4444/streamcache",sb.toString());
                        sema.acquire();
                        Assert.assertEquals(actual.toString(),sb.toString());
                }
        context.stop();
        }

}
{code}
                
      was (Author: ulrich.kramer):
    h3. Multiple convertions

If you call msg.getBody in your Processor twice you don't know if the exchange 
was created using a StreamCache. To be sure, you always have to call 
MessageHelper.resetStreamCache before you call Message.getBody. 

Wouldn't it be easier to call MessageHelper.resetStreamCache during the 
convertion of StreamCache to something else?

Additionally there should be no automatic convertion from StreamCache to 
InputStream (FileInputStreamCache extends InputStream). I think it would be 
better to extend the StreamCache interface with a method returning an 
InputStream. This method could be used to convert StreamCache instances to 
InputStreams. Today two concurrent readers get the same InputStream object.

h3. Closing InputStream

I think it should be possible to call FileInputStreamCache.close without 
getting an exception on a the following convertion. This could be also achieved 
by calling MessageHelper.resetStreamCache during each convertion or by setting 
stream to null in FileInputStreamCache.close.

h3. Closing CachedOutputStream

Many source code analyzers want a stream to be closed before leaving the 
method. I think it should be easy to hand over the ownership of the temporary 
file from CachedOutputStream to FileInputStreamCache during the call to 
getStreamCache

h3. Delay Endpoint

I implemented a DelayEndpoint showing the problems with stream caching. This 
endpoint immediately acknowledges incomming messages and delays them for a 
short time before forwarding them to the next processor.

Stream caching is not used explicitly. The jetty component uses stream caching 
under the hood. 

If I send small messages (less than 64 kB) everthing works fine. For messages 
larger than 64 kB sometimes a "Stream close" exception is thrown and the 
received message is not complete. 

The reason is that the jetty component (for the HTTP response) and the 
processor at the end of the route are reading concurrently from 
FileInputStreamCache.

{code}
package com.sap.camel.util;

import java.util.Timer;
import java.util.TimerTask;
import java.util.concurrent.Semaphore;

import junit.framework.Assert;

import org.apache.camel.AsyncCallback;
import org.apache.camel.AsyncProcessor;
import org.apache.camel.CamelContext;
import org.apache.camel.Consumer;
import org.apache.camel.Endpoint;
import org.apache.camel.Exchange;
import org.apache.camel.Processor;
import org.apache.camel.Producer;
import org.apache.camel.ProducerTemplate;
import org.apache.camel.builder.RouteBuilder;
import org.apache.camel.impl.DefaultCamelContext;
import org.apache.camel.impl.DefaultConsumer;
import org.apache.camel.impl.DefaultEndpoint;
import org.apache.camel.impl.DefaultProducer;
import org.apache.camel.util.AsyncProcessorConverterHelper;
import org.apache.camel.util.AsyncProcessorHelper;
import org.apache.camel.util.ExchangeHelper;
import org.testng.annotations.BeforeClass;
import org.testng.annotations.Test;

class DelayProducer extends DefaultProducer implements AsyncProcessor {
        private Timer timer = new Timer();
        private Processor delegate;
        
        public DelayProducer(Endpoint endpoint, Processor delegate) {
                super(endpoint);
                this.delegate = delegate;
        }
        @Override
        public void process(Exchange exchange) throws Exception {
                AsyncProcessorHelper.process(this, exchange);
        }

        @Override
        public boolean process(final Exchange exchange, final AsyncCallback 
callback) {
                final Exchange copy = 
ExchangeHelper.createCorrelatedCopy(exchange, true);
                timer.schedule(new TimerTask() {

                        @Override
                        public void run() {
                                try {
                                        delegate.process(copy);
                                } catch (Exception e) {
                                        e.printStackTrace();
                                }
                        }}, 0);
                try {
                        Thread.sleep(10);
                } catch (InterruptedException e) {
                        e.printStackTrace();
                }
                callback.done(true);
                return true;
        }
        
}

class DelayEndpoint extends DefaultEndpoint {

        private AsyncProcessor consumer = null;
        public DelayEndpoint(String uri, CamelContext context) {
                super(uri,context);
        }
        @Override
        public Producer createProducer() throws Exception {
                return new DelayProducer(this,consumer);
        }

        @Override
        public Consumer createConsumer(Processor processor) throws Exception {
                consumer = AsyncProcessorConverterHelper.convert(processor);
                return new DefaultConsumer(this,processor);
        }

        @Override
        public boolean isSingleton() {
                return true;
        }
        
}

public class StreamCacheBugs {
        
        private CamelContext context;
        
        @BeforeClass
        public void setUp() {
                
                context = new DefaultCamelContext();
        }

        @Test
        public void delay() throws Exception {
        
        final Endpoint timer = new DelayEndpoint("xxx", context);
        final StringBuffer actual = new StringBuffer();
        final Semaphore sema = new Semaphore(0);
        context.addRoutes( new RouteBuilder() {
            @Override
            public void configure() throws Exception {
                from(timer).process(new Processor() {
                                        @Override
                                        public void process(Exchange exchange) 
throws Exception {
                                                
actual.append(exchange.getIn().getBody(String.class));
                                                sema.release();
                                        }});
                
from("jetty:http://localhost:4444/streamcache";).setHeader("dummy",constant("xxx")).to(timer);
            }});
        context.start();
        ProducerTemplate p = context.createProducerTemplate();
                StringBuffer sb = new StringBuffer();
                for ( int i = 0 ; i < 10000; i++) sb.append("0123456789");
                for ( int i = 0 ; i < 10;i++) {
                        actual.delete(0, actual.length());
                        
p.sendBody("jetty:http://localhost:4444/streamcache",sb.toString());
                        sema.acquire();
                        Assert.assertEquals(actual.toString(),sb.toString());
                }
        context.stop();
        }

}
{code}
                  
> StreamCache doesn't work as expected
> ------------------------------------
>
>                 Key: CAMEL-6294
>                 URL: https://issues.apache.org/jira/browse/CAMEL-6294
>             Project: Camel
>          Issue Type: Bug
>          Components: camel-core
>    Affects Versions: 2.10.4
>         Environment: Debian 6.0
>            Reporter: Ulrich Kramer
>            Assignee: Willem Jiang
>             Fix For: 2.10.5
>
>
> The following Unittests fail:
> {code}
> package com.sap.camel.util;
> import java.io.InputStream;
> import junit.framework.Assert;
> import org.apache.camel.CamelContext;
> import org.apache.camel.Exchange;
> import org.apache.camel.Message;
> import org.apache.camel.converter.stream.CachedOutputStream;
> import org.apache.camel.impl.DefaultCamelContext;
> import org.apache.camel.impl.DefaultExchange;
> import org.testng.annotations.Test;
> public class StreamCacheBugs {
>       
>       private CamelContext context;
>       public void setUp() {
>               
>               context = new DefaultCamelContext();
>       }
>       @Test
>       public void multipleConvertionsYielsToEmptyBody() throws Exception {
>               Exchange exchange = new DefaultExchange(context);
>               Message msg = exchange.getIn();
>               CachedOutputStream out = new CachedOutputStream(exchange);
>               out.write("Hello World".getBytes());
>               msg.setBody(out.getStreamCache());
>               Assert.assertEquals(msg.getBody(String.class), "Hello World");
>               Assert.assertEquals(msg.getBody(String.class), "Hello World");
>       }
>       @Test
>       public void closingInputStreamYieldsToException() throws Exception {
>               Exchange exchange = new DefaultExchange(context);
>               Message msg = exchange.getIn();
>               CachedOutputStream out = new CachedOutputStream(exchange);
>               for ( int i = 0 ; i < 10000; i++) 
> out.write("0123456789".getBytes());
>               msg.setBody(out.getStreamCache());
>               InputStream in = msg.getBody(InputStream.class);
>               in.read();
>               in.close();
>               msg.getBody(String.class);
>       }
>       
>       @Test
>       public void cachedOutputStreamsShouldBeClosable() throws Exception {
>               Exchange exchange = new DefaultExchange(context);
>               Message msg = exchange.getIn();
>               CachedOutputStream out = new CachedOutputStream(exchange);
>               for ( int i = 0 ; i < 10000; i++) 
> out.write("0123456789".getBytes());
>               msg.setBody(out.getStreamCache());
>               out.close();
>               msg.getBody(String.class);
>       }
> }
> {code}

--
This message is automatically generated by JIRA.
If you think it was sent incorrectly, please contact your JIRA administrators
For more information on JIRA, see: http://www.atlassian.com/software/jira

Reply via email to