[ 
https://issues.apache.org/jira/browse/CAMEL-16953?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Claus Ibsen reassigned CAMEL-16953:
-----------------------------------

    Assignee: Claus Ibsen

> camel-zip-deflater - Use Commons Compress to be able to un-zip large payloads
> -----------------------------------------------------------------------------
>
>                 Key: CAMEL-16953
>                 URL: https://issues.apache.org/jira/browse/CAMEL-16953
>             Project: Camel
>          Issue Type: Improvement
>    Affects Versions: 3.11.0
>         Environment: Ubuntu 19.04
> openjdk 11.0.11 2021-04-20
> OpenJDK Runtime Environment (build 11.0.11+9-Ubuntu-0ubuntu2.20.04)
> OpenJDK 64-Bit Server VM (build 11.0.11+9-Ubuntu-0ubuntu2.20.04, mixed mode, 
> sharing)
> Camel 3.11.1
>            Reporter: Roman Vottner
>            Assignee: Claus Ibsen
>            Priority: Major
>             Fix For: 3.12.0
>
>
> ZulipChat-Discussion: 
> https://camel.zulipchat.com/#narrow/stream/257298-camel/topic/tar.2Egz.20unmarshalling
> A very simple route that reads every tar.gz archive found in the provided 
> directory and logs the names of the files inside each archive fails when 
> it processes larger tar.gz archives.
>  
> {code:title=PreProcessingRoute.java|borderStyle=solid}
> from(file("archiveFile"))
>     .routeId("pre-processing")
>     .process(exchange -> {
>         LOG.info("Processing archive: {}",
>             exchange.getIn().getHeader(Exchange.FILE_NAME, String.class));
>     })
>     .unmarshal().gzipDeflater()
>     .split(new TarSplitter()).streaming()
>         .process(exchange -> {
>             final String name = exchange.getIn().getHeader(Exchange.FILE_NAME, String.class);
>             LOG.debug("name: {}", name);
>         })
>     .end();
> {code}
> The JVM quickly runs out of memory, probably because the data format copies 
> the decompressed bytes from the original stream into an in-memory buffer 
> instead of handing on a stream that decompresses on demand. This is indicated 
> by the exception thrown in the `hugeCapacity(...)` method of the 
> `ByteArrayOutputStream` class:
>  
> {code:title=StackTrace|borderStyle=solid}
> org.apache.camel.CamelExecutionException: Exception occurred during execution on the exchange: Exchange[]
>  at org.apache.camel.CamelExecutionException.wrapCamelExecutionException(CamelExecutionException.java:45)
>  at org.apache.camel.support.AbstractExchange.setException(AbstractExchange.java:589)
>  at org.apache.camel.support.DefaultExchange.setException(DefaultExchange.java:27)
>  at org.apache.camel.support.processor.UnmarshalProcessor.process(UnmarshalProcessor.java:81)
>  at org.apache.camel.processor.errorhandler.RedeliveryErrorHandler$SimpleTask.run(RedeliveryErrorHandler.java:463)
>  at org.apache.camel.impl.engine.DefaultReactiveExecutor$Worker.schedule(DefaultReactiveExecutor.java:179)
>  at org.apache.camel.impl.engine.DefaultReactiveExecutor.scheduleMain(DefaultReactiveExecutor.java:64)
>  at org.apache.camel.processor.Pipeline.process(Pipeline.java:184)
>  at org.apache.camel.impl.engine.CamelInternalProcessor.process(CamelInternalProcessor.java:398)
>  at org.apache.camel.component.file.GenericFileConsumer.processExchange(GenericFileConsumer.java:492)
>  at org.apache.camel.component.file.GenericFileConsumer.processBatch(GenericFileConsumer.java:245)
>  at org.apache.camel.component.file.GenericFileConsumer.poll(GenericFileConsumer.java:206)
>  at org.apache.camel.support.ScheduledPollConsumer.doRun(ScheduledPollConsumer.java:190)
>  at org.apache.camel.support.ScheduledPollConsumer.run(ScheduledPollConsumer.java:107)
>  at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
>  at java.util.concurrent.FutureTask.runAndReset$$$capture(FutureTask.java:308)
>  at java.util.concurrent.FutureTask.runAndReset(FutureTask.java)
>  at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$301(ScheduledThreadPoolExecutor.java:180)
>  at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:294)
>  at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
>  at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
>  at java.lang.Thread.run(Thread.java:748)
> Caused by: java.lang.OutOfMemoryError: null
>  at java.io.ByteArrayOutputStream.hugeCapacity(ByteArrayOutputStream.java:123)
>  at java.io.ByteArrayOutputStream.grow(ByteArrayOutputStream.java:117)
>  at java.io.ByteArrayOutputStream.ensureCapacity(ByteArrayOutputStream.java:93)
>  at java.io.ByteArrayOutputStream.write(ByteArrayOutputStream.java:153)
>  at org.apache.camel.support.builder.OutputStreamBuilder.write(OutputStreamBuilder.java:58)
>  at org.apache.camel.util.IOHelper.copy(IOHelper.java:193)
>  at org.apache.camel.util.IOHelper.copy(IOHelper.java:148)
>  at org.apache.camel.util.IOHelper.copy(IOHelper.java:143)
>  at org.apache.camel.util.IOHelper.copy(IOHelper.java:139)
>  at org.apache.camel.dataformat.deflater.GzipDeflaterDataFormat.unmarshal(GzipDeflaterDataFormat.java:63)
>  at org.apache.camel.support.processor.UnmarshalProcessor.process(UnmarshalProcessor.java:64)
>  ... 18 common frames omitted
> {code}
> If I instead use a custom DataFormat class that looks like this:
> {code:title=GZipDataFormat.java|borderStyle=solid}
> import org.apache.camel.Exchange;
> import org.apache.camel.spi.DataFormat;
> import org.apache.camel.util.IOHelper;
> import org.apache.commons.compress.compressors.gzip.GzipCompressorInputStream;
> import org.apache.commons.compress.compressors.gzip.GzipCompressorOutputStream;
>
> import java.io.InputStream;
> import java.io.OutputStream;
>
> public class GZipDataFormat implements DataFormat {
>
>     @Override
>     public void marshal(Exchange exchange, Object graph, OutputStream stream) throws Exception {
>         InputStream is = exchange.getContext().getTypeConverter()
>                 .mandatoryConvertTo(InputStream.class, exchange, graph);
>         GzipCompressorOutputStream zipOutput = new GzipCompressorOutputStream(stream);
>         try {
>             IOHelper.copy(is, zipOutput);
>         } finally {
>             // must close both the input stream and the gzip output stream
>             IOHelper.close(is, zipOutput);
>         }
>     }
>
>     @Override
>     public Object unmarshal(Exchange exchange, InputStream stream) throws Exception {
>         // return a stream that decompresses on demand instead of buffering the payload
>         return new GzipCompressorInputStream(exchange.getIn().getMandatoryBody(InputStream.class));
>     }
>
>     @Override
>     public void start() {
>     }
>
>     @Override
>     public void stop() {
>     }
> }
> {code}
> and replace `unmarshal().gzipDeflater()` with `unmarshal(new 
> GZipDataFormat())`, Camel is able to decompress the bytes correctly and pass 
> the stream on so that the `TarSplitter` can iterate over the entries of that 
> archive.
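
The difference between the two behaviours described above can be sketched
with the JDK alone. This is only an illustration under stated assumptions,
not Camel's or Commons Compress's actual code: the class
`GzipUnmarshalSketch` and its methods are hypothetical names, and
`java.util.zip.GZIPInputStream` stands in for `GzipCompressorInputStream`.
The buffered variant mirrors what the stack trace suggests (copying all
decompressed bytes into a `ByteArrayOutputStream`), while the streaming
variant only wraps the source and inflates data as the consumer reads it.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.nio.charset.StandardCharsets;
import java.util.zip.GZIPInputStream;
import java.util.zip.GZIPOutputStream;

// Hypothetical illustration class; not part of Camel or Commons Compress.
final class GzipUnmarshalSketch {

    // Buffered variant: inflates the whole payload into a byte[] before
    // returning, so heap usage grows with the uncompressed size.
    static byte[] unmarshalBuffered(InputStream compressed) throws IOException {
        ByteArrayOutputStream buffer = new ByteArrayOutputStream();
        try (InputStream gzip = new GZIPInputStream(compressed)) {
            byte[] chunk = new byte[4096];
            int n;
            while ((n = gzip.read(chunk)) != -1) {
                buffer.write(chunk, 0, n);
            }
        }
        return buffer.toByteArray();
    }

    // Streaming variant: wraps the source and inflates lazily as the
    // consumer reads; the caller is responsible for closing the stream.
    static InputStream unmarshalStreaming(InputStream compressed) throws IOException {
        return new GZIPInputStream(compressed);
    }

    // Helper for demos and tests: gzip a small UTF-8 string in memory.
    static byte[] gzip(String text) throws IOException {
        ByteArrayOutputStream out = new ByteArrayOutputStream();
        try (GZIPOutputStream gz = new GZIPOutputStream(out)) {
            gz.write(text.getBytes(StandardCharsets.UTF_8));
        }
        return out.toByteArray();
    }
}
```

Both variants yield the same bytes for small inputs. They differ in when the
decompressed data is held in memory, which only becomes visible once the
uncompressed payload approaches the available heap or the maximum array size.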



--
This message was sent by Atlassian Jira
(v8.3.4#803005)
