This is an automated email from the ASF dual-hosted git repository.

gnodet pushed a commit to branch fix/CXF-8096-logging-streaming
in repository https://gitbox.apache.org/repos/asf/cxf.git

commit f7d737836537d21f7e7f607c89c293ae8ac73e75
Author: Guillaume Nodet <[email protected]>
AuthorDate: Thu Mar 12 13:16:11 2026 +0100

    CXF-8096: Fix LoggingFeature blocking read on streaming responses
    
    WireTapIn was eagerly reading the input stream (via
    IOUtils.copyAtLeast), up to the configured limit or in full when no
    limit is set, before the application could process it.
    For streaming responses (e.g. StreamingOutput), this blocked the
    client from reading data incrementally.
    
    The fix replaces the eager copy + SequenceInputStream approach with
    a TeeInputStream that copies data to the logging cache as it flows
    through, up to the configured limit. LoggingInInterceptor now defers
    logging to a close callback when the stream hasn't been consumed yet,
    allowing streaming responses to flow through without blocking.
    
    Supersedes #574.
    
    Co-Authored-By: Claude Opus 4.6 <[email protected]>
---
 .../cxf/ext/logging/LoggingInInterceptor.java      | 23 ++++++
 .../org/apache/cxf/ext/logging/TeeInputStream.java | 90 ++++++++++++++++++++++
 .../java/org/apache/cxf/ext/logging/WireTapIn.java | 17 ++--
 .../logging/AttributeMaskSensitiveHelperTest.java  | 14 ++++
 .../cxf/ext/logging/MaskSensitiveHelperTest.java   | 14 ++++
 .../org/apache/cxf/ext/logging/TransformTest.java  | 13 +++-
 .../org/apache/cxf/ext/logging/TruncatedTest.java  | 61 ++++++++++++++-
 7 files changed, 221 insertions(+), 11 deletions(-)

diff --git a/rt/features/logging/src/main/java/org/apache/cxf/ext/logging/LoggingInInterceptor.java b/rt/features/logging/src/main/java/org/apache/cxf/ext/logging/LoggingInInterceptor.java
index 436bbf3d16..6094ef7cec 100644
--- a/rt/features/logging/src/main/java/org/apache/cxf/ext/logging/LoggingInInterceptor.java
+++ b/rt/features/logging/src/main/java/org/apache/cxf/ext/logging/LoggingInInterceptor.java
@@ -90,6 +90,29 @@ public class LoggingInInterceptor extends AbstractLoggingInterceptor {
             message.put(LIVE_LOGGING_PROP, Boolean.FALSE);
         }
         createExchangeId(message);
