This is an automated email from the ASF dual-hosted git repository.
davsclaus pushed a commit to branch camel-4.4.x
in repository https://gitbox.apache.org/repos/asf/camel.git
The following commit(s) were added to refs/heads/camel-4.4.x by this push:
new b7b4f0aad08 Tar aggregation strategy null bodies (#15732)
b7b4f0aad08 is described below
commit b7b4f0aad085a71165f1c5b014989d89542a654d
Author: thomas-gantenbein-tga <[email protected]>
AuthorDate: Fri Sep 27 07:41:48 2024 +0200
Tar aggregation strategy null bodies (#15732)
* Handle null bodies in TarAggregationStrategy
* Reduce cognitive complexity
* Remove deprecated doPreSetup/doPostSetup methods and some cosmetic edits
---------
Co-authored-by: Thomas Gantenbein <[email protected]>
---
components/camel-tarfile/pom.xml | 6 +
.../aggregate/tarfile/TarAggregationStrategy.java | 62 +++++-----
.../tarfile/SpringTarSplitterRouteTest.java | 4 +-
.../tarfile/TarFileSplitIteratorCorruptTest.java | 4 +-
.../TarAggregationStrategyNullBodyTest.java | 130 +++++++++++++++++++++
5 files changed, 175 insertions(+), 31 deletions(-)
diff --git a/components/camel-tarfile/pom.xml b/components/camel-tarfile/pom.xml
index 867d03057d9..2482969c29c 100644
--- a/components/camel-tarfile/pom.xml
+++ b/components/camel-tarfile/pom.xml
@@ -66,5 +66,11 @@
<artifactId>log4j-core</artifactId>
<scope>test</scope>
</dependency>
+ <dependency>
+ <groupId>org.awaitility</groupId>
+ <artifactId>awaitility</artifactId>
+ <version>${awaitility-version}</version>
+ <scope>test</scope>
+ </dependency>
</dependencies>
</project>
diff --git a/components/camel-tarfile/src/main/java/org/apache/camel/processor/aggregate/tarfile/TarAggregationStrategy.java b/components/camel-tarfile/src/main/java/org/apache/camel/processor/aggregate/tarfile/TarAggregationStrategy.java
index 512f7911966..5efe3c53e99 100644
--- a/components/camel-tarfile/src/main/java/org/apache/camel/processor/aggregate/tarfile/TarAggregationStrategy.java
+++ b/components/camel-tarfile/src/main/java/org/apache/camel/processor/aggregate/tarfile/TarAggregationStrategy.java
@@ -137,13 +137,13 @@ public class TarAggregationStrategy implements AggregationStrategy {
File tarFile;
Exchange answer = oldExchange;
+ boolean isFirstTimeInAggregation = oldExchange == null;
// Guard against empty new exchanges
- if (newExchange == null) {
+ if (newExchange.getIn().getBody() == null &&
!isFirstTimeInAggregation) {
return oldExchange;
}
- // First time for this aggregation
- if (oldExchange == null) {
+ if (isFirstTimeInAggregation) {
try {
tarFile = FileUtil.createTempFile(this.filePrefix,
this.fileSuffix, this.parentDir);
LOG.trace("Created temporary file: {}", tarFile);
@@ -157,25 +157,23 @@ public class TarAggregationStrategy implements AggregationStrategy {
}
Object body = newExchange.getIn().getBody();
- if (body instanceof WrappedFile) {
- body = ((WrappedFile) body).getFile();
+ if (body instanceof WrappedFile wrappedFile) {
+ body = wrappedFile.getFile();
}
- if (body instanceof File) {
- try {
- File appendFile = (File) body;
- // do not try to append empty files
- if (appendFile.length() > 0) {
- String entryName = preserveFolderStructure
- ?
newExchange.getIn().getHeader(Exchange.FILE_NAME, String.class)
- : newExchange.getIn().getMessageId();
- addFileToTar(tarFile, appendFile,
this.preserveFolderStructure ? entryName : null);
- }
- } catch (Exception e) {
- throw new GenericFileOperationFailedException(e.getMessage(),
e);
- }
+ if (body instanceof File appendFile) {
+ addFileToTar(newExchange, appendFile, tarFile);
} else {
- // Handle all other messages
+ appendIncomingBodyAsBytesToTar(newExchange, tarFile);
+ }
+ GenericFile<File> genericFile = FileConsumer.asGenericFile(
+ tarFile.getParent(), tarFile,
Charset.defaultCharset().toString(), false);
+ genericFile.bindToExchange(answer);
+ return answer;
+ }
+
+ private void appendIncomingBodyAsBytesToTar(Exchange newExchange, File
tarFile) {
+ if (newExchange.getIn().getBody() != null) {
try {
byte[] buffer =
newExchange.getIn().getMandatoryBody(byte[].class);
// do not try to append empty data
@@ -189,10 +187,20 @@ public class TarAggregationStrategy implements AggregationStrategy {
throw new GenericFileOperationFailedException(e.getMessage(),
e);
}
}
- GenericFile<File> genericFile = FileConsumer.asGenericFile(
- tarFile.getParent(), tarFile,
Charset.defaultCharset().toString(), false);
- genericFile.bindToExchange(answer);
- return answer;
+ }
+
+ private void addFileToTar(Exchange newExchange, File appendFile, File
tarFile) {
+ try {
+ // do not try to append empty files
+ if (appendFile.length() > 0) {
+ String entryName = preserveFolderStructure
+ ? newExchange.getIn().getHeader(Exchange.FILE_NAME,
String.class)
+ : newExchange.getIn().getMessageId();
+ addFileToTar(tarFile, appendFile, this.preserveFolderStructure
? entryName : null);
+ }
+ } catch (Exception e) {
+ throw new GenericFileOperationFailedException(e.getMessage(), e);
+ }
}
@Override
@@ -205,13 +213,13 @@ public class TarAggregationStrategy implements AggregationStrategy {
private void addFileToTar(File source, File file, String fileName) throws
IOException, ArchiveException {
File tmpTar = Files.createTempFile(parentDir.toPath(),
source.getName(), null).toFile();
- tmpTar.delete();
+ Files.delete(tmpTar.toPath());
if (!source.renameTo(tmpTar)) {
throw new IOException("Could not make temp file (" +
source.getName() + ")");
}
try (FileInputStream fis = new FileInputStream(tmpTar)) {
- try (TarArchiveInputStream tin = (TarArchiveInputStream) new
ArchiveStreamFactory()
+ try (TarArchiveInputStream tin = new ArchiveStreamFactory()
.createArchiveInputStream(ArchiveStreamFactory.TAR, fis)) {
try (TarArchiveOutputStream tos = new
TarArchiveOutputStream(new FileOutputStream(source))) {
tos.setLongFileMode(TarArchiveOutputStream.LONGFILE_POSIX);
@@ -250,13 +258,13 @@ public class TarAggregationStrategy implements AggregationStrategy {
private void addEntryToTar(File source, String entryName, byte[] buffer,
int length) throws IOException, ArchiveException {
File tmpTar = Files.createTempFile(parentDir.toPath(),
source.getName(), null).toFile();
- tmpTar.delete();
+ Files.delete(tmpTar.toPath());
if (!source.renameTo(tmpTar)) {
throw new IOException("Cannot create temp file: " +
source.getName());
}
try (FileInputStream fis = new FileInputStream(tmpTar)) {
- try (TarArchiveInputStream tin = (TarArchiveInputStream) new
ArchiveStreamFactory()
+ try (TarArchiveInputStream tin = new ArchiveStreamFactory()
.createArchiveInputStream(ArchiveStreamFactory.TAR, fis)) {
try (TarArchiveOutputStream tos = new
TarArchiveOutputStream(new FileOutputStream(source))) {
tos.setLongFileMode(TarArchiveOutputStream.LONGFILE_POSIX);
diff --git a/components/camel-tarfile/src/test/java/org/apache/camel/dataformat/tarfile/SpringTarSplitterRouteTest.java b/components/camel-tarfile/src/test/java/org/apache/camel/dataformat/tarfile/SpringTarSplitterRouteTest.java
index 0b7831a2fde..d7b32b21f0c 100644
--- a/components/camel-tarfile/src/test/java/org/apache/camel/dataformat/tarfile/SpringTarSplitterRouteTest.java
+++ b/components/camel-tarfile/src/test/java/org/apache/camel/dataformat/tarfile/SpringTarSplitterRouteTest.java
@@ -21,10 +21,10 @@ import org.apache.camel.test.spring.junit5.CamelSpringTestSupport;
import org.junit.jupiter.api.Test;
import org.springframework.context.support.ClassPathXmlApplicationContext;
-public class SpringTarSplitterRouteTest extends CamelSpringTestSupport {
+class SpringTarSplitterRouteTest extends CamelSpringTestSupport {
@Test
- public void testSplitter() throws InterruptedException {
+ void testSplitter() throws InterruptedException {
MockEndpoint processTarEntry = getMockEndpoint("mock:processTarEntry");
processTarEntry.expectedBodiesReceivedInAnyOrder("chau", "hi", "hola",
"hello", "greetings");
diff --git a/components/camel-tarfile/src/test/java/org/apache/camel/dataformat/tarfile/TarFileSplitIteratorCorruptTest.java b/components/camel-tarfile/src/test/java/org/apache/camel/dataformat/tarfile/TarFileSplitIteratorCorruptTest.java
index ad7ae26b7b9..f68c6543d33 100644
--- a/components/camel-tarfile/src/test/java/org/apache/camel/dataformat/tarfile/TarFileSplitIteratorCorruptTest.java
+++ b/components/camel-tarfile/src/test/java/org/apache/camel/dataformat/tarfile/TarFileSplitIteratorCorruptTest.java
@@ -25,10 +25,10 @@ import org.apache.camel.component.mock.MockEndpoint;
import org.apache.camel.test.junit5.CamelTestSupport;
import org.junit.jupiter.api.Test;
-public class TarFileSplitIteratorCorruptTest extends CamelTestSupport {
+class TarFileSplitIteratorCorruptTest extends CamelTestSupport {
@Test
- public void testTarFileUnmarshal() throws Exception {
+ void testTarFileUnmarshal() throws Exception {
getMockEndpoint("mock:dead").expectedMessageCount(1);
getMockEndpoint("mock:dead").message(0).exchangeProperty(Exchange.EXCEPTION_CAUGHT)
.isInstanceOf(IllegalStateException.class);
diff --git a/components/camel-tarfile/src/test/java/org/apache/camel/processor/aggregate/tarfile/TarAggregationStrategyNullBodyTest.java b/components/camel-tarfile/src/test/java/org/apache/camel/processor/aggregate/tarfile/TarAggregationStrategyNullBodyTest.java
new file mode 100644
index 00000000000..babf48ea3a4
--- /dev/null
+++ b/components/camel-tarfile/src/test/java/org/apache/camel/processor/aggregate/tarfile/TarAggregationStrategyNullBodyTest.java
@@ -0,0 +1,130 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.processor.aggregate.tarfile;
+
+import java.io.File;
+import java.io.FileInputStream;
+import java.io.IOException;
+import java.io.InputStreamReader;
+import java.util.Map;
+import java.util.TreeMap;
+
+import org.apache.camel.builder.RouteBuilder;
+import org.apache.camel.test.junit5.CamelTestSupport;
+import org.apache.camel.test.junit5.TestSupport;
+import org.apache.camel.util.IOHelper;
+import org.apache.commons.compress.archivers.tar.TarArchiveEntry;
+import org.apache.commons.compress.archivers.tar.TarArchiveInputStream;
+import org.apache.logging.log4j.core.util.IOUtils;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+
+import static org.awaitility.Awaitility.await;
+import static org.junit.jupiter.api.Assertions.assertEquals;
+
+class TarAggregationStrategyNullBodyTest extends CamelTestSupport {
+
+ @BeforeEach
+ public void cleanOutputDir() {
+ TestSupport.deleteDirectory("target/out");
+ }
+
+ @Test
+ void testNullBodyLast() throws Exception {
+ template.sendBody("direct:start", "Hello");
+ template.sendBody("direct:start", "Hello again");
+ template.sendBody("direct:start", null);
+ assertTarFileContains(2);
+ }
+
+ @Test
+ void testNullBodyFirst() throws Exception {
+ template.sendBody("direct:start", null);
+ template.sendBody("direct:start", "Hello");
+ template.sendBody("direct:start", "Hello again");
+ assertTarFileContains(2);
+ }
+
+ @Test
+ void testNullBodyMiddle() throws Exception {
+ template.sendBody("direct:start", "Hello");
+ template.sendBody("direct:start", null);
+ template.sendBody("direct:start", "Hello again");
+ assertTarFileContains(2);
+ }
+
+ @Test
+ void testNullBodiesOnly() throws Exception {
+ template.sendBody("direct:start", null);
+ template.sendBody("direct:start", null);
+ template.sendBody("direct:start", null);
+ assertTarFileContains(0);
+ }
+
+ @Test
+ void testTwoNullBodies() throws Exception {
+ template.sendBody("direct:start", null);
+ template.sendBody("direct:start", null);
+ template.sendBody("direct:start", "Hello");
+ assertTarFileContains(1);
+ }
+
+ public void assertTarFileContains(int filesInTarExpected) throws Exception
{
+ await("Should be a file in target/out directory").until(() -> {
+ File[] files = new File("target/out").listFiles();
+ return files != null && files.length > 0;
+ });
+ File[] files = new File("target/out").listFiles();
+ assertEquals(1, files.length, "Should only be one file in target/out
directory");
+ Map<String, String> tar = readTar(files[0]);
+ assertEquals(filesInTarExpected, tar.size());
+ }
+
+ @Override
+ protected RouteBuilder createRouteBuilder() {
+ return new RouteBuilder() {
+ @Override
+ public void configure() {
+ // @formatter:off
+ from("direct:start")
+ .aggregate(new TarAggregationStrategy(false))
+ .constant(true)
+ .completionSize(3)
+ .eagerCheckCompletion()
+ .to("file:target/out")
+ .to("mock:aggregateToTarEntry");
+ // @formatter:on
+ }
+ };
+ }
+
+ private static Map<String, String> readTar(File file) throws IOException {
+ Map<String, String> content = new TreeMap<>();
+ TarArchiveInputStream tin = new TarArchiveInputStream(new
FileInputStream(file));
+ try {
+ for (TarArchiveEntry te = tin.getNextEntry();
+ te != null;
+ te = tin.getNextEntry()) {
+ String c = IOUtils.toString(new InputStreamReader(tin));
+ content.put(te.getName(), c);
+ }
+ } finally {
+ IOHelper.close(tin);
+ }
+ return content;
+ }
+}