CAMEL-7101 Add aggregation strategy to aggregate multiple messages into a zip file with thanks to Pontus
Project: http://git-wip-us.apache.org/repos/asf/camel/repo Commit: http://git-wip-us.apache.org/repos/asf/camel/commit/4ea96a15 Tree: http://git-wip-us.apache.org/repos/asf/camel/tree/4ea96a15 Diff: http://git-wip-us.apache.org/repos/asf/camel/diff/4ea96a15 Branch: refs/heads/master Commit: 4ea96a15434920069c6aba39d6bafed42a998276 Parents: d3e7083 Author: Willem Jiang <[email protected]> Authored: Mon Dec 30 10:37:17 2013 +0800 Committer: Willem Jiang <[email protected]> Committed: Mon Dec 30 10:37:17 2013 +0800 ---------------------------------------------------------------------- .../zipfile/ZipAggregationStrategy.java | 222 +++++++++++++++++++ .../zipfile/ZipAggregationStrategyTest.java | 79 +++++++ .../camel/aggregate/zipfile/data/chiau.txt | 1 + .../apache/camel/aggregate/zipfile/data/hi.txt | 1 + .../camel/aggregate/zipfile/data/hola.txt | 1 + 5 files changed, 304 insertions(+) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/camel/blob/4ea96a15/components/camel-zipfile/src/main/java/org/apache/camel/processor/aggregate/zipfile/ZipAggregationStrategy.java ---------------------------------------------------------------------- diff --git a/components/camel-zipfile/src/main/java/org/apache/camel/processor/aggregate/zipfile/ZipAggregationStrategy.java b/components/camel-zipfile/src/main/java/org/apache/camel/processor/aggregate/zipfile/ZipAggregationStrategy.java new file mode 100644 index 0000000..a028614 --- /dev/null +++ b/components/camel-zipfile/src/main/java/org/apache/camel/processor/aggregate/zipfile/ZipAggregationStrategy.java @@ -0,0 +1,222 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. 
+ * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.camel.processor.aggregate.zipfile; + +import java.io.File; +import java.io.FileInputStream; +import java.io.FileOutputStream; +import java.io.IOException; +import java.io.InputStream; +import java.nio.charset.Charset; +import java.util.zip.ZipEntry; +import java.util.zip.ZipInputStream; +import java.util.zip.ZipOutputStream; + +import org.apache.camel.Exchange; +import org.apache.camel.component.file.FileConsumer; +import org.apache.camel.component.file.GenericFile; +import org.apache.camel.component.file.GenericFileMessage; +import org.apache.camel.component.file.GenericFileOperationFailedException; +import org.apache.camel.impl.DefaultEndpoint; +import org.apache.camel.processor.aggregate.AggregationStrategy; +import org.apache.camel.spi.Synchronization; +import org.apache.camel.util.FileUtil; + +/** + * This aggregation strategy will aggregate all incoming messages into a ZIP file. 
+ * <p>If the incoming exchange contains a {@link GenericFileMessage}, the file name will + * be taken from the body; otherwise the body content will be treated as a byte + * array and the ZIP entry will be named using the message id.</p> + * <p><b>Note:</b> This aggregation strategy requires the eager + * completion check to work properly.</p> + * + */ +public class ZipAggregationStrategy implements AggregationStrategy { + + private String filePrefix; + private String fileSuffix = ".zip"; + + /** + * Gets the prefix used when creating the ZIP file name. + * @return the prefix + */ + public String getFilePrefix() { + return filePrefix; + } + + /** + * Sets the prefix that will be used when creating the ZIP filename. + * @param filePrefix prefix to use on ZIP file. + */ + public void setFilePrefix(String filePrefix) { + this.filePrefix = filePrefix; + } + + /** + * Gets the suffix used when creating the ZIP file name. + * @return the suffix + */ + public String getFileSuffix() { + return fileSuffix; + } + /** + * Sets the suffix that will be used when creating the ZIP filename. + * @param fileSuffix suffix to use on ZIP file. 
+ */ + public void setFileSuffix(String fileSuffix) { + this.fileSuffix = fileSuffix; + } + + @Override + public Exchange aggregate(Exchange oldExchange, Exchange newExchange) { + File zipFile; + Exchange answer = oldExchange; + + // Guard against empty new exchanges + if (newExchange == null) { + return oldExchange; + } + + // First time for this aggregation + if (oldExchange == null) { + try { + zipFile = FileUtil.createTempFile(this.filePrefix, this.fileSuffix); + } catch (IOException e) { + throw new GenericFileOperationFailedException(e.getMessage(), e); + } + DefaultEndpoint endpoint = (DefaultEndpoint) newExchange.getFromEndpoint(); + answer = endpoint.createExchange(); + answer.addOnCompletion(new DeleteZipFileOnCompletion(zipFile)); + } else { + zipFile = oldExchange.getIn().getBody(File.class); + } + + // Handle GenericFileMessages + if (GenericFileMessage.class.isAssignableFrom(newExchange.getIn().getClass())) { + try { + File appendFile = newExchange.getIn().getBody(File.class); + if (appendFile != null) { + addFilesToZip(zipFile, new File[]{appendFile}); + GenericFile<File> genericFile = + FileConsumer.asGenericFile( + zipFile.getParent(), + zipFile, + Charset.defaultCharset().toString()); + genericFile.bindToExchange(answer); + } else { + throw new GenericFileOperationFailedException("Could not get body as file."); + } + } catch (IOException e) { + throw new GenericFileOperationFailedException(e.getMessage(), e); + } + } else { + // Handle all other messages + byte[] buffer = newExchange.getIn().getBody(byte[].class); + try { + addEntryToZip(zipFile, newExchange.getIn().getMessageId(), buffer, buffer.length); + GenericFile<File> genericFile = FileConsumer.asGenericFile( + zipFile.getParent(), zipFile, Charset.defaultCharset().toString()); + genericFile.bindToExchange(answer); + } catch (IOException e) { + throw new GenericFileOperationFailedException(e.getMessage(), e); + } + } + + return answer; + } + + private static void addFilesToZip(File source, 
File[] files) throws IOException { + File tmpZip = File.createTempFile(source.getName(), null); + tmpZip.delete(); + if (!source.renameTo(tmpZip)) { + throw new IOException("Could not make temp file (" + source.getName() + ")"); + } + byte[] buffer = new byte[1024]; + ZipInputStream zin = new ZipInputStream(new FileInputStream(tmpZip)); + ZipOutputStream out = new ZipOutputStream(new FileOutputStream(source)); + + for (int i = 0; i < files.length; i++) { + InputStream in = new FileInputStream(files[i]); + out.putNextEntry(new ZipEntry(files[i].getName())); + for (int read = in.read(buffer); read > -1; read = in.read(buffer)) { + out.write(buffer, 0, read); + } + out.closeEntry(); + in.close(); + } + + for (ZipEntry ze = zin.getNextEntry(); ze != null; ze = zin.getNextEntry()) { + out.putNextEntry(ze); + for (int read = zin.read(buffer); read > -1; read = zin.read(buffer)) { + out.write(buffer, 0, read); + } + out.closeEntry(); + } + zin.close(); + out.close(); + tmpZip.delete(); + } + + private static void addEntryToZip(File source, String entryName, byte[] buffer, int length) throws IOException { + + File tmpZip = File.createTempFile(source.getName(), null); + tmpZip.delete(); + if (!source.renameTo(tmpZip)) { + throw new IOException("Could not make temp file (" + source.getName() + ")"); + } + ZipInputStream zin = new ZipInputStream(new FileInputStream(tmpZip)); + ZipOutputStream out = new ZipOutputStream(new FileOutputStream(source)); + + out.putNextEntry(new ZipEntry(entryName)); + out.write(buffer, 0, length); + out.closeEntry(); + + for (ZipEntry ze = zin.getNextEntry(); ze != null; ze = zin.getNextEntry()) { + out.putNextEntry(ze); + for (int read = zin.read(buffer); read > -1; read = zin.read(buffer)) { + out.write(buffer, 0, read); + } + out.closeEntry(); + } + zin.close(); + out.close(); + tmpZip.delete(); + } + + /** + * This callback class is used to clean up the temporary ZIP file once the exchange has completed. 
+ * + */ + private class DeleteZipFileOnCompletion implements Synchronization { + + private File fileToDelete; + + public DeleteZipFileOnCompletion(File fileToDelete) { + this.fileToDelete = fileToDelete; + } + + @Override + public void onFailure(Exchange exchange) { + // Keep the file if something has gone amiss. + } + + @Override + public void onComplete(Exchange exchange) { + FileUtil.deleteFile(this.fileToDelete); + } + } +} http://git-wip-us.apache.org/repos/asf/camel/blob/4ea96a15/components/camel-zipfile/src/test/java/org/apache/camel/processor/aggregate/zipfile/ZipAggregationStrategyTest.java ---------------------------------------------------------------------- diff --git a/components/camel-zipfile/src/test/java/org/apache/camel/processor/aggregate/zipfile/ZipAggregationStrategyTest.java b/components/camel-zipfile/src/test/java/org/apache/camel/processor/aggregate/zipfile/ZipAggregationStrategyTest.java new file mode 100644 index 0000000..be473fe --- /dev/null +++ b/components/camel-zipfile/src/test/java/org/apache/camel/processor/aggregate/zipfile/ZipAggregationStrategyTest.java @@ -0,0 +1,79 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.camel.processor.aggregate.zipfile; + + +import java.io.File; +import java.io.FileInputStream; +import java.util.zip.ZipEntry; +import java.util.zip.ZipInputStream; + +import org.apache.camel.Exchange; +import org.apache.camel.builder.RouteBuilder; +import org.apache.camel.component.file.GenericFileMessage; +import org.apache.camel.component.mock.MockEndpoint; +import org.apache.camel.test.junit4.CamelTestSupport; +import org.junit.Test; + +public class ZipAggregationStrategyTest extends CamelTestSupport { + + private static final int EXPECTED_NO_FILES = 3; + + @Test + public void testSplitter() throws Exception { + MockEndpoint aggregateToZipEntry = getMockEndpoint("mock:aggregateToZipEntry"); + aggregateToZipEntry.expectedMessageCount(1); + assertMockEndpointsSatisfied(); + + Exchange out = aggregateToZipEntry.getExchanges().get(0); + assertTrue("Result message does not contain GenericFileMessage", GenericFileMessage.class.isAssignableFrom(out.getIn().getClass())); + File resultFile = out.getIn().getBody(File.class); + assertNotNull(resultFile); + assertTrue("Zip file should exist", resultFile.isFile()); + assertTrue("Result file name does not end with .zip", resultFile.getName().endsWith(".zip")); + + ZipInputStream zin = new ZipInputStream(new FileInputStream(resultFile)); + try { + int fileCount = 0; + for (ZipEntry ze = zin.getNextEntry(); ze != null; ze = zin.getNextEntry()) { + fileCount++; + } + assertTrue("Zip file should contains " + ZipAggregationStrategyTest.EXPECTED_NO_FILES + " files", + fileCount == ZipAggregationStrategyTest.EXPECTED_NO_FILES); + } finally { + zin.close(); + } + } + + @Override + protected RouteBuilder createRouteBuilder() throws Exception { + return new RouteBuilder() { + @Override + public void configure() throws Exception { + // Aggregate all files from the data directory into a single ZIP file + from("file:src/test/resources/org/apache/camel/aggregate/zipfile/data?consumer.delay=1000&noop=true") + .aggregate(new 
ZipAggregationStrategy()) + .constant(true) + .completionFromBatchConsumer() + .eagerCheckCompletion() + .to("mock:aggregateToZipEntry") + .log("Done processing big file: ${header.CamelFileName}"); + } + }; + + } +} http://git-wip-us.apache.org/repos/asf/camel/blob/4ea96a15/components/camel-zipfile/src/test/resources/org/apache/camel/aggregate/zipfile/data/chiau.txt ---------------------------------------------------------------------- diff --git a/components/camel-zipfile/src/test/resources/org/apache/camel/aggregate/zipfile/data/chiau.txt b/components/camel-zipfile/src/test/resources/org/apache/camel/aggregate/zipfile/data/chiau.txt new file mode 100644 index 0000000..7842486 --- /dev/null +++ b/components/camel-zipfile/src/test/resources/org/apache/camel/aggregate/zipfile/data/chiau.txt @@ -0,0 +1 @@ +chau \ No newline at end of file http://git-wip-us.apache.org/repos/asf/camel/blob/4ea96a15/components/camel-zipfile/src/test/resources/org/apache/camel/aggregate/zipfile/data/hi.txt ---------------------------------------------------------------------- diff --git a/components/camel-zipfile/src/test/resources/org/apache/camel/aggregate/zipfile/data/hi.txt b/components/camel-zipfile/src/test/resources/org/apache/camel/aggregate/zipfile/data/hi.txt new file mode 100644 index 0000000..32f95c0 --- /dev/null +++ b/components/camel-zipfile/src/test/resources/org/apache/camel/aggregate/zipfile/data/hi.txt @@ -0,0 +1 @@ +hi \ No newline at end of file http://git-wip-us.apache.org/repos/asf/camel/blob/4ea96a15/components/camel-zipfile/src/test/resources/org/apache/camel/aggregate/zipfile/data/hola.txt ---------------------------------------------------------------------- diff --git a/components/camel-zipfile/src/test/resources/org/apache/camel/aggregate/zipfile/data/hola.txt b/components/camel-zipfile/src/test/resources/org/apache/camel/aggregate/zipfile/data/hola.txt new file mode 100644 index 0000000..b8b4a4e --- /dev/null +++ 
b/components/camel-zipfile/src/test/resources/org/apache/camel/aggregate/zipfile/data/hola.txt @@ -0,0 +1 @@ +hola \ No newline at end of file
