This is an automated email from the ASF dual-hosted git repository.

davsclaus pushed a commit to branch camel-4.4.x
in repository https://gitbox.apache.org/repos/asf/camel.git


The following commit(s) were added to refs/heads/camel-4.4.x by this push:
     new b7b4f0aad08 Tar aggregation strategy null bodies (#15732)
b7b4f0aad08 is described below

commit b7b4f0aad085a71165f1c5b014989d89542a654d
Author: thomas-gantenbein-tga <[email protected]>
AuthorDate: Fri Sep 27 07:41:48 2024 +0200

    Tar aggregation strategy null bodies (#15732)
    
    * Handle null bodies in TarAggregationStrategy
    
    * Reduce cognitive complexity
    
    * Remove deprecated doPreSetup/doPostSetup methods and some cosmetic edits
    
    ---------
    
    Co-authored-by: Thomas Gantenbein <[email protected]>
---
 components/camel-tarfile/pom.xml                   |   6 +
 .../aggregate/tarfile/TarAggregationStrategy.java  |  62 +++++-----
 .../tarfile/SpringTarSplitterRouteTest.java        |   4 +-
 .../tarfile/TarFileSplitIteratorCorruptTest.java   |   4 +-
 .../TarAggregationStrategyNullBodyTest.java        | 130 +++++++++++++++++++++
 5 files changed, 175 insertions(+), 31 deletions(-)

diff --git a/components/camel-tarfile/pom.xml b/components/camel-tarfile/pom.xml
index 867d03057d9..2482969c29c 100644
--- a/components/camel-tarfile/pom.xml
+++ b/components/camel-tarfile/pom.xml
@@ -66,5 +66,11 @@
             <artifactId>log4j-core</artifactId>
             <scope>test</scope>
         </dependency>
+        <dependency>
+            <groupId>org.awaitility</groupId>
+            <artifactId>awaitility</artifactId>
+            <version>${awaitility-version}</version>
+            <scope>test</scope>
+        </dependency>
     </dependencies>
 </project>
diff --git a/components/camel-tarfile/src/main/java/org/apache/camel/processor/aggregate/tarfile/TarAggregationStrategy.java b/components/camel-tarfile/src/main/java/org/apache/camel/processor/aggregate/tarfile/TarAggregationStrategy.java
index 512f7911966..5efe3c53e99 100644
--- a/components/camel-tarfile/src/main/java/org/apache/camel/processor/aggregate/tarfile/TarAggregationStrategy.java
+++ b/components/camel-tarfile/src/main/java/org/apache/camel/processor/aggregate/tarfile/TarAggregationStrategy.java
@@ -137,13 +137,13 @@ public class TarAggregationStrategy implements AggregationStrategy {
         File tarFile;
         Exchange answer = oldExchange;
 
+        boolean isFirstTimeInAggregation = oldExchange == null;
         // Guard against empty new exchanges
-        if (newExchange == null) {
+        if (newExchange.getIn().getBody() == null && !isFirstTimeInAggregation) {
             return oldExchange;
         }
 
-        // First time for this aggregation
-        if (oldExchange == null) {
+        if (isFirstTimeInAggregation) {
             try {
                 tarFile = FileUtil.createTempFile(this.filePrefix, this.fileSuffix, this.parentDir);
                 LOG.trace("Created temporary file: {}", tarFile);
@@ -157,25 +157,23 @@ public class TarAggregationStrategy implements AggregationStrategy {
         }
 
         Object body = newExchange.getIn().getBody();
-        if (body instanceof WrappedFile) {
-            body = ((WrappedFile) body).getFile();
+        if (body instanceof WrappedFile wrappedFile) {
+            body = wrappedFile.getFile();
         }
 
-        if (body instanceof File) {
-            try {
-                File appendFile = (File) body;
-                // do not try to append empty files
-                if (appendFile.length() > 0) {
-                    String entryName = preserveFolderStructure
-                            ? newExchange.getIn().getHeader(Exchange.FILE_NAME, String.class)
-                            : newExchange.getIn().getMessageId();
-                    addFileToTar(tarFile, appendFile, this.preserveFolderStructure ? entryName : null);
-                }
-            } catch (Exception e) {
-                throw new GenericFileOperationFailedException(e.getMessage(), e);
-            }
+        if (body instanceof File appendFile) {
+            addFileToTar(newExchange, appendFile, tarFile);
         } else {
-            // Handle all other messages
+            appendIncomingBodyAsBytesToTar(newExchange, tarFile);
+        }
+        GenericFile<File> genericFile = FileConsumer.asGenericFile(
+                tarFile.getParent(), tarFile, Charset.defaultCharset().toString(), false);
+        genericFile.bindToExchange(answer);
+        return answer;
+    }
+
+    private void appendIncomingBodyAsBytesToTar(Exchange newExchange, File tarFile) {
+        if (newExchange.getIn().getBody() != null) {
             try {
                 byte[] buffer = newExchange.getIn().getMandatoryBody(byte[].class);
                 // do not try to append empty data
@@ -189,10 +187,20 @@ public class TarAggregationStrategy implements AggregationStrategy {
                 throw new GenericFileOperationFailedException(e.getMessage(), e);
             }
         }
-        GenericFile<File> genericFile = FileConsumer.asGenericFile(
-                tarFile.getParent(), tarFile, Charset.defaultCharset().toString(), false);
-        genericFile.bindToExchange(answer);
-        return answer;
+    }
+
+    private void addFileToTar(Exchange newExchange, File appendFile, File tarFile) {
+        try {
+            // do not try to append empty files
+            if (appendFile.length() > 0) {
+                String entryName = preserveFolderStructure
+                        ? newExchange.getIn().getHeader(Exchange.FILE_NAME, String.class)
+                        : newExchange.getIn().getMessageId();
+                addFileToTar(tarFile, appendFile, this.preserveFolderStructure ? entryName : null);
+            }
+        } catch (Exception e) {
+            throw new GenericFileOperationFailedException(e.getMessage(), e);
+        }
     }
 
     @Override
@@ -205,13 +213,13 @@ public class TarAggregationStrategy implements AggregationStrategy {
 
     private void addFileToTar(File source, File file, String fileName) throws IOException, ArchiveException {
         File tmpTar = Files.createTempFile(parentDir.toPath(), source.getName(), null).toFile();
-        tmpTar.delete();
+        Files.delete(tmpTar.toPath());
         if (!source.renameTo(tmpTar)) {
             throw new IOException("Could not make temp file (" + source.getName() + ")");
         }
 
         try (FileInputStream fis = new FileInputStream(tmpTar)) {
-            try (TarArchiveInputStream tin = (TarArchiveInputStream) new ArchiveStreamFactory()
+            try (TarArchiveInputStream tin = new ArchiveStreamFactory()
                     .createArchiveInputStream(ArchiveStreamFactory.TAR, fis)) {
                 try (TarArchiveOutputStream tos = new TarArchiveOutputStream(new FileOutputStream(source))) {
                     tos.setLongFileMode(TarArchiveOutputStream.LONGFILE_POSIX);
@@ -250,13 +258,13 @@ public class TarAggregationStrategy implements AggregationStrategy {
 
     private void addEntryToTar(File source, String entryName, byte[] buffer, int length) throws IOException, ArchiveException {
         File tmpTar = Files.createTempFile(parentDir.toPath(), source.getName(), null).toFile();
-        tmpTar.delete();
+        Files.delete(tmpTar.toPath());
         if (!source.renameTo(tmpTar)) {
             throw new IOException("Cannot create temp file: " + source.getName());
         }
 
         try (FileInputStream fis = new FileInputStream(tmpTar)) {
-            try (TarArchiveInputStream tin = (TarArchiveInputStream) new ArchiveStreamFactory()
+            try (TarArchiveInputStream tin = new ArchiveStreamFactory()
                     .createArchiveInputStream(ArchiveStreamFactory.TAR, fis)) {
                 try (TarArchiveOutputStream tos = new TarArchiveOutputStream(new FileOutputStream(source))) {
                     tos.setLongFileMode(TarArchiveOutputStream.LONGFILE_POSIX);
diff --git a/components/camel-tarfile/src/test/java/org/apache/camel/dataformat/tarfile/SpringTarSplitterRouteTest.java b/components/camel-tarfile/src/test/java/org/apache/camel/dataformat/tarfile/SpringTarSplitterRouteTest.java
index 0b7831a2fde..d7b32b21f0c 100644
--- a/components/camel-tarfile/src/test/java/org/apache/camel/dataformat/tarfile/SpringTarSplitterRouteTest.java
+++ b/components/camel-tarfile/src/test/java/org/apache/camel/dataformat/tarfile/SpringTarSplitterRouteTest.java
@@ -21,10 +21,10 @@ import org.apache.camel.test.spring.junit5.CamelSpringTestSupport;
 import org.junit.jupiter.api.Test;
 import org.springframework.context.support.ClassPathXmlApplicationContext;
 
-public class SpringTarSplitterRouteTest extends CamelSpringTestSupport {
+class SpringTarSplitterRouteTest extends CamelSpringTestSupport {
 
     @Test
-    public void testSplitter() throws InterruptedException {
+    void testSplitter() throws InterruptedException {
         MockEndpoint processTarEntry = getMockEndpoint("mock:processTarEntry");
 
         processTarEntry.expectedBodiesReceivedInAnyOrder("chau", "hi", "hola", "hello", "greetings");
diff --git a/components/camel-tarfile/src/test/java/org/apache/camel/dataformat/tarfile/TarFileSplitIteratorCorruptTest.java b/components/camel-tarfile/src/test/java/org/apache/camel/dataformat/tarfile/TarFileSplitIteratorCorruptTest.java
index ad7ae26b7b9..f68c6543d33 100644
--- a/components/camel-tarfile/src/test/java/org/apache/camel/dataformat/tarfile/TarFileSplitIteratorCorruptTest.java
+++ b/components/camel-tarfile/src/test/java/org/apache/camel/dataformat/tarfile/TarFileSplitIteratorCorruptTest.java
@@ -25,10 +25,10 @@ import org.apache.camel.component.mock.MockEndpoint;
 import org.apache.camel.test.junit5.CamelTestSupport;
 import org.junit.jupiter.api.Test;
 
-public class TarFileSplitIteratorCorruptTest extends CamelTestSupport {
+class TarFileSplitIteratorCorruptTest extends CamelTestSupport {
 
     @Test
-    public void testTarFileUnmarshal() throws Exception {
+    void testTarFileUnmarshal() throws Exception {
         getMockEndpoint("mock:dead").expectedMessageCount(1);
         
getMockEndpoint("mock:dead").message(0).exchangeProperty(Exchange.EXCEPTION_CAUGHT)
                 .isInstanceOf(IllegalStateException.class);
diff --git a/components/camel-tarfile/src/test/java/org/apache/camel/processor/aggregate/tarfile/TarAggregationStrategyNullBodyTest.java b/components/camel-tarfile/src/test/java/org/apache/camel/processor/aggregate/tarfile/TarAggregationStrategyNullBodyTest.java
new file mode 100644
index 00000000000..babf48ea3a4
--- /dev/null
+++ b/components/camel-tarfile/src/test/java/org/apache/camel/processor/aggregate/tarfile/TarAggregationStrategyNullBodyTest.java
@@ -0,0 +1,130 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.processor.aggregate.tarfile;
+
+import java.io.File;
+import java.io.FileInputStream;
+import java.io.IOException;
+import java.io.InputStreamReader;
+import java.util.Map;
+import java.util.TreeMap;
+
+import org.apache.camel.builder.RouteBuilder;
+import org.apache.camel.test.junit5.CamelTestSupport;
+import org.apache.camel.test.junit5.TestSupport;
+import org.apache.camel.util.IOHelper;
+import org.apache.commons.compress.archivers.tar.TarArchiveEntry;
+import org.apache.commons.compress.archivers.tar.TarArchiveInputStream;
+import org.apache.logging.log4j.core.util.IOUtils;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+
+import static org.awaitility.Awaitility.await;
+import static org.junit.jupiter.api.Assertions.assertEquals;
+
+class TarAggregationStrategyNullBodyTest extends CamelTestSupport {
+
+    @BeforeEach
+    public void cleanOutputDir() {
+        TestSupport.deleteDirectory("target/out");
+    }
+
+    @Test
+    void testNullBodyLast() throws Exception {
+        template.sendBody("direct:start", "Hello");
+        template.sendBody("direct:start", "Hello again");
+        template.sendBody("direct:start", null);
+        assertTarFileContains(2);
+    }
+
+    @Test
+    void testNullBodyFirst() throws Exception {
+        template.sendBody("direct:start", null);
+        template.sendBody("direct:start", "Hello");
+        template.sendBody("direct:start", "Hello again");
+        assertTarFileContains(2);
+    }
+
+    @Test
+    void testNullBodyMiddle() throws Exception {
+        template.sendBody("direct:start", "Hello");
+        template.sendBody("direct:start", null);
+        template.sendBody("direct:start", "Hello again");
+        assertTarFileContains(2);
+    }
+
+    @Test
+    void testNullBodiesOnly() throws Exception {
+        template.sendBody("direct:start", null);
+        template.sendBody("direct:start", null);
+        template.sendBody("direct:start", null);
+        assertTarFileContains(0);
+    }
+
+    @Test
+    void testTwoNullBodies() throws Exception {
+        template.sendBody("direct:start", null);
+        template.sendBody("direct:start", null);
+        template.sendBody("direct:start", "Hello");
+        assertTarFileContains(1);
+    }
+
+    public void assertTarFileContains(int filesInTarExpected) throws Exception {
+        await("Should be a file in target/out directory").until(() -> {
+            File[] files = new File("target/out").listFiles();
+            return files != null && files.length > 0;
+        });
+        File[] files = new File("target/out").listFiles();
+        assertEquals(1, files.length, "Should only be one file in target/out directory");
+        Map<String, String> tar = readTar(files[0]);
+        assertEquals(filesInTarExpected, tar.size());
+    }
+
+    @Override
+    protected RouteBuilder createRouteBuilder() {
+        return new RouteBuilder() {
+            @Override
+            public void configure() {
+                // @formatter:off
+                from("direct:start")
+                        .aggregate(new TarAggregationStrategy(false))
+                        .constant(true)
+                        .completionSize(3)
+                        .eagerCheckCompletion()
+                        .to("file:target/out")
+                        .to("mock:aggregateToTarEntry");
+                // @formatter:on
+            }
+        };
+    }
+
+    private static Map<String, String> readTar(File file) throws IOException {
+        Map<String, String> content = new TreeMap<>();
+        TarArchiveInputStream tin = new TarArchiveInputStream(new FileInputStream(file));
+        try {
+            for (TarArchiveEntry te = tin.getNextEntry();
+                 te != null;
+                 te = tin.getNextEntry()) {
+                String c = IOUtils.toString(new InputStreamReader(tin));
+                content.put(te.getName(), c);
+            }
+        } finally {
+            IOHelper.close(tin);
+        }
+        return content;
+    }
+}

Reply via email to