+
+        // Check if a TeeInputStream is present (CXF-8096).
+        // Register a close callback to log when the stream is consumed,
+        // rather than eagerly consuming the stream here which would block
+        // on streaming responses.
+        TeeInputStream tee = message.getContent(TeeInputStream.class);
+        if (tee != null) {
+            CachedOutputStream cos = 
message.getContent(CachedOutputStream.class);
+            if (cos != null && cos.size() > 0) {
+                // Stream was already consumed (e.g. SOAP/JAX-WS where body is 
read
+                // before PRE_INVOKE). Log immediately.
+                logMessage(message);
+            } else {
+                // Stream not consumed yet — defer logging until stream is 
closed.
+                tee.setOnClose(() -> logMessage(message));
+            }
+            return;
+        }
+
+        logMessage(message);
+    }
+
+    private void logMessage(Message message) {
         final LogEvent event = eventMapper.map(message, 
sensitiveProtocolHeaderNames);
         if (shouldLogContent(event)) {
             addContent(message, event);
diff --git a/rt/features/logging/src/main/java/org/apache/cxf/ext/logging/TeeInputStream.java b/rt/features/logging/src/main/java/org/apache/cxf/ext/logging/TeeInputStream.java
new file mode 100644
index 0000000000..3d72a37147
--- /dev/null
+++ b/rt/features/logging/src/main/java/org/apache/cxf/ext/logging/TeeInputStream.java
@@ -0,0 +1,90 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.cxf.ext.logging;
+
+import java.io.FilterInputStream;
+import java.io.IOException;
+import java.io.InputStream;
+
+import org.apache.cxf.io.CachedOutputStream;
+
+/**
+ * An InputStream wrapper that copies data to a
+ * CachedOutputStream as it is read, up to a limit.
+ * This avoids eagerly reading the entire stream,
+ * which is important for streaming responses
+ * (CXF-8096).
+ */
+class TeeInputStream extends FilterInputStream {
+    private final CachedOutputStream teeCache;
+    private final int teeLimit;
+    private int count;
+    private Runnable closeCallback;
+
+    TeeInputStream(final InputStream source,
+                   final CachedOutputStream cos,
+                   final int lim) {
+        super(source);
+        this.teeCache = cos;
+        this.teeLimit = lim;
+    }
+
+    void setOnClose(final Runnable callback) {
+        this.closeCallback = callback;
+    }
+
+    @Override
+    public int read() throws IOException {
+        int b = super.read();
+        if (b != -1 && count < teeLimit) {
+            teeCache.write(b);
+            count++;
+        }
+        return b;
+    }
+
+    @Override
+    public int read(final byte[] b,
+                    final int off,
+                    final int len) throws IOException {
+        int n = super.read(b, off, len);
+        if (n > 0 && count < teeLimit) {
+            int toWrite = Math.min(n, teeLimit - count);
+            teeCache.write(b, off, toWrite);
+            count += toWrite;
+        }
+        return n;
+    }
+
+    @Override
+    public void close() throws IOException {
+        try {
+            super.close();
+        } finally {
+            teeCache.flush();
+            if (closeCallback != null) {
+                closeCallback.run();
+            }
+        }
+    }
+
+    CachedOutputStream getCachedOutputStream() {
+        return teeCache;
+    }
+}
diff --git a/rt/features/logging/src/main/java/org/apache/cxf/ext/logging/WireTapIn.java b/rt/features/logging/src/main/java/org/apache/cxf/ext/logging/WireTapIn.java
index 9b08c7d736..a796fe67ad 100644
--- a/rt/features/logging/src/main/java/org/apache/cxf/ext/logging/WireTapIn.java
+++ b/rt/features/logging/src/main/java/org/apache/cxf/ext/logging/WireTapIn.java
@@ -21,7 +21,6 @@ package org.apache.cxf.ext.logging;
 import java.io.IOException;
 import java.io.InputStream;
 import java.io.Reader;
-import java.io.SequenceInputStream;
 
 import org.apache.cxf.helpers.IOUtils;
 import org.apache.cxf.interceptor.Fault;
@@ -86,20 +85,20 @@ public class WireTapIn extends AbstractPhaseInterceptor<Message> {
         InputStream bis = is instanceof DelegatingInputStream
             ? ((DelegatingInputStream)is).getInputStream() : is;
 
-        // only copy up to the limit since that's all we need to log
-        // we can stream the rest
-        IOUtils.copyAtLeast(bis, bos, limit == -1 ? Integer.MAX_VALUE : limit);
-        bos.flush();
-        bis = new SequenceInputStream(bos.getInputStream(), bis);
+        // Wrap the stream in a TeeInputStream that copies data to the cache
+        // as it is consumed, up to the configured limit. This avoids blocking
+        // on streaming responses (CXF-8096).
+        int teeLimit = limit == -1 ? Integer.MAX_VALUE : limit;
+        TeeInputStream tee = new TeeInputStream(bis, bos, teeLimit);
 
         // restore the delegating input stream or the input stream
         if (is instanceof DelegatingInputStream) {
-            ((DelegatingInputStream)is).setInputStream(bis);
+            ((DelegatingInputStream)is).setInputStream(tee);
         } else {
-            message.setContent(InputStream.class, bis);
+            message.setContent(InputStream.class, tee);
         }
         message.setContent(CachedOutputStream.class, bos);
-
+        message.setContent(TeeInputStream.class, tee);
     }
 
     public void setLimit(int limit) {
diff --git a/rt/features/logging/src/test/java/org/apache/cxf/ext/logging/AttributeMaskSensitiveHelperTest.java b/rt/features/logging/src/test/java/org/apache/cxf/ext/logging/AttributeMaskSensitiveHelperTest.java
index 925cc9b515..52c219af34 100644
--- a/rt/features/logging/src/test/java/org/apache/cxf/ext/logging/AttributeMaskSensitiveHelperTest.java
+++ b/rt/features/logging/src/test/java/org/apache/cxf/ext/logging/AttributeMaskSensitiveHelperTest.java
@@ -133,6 +133,7 @@ public class AttributeMaskSensitiveHelperTest {
             intercept.handleMessage(message);
         }
         inInterceptor.handleMessage(message);
+        consumeAndCloseInputStream(message);
 
         // Verify
         LogEvent event = logEventSender.getLogEvent();
@@ -154,6 +155,7 @@ public class AttributeMaskSensitiveHelperTest {
             intercept.handleMessage(message);
         }
         inInterceptor.handleMessage(message);
+        consumeAndCloseInputStream(message);
 
         // Verify
         LogEvent event = logEventSender.getLogEvent();
@@ -235,6 +237,18 @@ public class AttributeMaskSensitiveHelperTest {
         return message;
     }
 
+    private static void consumeAndCloseInputStream(Message message) {
+        try {
+            InputStream is = message.getContent(InputStream.class);
+            if (is != null) {
+                is.transferTo(OutputStream.nullOutputStream());
+                is.close();
+            }
+        } catch (IOException e) {
+            throw new RuntimeException(e);
+        }
+    }
+
     private Message prepareOutMessage() {
         Message message = new MessageImpl();
         message.put(Message.CONTENT_TYPE, contentType);
diff --git a/rt/features/logging/src/test/java/org/apache/cxf/ext/logging/MaskSensitiveHelperTest.java b/rt/features/logging/src/test/java/org/apache/cxf/ext/logging/MaskSensitiveHelperTest.java
index 761409b3a6..17d700d875 100644
--- a/rt/features/logging/src/test/java/org/apache/cxf/ext/logging/MaskSensitiveHelperTest.java
+++ b/rt/features/logging/src/test/java/org/apache/cxf/ext/logging/MaskSensitiveHelperTest.java
@@ -156,6 +156,7 @@ public class MaskSensitiveHelperTest {
             intercept.handleMessage(message);
         }
         inInterceptor.handleMessage(message);
+        consumeAndCloseInputStream(message);
 
         // Verify
         LogEvent event = logEventSender.getLogEvent();
@@ -177,6 +178,7 @@ public class MaskSensitiveHelperTest {
             intercept.handleMessage(message);
         }
         inInterceptor.handleMessage(message);
+        consumeAndCloseInputStream(message);
 
         // Verify
         LogEvent event = logEventSender.getLogEvent();
@@ -256,6 +258,18 @@ public class MaskSensitiveHelperTest {
         return message;
     }
 
+    private static void consumeAndCloseInputStream(Message message) {
+        try {
+            InputStream is = message.getContent(InputStream.class);
+            if (is != null) {
+                is.transferTo(OutputStream.nullOutputStream());
+                is.close();
+            }
+        } catch (IOException e) {
+            throw new RuntimeException(e);
+        }
+    }
+
     private Message prepareOutMessage() {
         Message message = new MessageImpl();
         message.put(Message.CONTENT_TYPE, contentType);
diff --git a/rt/features/logging/src/test/java/org/apache/cxf/ext/logging/TransformTest.java b/rt/features/logging/src/test/java/org/apache/cxf/ext/logging/TransformTest.java
index 48d53b0274..4da27d0ade 100644
--- a/rt/features/logging/src/test/java/org/apache/cxf/ext/logging/TransformTest.java
+++ b/rt/features/logging/src/test/java/org/apache/cxf/ext/logging/TransformTest.java
@@ -136,10 +136,21 @@ public class TransformTest {
         }
         interceptor.handleMessage(message);
 
+        // Consume and close the stream to trigger deferred logging (CXF-8096)
+        try {
+            InputStream is = message.getContent(InputStream.class);
+            if (is != null) {
+                is.transferTo(OutputStream.nullOutputStream());
+                is.close();
+            }
+        } catch (IOException e) {
+            throw new RuntimeException(e);
+        }
+
         // Verify
         LogEvent event = logEventSender.getLogEvent();
         assertNotNull(event);
-        assertEquals(TRANSFORMED_LOGGING_CONTENT, event.getPayload()); // only 
the first byte is read!
+        assertEquals(TRANSFORMED_LOGGING_CONTENT, event.getPayload());
     }
 
     @Test
diff --git a/rt/features/logging/src/test/java/org/apache/cxf/ext/logging/TruncatedTest.java b/rt/features/logging/src/test/java/org/apache/cxf/ext/logging/TruncatedTest.java
index a7567fc15f..18d12b64e3 100644
--- a/rt/features/logging/src/test/java/org/apache/cxf/ext/logging/TruncatedTest.java
+++ b/rt/features/logging/src/test/java/org/apache/cxf/ext/logging/TruncatedTest.java
@@ -23,12 +23,17 @@ import java.io.ByteArrayOutputStream;
 import java.io.IOException;
 import java.io.InputStream;
 import java.io.OutputStream;
+import java.io.PipedInputStream;
+import java.io.PipedOutputStream;
 import java.io.Reader;
 import java.io.StringReader;
 import java.io.StringWriter;
 import java.io.Writer;
 import java.nio.charset.StandardCharsets;
 import java.util.Collection;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicReference;
 
 import org.apache.cxf.ext.logging.event.LogEvent;
 import org.apache.cxf.message.Exchange;
@@ -41,6 +46,7 @@ import org.junit.Test;
 
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertNull;
 import static org.junit.Assert.assertTrue;
 
 
@@ -112,6 +118,13 @@ public class TruncatedTest {
 
         interceptor.handleMessage(message);
 
+        // Consume and close the stream to trigger deferred logging (CXF-8096)
+        InputStream is = message.getContent(InputStream.class);
+        if (is != null) {
+            is.transferTo(OutputStream.nullOutputStream());
+            is.close();
+        }
+
         LogEvent event = logEventSender.getLogEvent();
         assertNotNull(event);
         assertEquals("T", event.getPayload()); // only the first byte is read!
@@ -143,7 +156,53 @@ public class TruncatedTest {
         assertEquals("T", event.getPayload()); // only the first byte is read!
         assertTrue(event.isTruncated());
     }
-    
 
+    /**
+     * CXF-8096: Verify that LoggingFeature does not block when reading from a 
streaming
+     * input source. The TeeInputStream approach allows the application to 
read data
+     * incrementally while logging is deferred until the stream is closed.
+     */
+    @SuppressWarnings({ "rawtypes", "unchecked" })
+    @Test
+    public void streamingInputShouldNotBlockRead() throws Exception {
+        // Use PipedInputStream to simulate a slow streaming response
+        PipedOutputStream producer = new PipedOutputStream();
+        PipedInputStream slowStream = new PipedInputStream(producer);
+
+        Message message = new MessageImpl();
+        message.setContent(InputStream.class, slowStream);
+        Exchange exchange = new ExchangeImpl();
+        message.setExchange(exchange);
+        LogEventSenderMock logEventSender = new LogEventSenderMock();
+        LoggingInInterceptor interceptor = new 
LoggingInInterceptor(logEventSender);
+
+        // Run WireTapIn
+        Collection<PhaseInterceptor<? extends Message>> interceptors = 
interceptor.getAdditionalInterceptors();
+        for (PhaseInterceptor intercept : interceptors) {
+            intercept.handleMessage(message);
+        }
+
+        // Run LoggingInInterceptor — should NOT block waiting for stream data
+        interceptor.handleMessage(message);
+
+        // At this point, logging should be deferred — no event yet
+        assertNull("Logging should be deferred for streaming input", 
logEventSender.getLogEvent());
+
+        // Write some data and close the stream
+        producer.write("Hello streaming!".getBytes(StandardCharsets.UTF_8));
+        producer.close();
+
+        // Now consume the stream from the application side
+        InputStream is = message.getContent(InputStream.class);
+        byte[] data = is.readAllBytes();
+        is.close();
+
+        assertEquals("Hello streaming!", new String(data, 
StandardCharsets.UTF_8));
+
+        // After stream close, deferred logging should have fired
+        LogEvent event = logEventSender.getLogEvent();
+        assertNotNull("Log event should be available after stream is closed", 
event);
+        assertEquals("Hello streaming!", event.getPayload());
+    }
 
 }

Reply via email to