[
https://issues.apache.org/jira/browse/CAMEL-16953?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
]
Claus Ibsen reassigned CAMEL-16953:
-----------------------------------
Assignee: Claus Ibsen
> camel-zip-deflater - Use Commons Compress to be able to un-zip large payloads
> -----------------------------------------------------------------------------
>
> Key: CAMEL-16953
> URL: https://issues.apache.org/jira/browse/CAMEL-16953
> Project: Camel
> Issue Type: Improvement
> Affects Versions: 3.11.0
> Environment: Ubuntu 19.04
> openjdk 11.0.11 2021-04-20
> OpenJDK Runtime Environment (build 11.0.11+9-Ubuntu-0ubuntu2.20.04)
> OpenJDK 64-Bit Server VM (build 11.0.11+9-Ubuntu-0ubuntu2.20.04, mixed mode, sharing)
> Camel 3.11.1
> Reporter: Roman Vottner
> Assignee: Claus Ibsen
> Priority: Major
> Fix For: 3.12.0
>
>
> ZulipChat-Discussion:
> https://camel.zulipchat.com/#narrow/stream/257298-camel/topic/tar.2Egz.20unmarshalling
> The simple route below reads every tar.gz archive found in the provided
> directory and logs the names of the files within each archive. It fails when
> processing larger tar.gz archives.
>
> {code:title=PreProcessingRoute.java|borderStyle=solid}
> from(file("archiveFile"))
>     .routeId("pre-processing")
>     .process(exchange -> {
>         LOG.info("Processing archive: {}",
>             exchange.getIn().getHeader(Exchange.FILE_NAME, String.class));
>     })
>     .unmarshal().gzipDeflater()
>     .split(new TarSplitter()).streaming()
>         .process(exchange -> {
>             final String name = exchange.getIn().getHeader(Exchange.FILE_NAME, String.class);
>             LOG.debug("name: {}", name);
>         })
>     .end();
> {code}
> The JVM quickly runs out of memory. The stack trace shows
> `GzipDeflaterDataFormat.unmarshal(...)` copying the decompressed bytes into a
> `ByteArrayOutputStream`, so the whole payload is probably buffered in memory
> instead of being streamed. The error is thrown in the `hugeCapacity(...)`
> method of the `ByteArrayOutputStream` class:
>
> {code:title=StackTrace|borderStyle=solid}
> org.apache.camel.CamelExecutionException: Exception occurred during execution on the exchange: Exchange[]
>     at org.apache.camel.CamelExecutionException.wrapCamelExecutionException(CamelExecutionException.java:45)
>     at org.apache.camel.support.AbstractExchange.setException(AbstractExchange.java:589)
>     at org.apache.camel.support.DefaultExchange.setException(DefaultExchange.java:27)
>     at org.apache.camel.support.processor.UnmarshalProcessor.process(UnmarshalProcessor.java:81)
>     at org.apache.camel.processor.errorhandler.RedeliveryErrorHandler$SimpleTask.run(RedeliveryErrorHandler.java:463)
>     at org.apache.camel.impl.engine.DefaultReactiveExecutor$Worker.schedule(DefaultReactiveExecutor.java:179)
>     at org.apache.camel.impl.engine.DefaultReactiveExecutor.scheduleMain(DefaultReactiveExecutor.java:64)
>     at org.apache.camel.processor.Pipeline.process(Pipeline.java:184)
>     at org.apache.camel.impl.engine.CamelInternalProcessor.process(CamelInternalProcessor.java:398)
>     at org.apache.camel.component.file.GenericFileConsumer.processExchange(GenericFileConsumer.java:492)
>     at org.apache.camel.component.file.GenericFileConsumer.processBatch(GenericFileConsumer.java:245)
>     at org.apache.camel.component.file.GenericFileConsumer.poll(GenericFileConsumer.java:206)
>     at org.apache.camel.support.ScheduledPollConsumer.doRun(ScheduledPollConsumer.java:190)
>     at org.apache.camel.support.ScheduledPollConsumer.run(ScheduledPollConsumer.java:107)
>     at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
>     at java.util.concurrent.FutureTask.runAndReset$$$capture(FutureTask.java:308)
>     at java.util.concurrent.FutureTask.runAndReset(FutureTask.java)
>     at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$301(ScheduledThreadPoolExecutor.java:180)
>     at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:294)
>     at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
>     at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
>     at java.lang.Thread.run(Thread.java:748)
> Caused by: java.lang.OutOfMemoryError: null
>     at java.io.ByteArrayOutputStream.hugeCapacity(ByteArrayOutputStream.java:123)
>     at java.io.ByteArrayOutputStream.grow(ByteArrayOutputStream.java:117)
>     at java.io.ByteArrayOutputStream.ensureCapacity(ByteArrayOutputStream.java:93)
>     at java.io.ByteArrayOutputStream.write(ByteArrayOutputStream.java:153)
>     at org.apache.camel.support.builder.OutputStreamBuilder.write(OutputStreamBuilder.java:58)
>     at org.apache.camel.util.IOHelper.copy(IOHelper.java:193)
>     at org.apache.camel.util.IOHelper.copy(IOHelper.java:148)
>     at org.apache.camel.util.IOHelper.copy(IOHelper.java:143)
>     at org.apache.camel.util.IOHelper.copy(IOHelper.java:139)
>     at org.apache.camel.dataformat.deflater.GzipDeflaterDataFormat.unmarshal(GzipDeflaterDataFormat.java:63)
>     at org.apache.camel.support.processor.UnmarshalProcessor.process(UnmarshalProcessor.java:64)
>     ... 18 common frames omitted
> {code}
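The buffering pattern that the trace points at can be reproduced with the JDK alone. The following is a minimal sketch, not Camel's actual code: `BufferingGunzip` and its methods are hypothetical names, and it uses `java.util.zip` rather than the deflater data format. It shows that copying a gzip stream into a `ByteArrayOutputStream` keeps the entire decompressed payload on the heap, however small the compressed input is.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.util.zip.GZIPInputStream;
import java.util.zip.GZIPOutputStream;

// Hypothetical helper, not part of Camel: mimics the copy-into-memory pattern.
final class BufferingGunzip {

    // Compresses the given bytes so the sketch has gzip input to work with.
    static byte[] gzip(byte[] plain) throws IOException {
        ByteArrayOutputStream out = new ByteArrayOutputStream();
        try (GZIPOutputStream gz = new GZIPOutputStream(out)) {
            gz.write(plain);
        }
        return out.toByteArray();
    }

    // Decompresses the whole stream into a single in-memory array.
    // Heap usage grows with the decompressed size, not the compressed size.
    static byte[] gunzipFully(InputStream compressed) throws IOException {
        ByteArrayOutputStream sink = new ByteArrayOutputStream();
        try (GZIPInputStream gz = new GZIPInputStream(compressed)) {
            byte[] buffer = new byte[4096];
            int n;
            while ((n = gz.read(buffer)) != -1) {
                sink.write(buffer, 0, n);
            }
        }
        return sink.toByteArray();
    }

    public static void main(String[] args) throws IOException {
        byte[] plain = new byte[1_000_000]; // all zeros, so it compresses very well
        byte[] packed = gzip(plain);
        byte[] unpacked = gunzipFully(new ByteArrayInputStream(packed));
        System.out.println("compressed=" + packed.length
                + " bytes, held in memory=" + unpacked.length + " bytes");
    }
}
```

With a multi-gigabyte archive the same loop would need a byte array larger than the heap, or larger than a Java array can be, which is consistent with the `OutOfMemoryError` raised while the `ByteArrayOutputStream` grows.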
> If I instead use a custom DataFormat class that looks like this:
> {code:title=GZipDataFormat.java|borderStyle=solid}
> import org.apache.camel.Exchange;
> import org.apache.camel.spi.DataFormat;
> import org.apache.camel.util.IOHelper;
> import org.apache.commons.compress.compressors.gzip.GzipCompressorInputStream;
> import org.apache.commons.compress.compressors.gzip.GzipCompressorOutputStream;
>
> import java.io.InputStream;
> import java.io.OutputStream;
>
> public class GZipDataFormat implements DataFormat {
>
>     @Override
>     public void marshal(Exchange exchange, Object graph, OutputStream stream) throws Exception {
>         InputStream is = exchange.getContext().getTypeConverter()
>                 .mandatoryConvertTo(InputStream.class, exchange, graph);
>         GzipCompressorOutputStream zipOutput = new GzipCompressorOutputStream(stream);
>         try {
>             IOHelper.copy(is, zipOutput);
>         } finally {
>             // must close all input streams
>             IOHelper.close(is, zipOutput);
>         }
>     }
>
>     @Override
>     public Object unmarshal(Exchange exchange, InputStream stream) throws Exception {
>         return new GzipCompressorInputStream(exchange.getIn().getMandatoryBody(InputStream.class));
>     }
>
>     @Override
>     public void start() {
>     }
>
>     @Override
>     public void stop() {
>     }
> }
> {code}
> and replace `unmarshal().gzipDeflater()` with `unmarshal(new GZipDataFormat())`,
> Camel is able to decompress the bytes correctly and pass the stream on so that
> the `TarSplitter` can iterate over the entries of that archive.
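The idea behind the workaround, returning a decompressing stream instead of a filled buffer, can be sketched with the JDK alone. This is an illustration under assumptions, not the proposed Camel change: `StreamingGunzip`, `gunzipLazily` and `drain` are hypothetical names, and `java.util.zip.GZIPInputStream` stands in for the Commons Compress `GzipCompressorInputStream` used in the report.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.util.zip.GZIPInputStream;
import java.util.zip.GZIPOutputStream;

// Hypothetical helper, not part of Camel: decompresses on demand.
final class StreamingGunzip {

    // Compresses the given bytes so the sketch has gzip input to work with.
    static byte[] gzip(byte[] plain) throws IOException {
        ByteArrayOutputStream out = new ByteArrayOutputStream();
        try (GZIPOutputStream gz = new GZIPOutputStream(out)) {
            gz.write(plain);
        }
        return out.toByteArray();
    }

    // Wraps the compressed stream. Only the gzip header is read here;
    // the payload is decompressed as the caller reads from the result.
    static InputStream gunzipLazily(InputStream compressed) throws IOException {
        return new GZIPInputStream(compressed);
    }

    // Stands in for a downstream consumer such as a splitter: reads the
    // stream through a fixed 8 KiB buffer and returns the byte count.
    static long drain(InputStream in) throws IOException {
        byte[] buffer = new byte[8192];
        long total = 0;
        int n;
        while ((n = in.read(buffer)) != -1) {
            total += n;
        }
        return total;
    }

    public static void main(String[] args) throws IOException {
        byte[] packed = gzip(new byte[1_000_000]);
        try (InputStream lazy = gunzipLazily(new ByteArrayInputStream(packed))) {
            System.out.println("bytes streamed: " + drain(lazy));
        }
    }
}
```

Because the consumer only ever holds one buffer's worth of decompressed data, memory use stays roughly constant regardless of archive size. The trade-off is that whoever consumes the stream is responsible for closing it.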