This is an automated email from the ASF dual-hosted git repository. gnodet pushed a commit to branch fix/CXF-8096-logging-streaming in repository https://gitbox.apache.org/repos/asf/cxf.git
commit f7d737836537d21f7e7f607c89c293ae8ac73e75 Author: Guillaume Nodet <[email protected]> AuthorDate: Thu Mar 12 13:16:11 2026 +0100 CXF-8096: Fix LoggingFeature blocking read on streaming responses WireTapIn was eagerly reading the entire input stream (via IOUtils.copyAtLeast) before the application could process it. For streaming responses (e.g. StreamingOutput), this blocked the client from reading data incrementally. The fix replaces the eager copy + SequenceInputStream approach with a TeeInputStream that copies data to the logging cache as it flows through, up to the configured limit. LoggingInInterceptor now defers logging to a close callback when the stream hasn't been consumed yet, allowing streaming responses to flow through without blocking. Supersedes #574. Co-Authored-By: Claude Opus 4.6 <[email protected]> --- .../cxf/ext/logging/LoggingInInterceptor.java | 23 ++++++ .../org/apache/cxf/ext/logging/TeeInputStream.java | 90 ++++++++++++++++++++++ .../java/org/apache/cxf/ext/logging/WireTapIn.java | 17 ++-- .../logging/AttributeMaskSensitiveHelperTest.java | 14 ++++ .../cxf/ext/logging/MaskSensitiveHelperTest.java | 14 ++++ .../org/apache/cxf/ext/logging/TransformTest.java | 13 +++- .../org/apache/cxf/ext/logging/TruncatedTest.java | 61 ++++++++++++++- 7 files changed, 221 insertions(+), 11 deletions(-) diff --git a/rt/features/logging/src/main/java/org/apache/cxf/ext/logging/LoggingInInterceptor.java b/rt/features/logging/src/main/java/org/apache/cxf/ext/logging/LoggingInInterceptor.java index 436bbf3d16..6094ef7cec 100644 --- a/rt/features/logging/src/main/java/org/apache/cxf/ext/logging/LoggingInInterceptor.java +++ b/rt/features/logging/src/main/java/org/apache/cxf/ext/logging/LoggingInInterceptor.java @@ -90,6 +90,29 @@ public class LoggingInInterceptor extends AbstractLoggingInterceptor { message.put(LIVE_LOGGING_PROP, Boolean.FALSE); } createExchangeId(message); + + // Check if a TeeInputStream is present (CXF-8096). + // Register a close callback to log when the stream is consumed, + // rather than eagerly consuming the stream here which would block + // on streaming responses. + TeeInputStream tee = message.getContent(TeeInputStream.class); + if (tee != null) { + CachedOutputStream cos = message.getContent(CachedOutputStream.class); + if (cos != null && cos.size() > 0) { + // Stream was already consumed (e.g. SOAP/JAX-WS where body is read + // before PRE_INVOKE). Log immediately. + logMessage(message); + } else { + // Stream not consumed yet — defer logging until stream is closed. + tee.setOnClose(() -> logMessage(message)); + } + return; + } + + logMessage(message); + } + + private void logMessage(Message message) { final LogEvent event = eventMapper.map(message, sensitiveProtocolHeaderNames); if (shouldLogContent(event)) { addContent(message, event); diff --git a/rt/features/logging/src/main/java/org/apache/cxf/ext/logging/TeeInputStream.java b/rt/features/logging/src/main/java/org/apache/cxf/ext/logging/TeeInputStream.java new file mode 100644 index 0000000000..3d72a37147 --- /dev/null +++ b/rt/features/logging/src/main/java/org/apache/cxf/ext/logging/TeeInputStream.java @@ -0,0 +1,90 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.cxf.ext.logging; + +import java.io.FilterInputStream; +import java.io.IOException; +import java.io.InputStream; + +import org.apache.cxf.io.CachedOutputStream; + +/** + * An InputStream wrapper that copies data to a + * CachedOutputStream as it is read, up to a limit. + * This avoids eagerly reading the entire stream, + * which is important for streaming responses + * (CXF-8096). + */ +class TeeInputStream extends FilterInputStream { + private final CachedOutputStream teeCache; + private final int teeLimit; + private int count; + private Runnable closeCallback; + + TeeInputStream(final InputStream source, + final CachedOutputStream cos, + final int lim) { + super(source); + this.teeCache = cos; + this.teeLimit = lim; + } + + void setOnClose(final Runnable callback) { + this.closeCallback = callback; + } + + @Override + public int read() throws IOException { + int b = super.read(); + if (b != -1 && count < teeLimit) { + teeCache.write(b); + count++; + } + return b; + } + + @Override + public int read(final byte[] b, + final int off, + final int len) throws IOException { + int n = super.read(b, off, len); + if (n > 0 && count < teeLimit) { + int toWrite = Math.min(n, teeLimit - count); + teeCache.write(b, off, toWrite); + count += toWrite; + } + return n; + } + + @Override + public void close() throws IOException { + try { + super.close(); + } finally { + teeCache.flush(); + if (closeCallback != null) { + closeCallback.run(); + } + } + } + + CachedOutputStream getCachedOutputStream() { + return teeCache; + } +} diff --git a/rt/features/logging/src/main/java/org/apache/cxf/ext/logging/WireTapIn.java b/rt/features/logging/src/main/java/org/apache/cxf/ext/logging/WireTapIn.java index 9b08c7d736..a796fe67ad 100644 --- a/rt/features/logging/src/main/java/org/apache/cxf/ext/logging/WireTapIn.java +++ b/rt/features/logging/src/main/java/org/apache/cxf/ext/logging/WireTapIn.java @@ -21,7 +21,6 @@ package org.apache.cxf.ext.logging; import java.io.IOException; import java.io.InputStream; import java.io.Reader; -import java.io.SequenceInputStream; import org.apache.cxf.helpers.IOUtils; import org.apache.cxf.interceptor.Fault; @@ -86,20 +85,20 @@ public class WireTapIn extends AbstractPhaseInterceptor<Message> { InputStream bis = is instanceof DelegatingInputStream ? ((DelegatingInputStream)is).getInputStream() : is; - // only copy up to the limit since that's all we need to log - // we can stream the rest - IOUtils.copyAtLeast(bis, bos, limit == -1 ? Integer.MAX_VALUE : limit); - bos.flush(); - bis = new SequenceInputStream(bos.getInputStream(), bis); + // Wrap the stream in a TeeInputStream that copies data to the cache + // as it is consumed, up to the configured limit. This avoids blocking + // on streaming responses (CXF-8096). + int teeLimit = limit == -1 ? Integer.MAX_VALUE : limit; + TeeInputStream tee = new TeeInputStream(bis, bos, teeLimit); // restore the delegating input stream or the input stream if (is instanceof DelegatingInputStream) { - ((DelegatingInputStream)is).setInputStream(bis); + ((DelegatingInputStream)is).setInputStream(tee); } else { - message.setContent(InputStream.class, bis); + message.setContent(InputStream.class, tee); } message.setContent(CachedOutputStream.class, bos); - + message.setContent(TeeInputStream.class, tee); } public void setLimit(int limit) { diff --git a/rt/features/logging/src/test/java/org/apache/cxf/ext/logging/AttributeMaskSensitiveHelperTest.java b/rt/features/logging/src/test/java/org/apache/cxf/ext/logging/AttributeMaskSensitiveHelperTest.java index 925cc9b515..52c219af34 100644 --- a/rt/features/logging/src/test/java/org/apache/cxf/ext/logging/AttributeMaskSensitiveHelperTest.java +++ b/rt/features/logging/src/test/java/org/apache/cxf/ext/logging/AttributeMaskSensitiveHelperTest.java @@ -133,6 +133,7 @@ public class AttributeMaskSensitiveHelperTest { intercept.handleMessage(message); } inInterceptor.handleMessage(message); + consumeAndCloseInputStream(message); // Verify LogEvent event = logEventSender.getLogEvent(); @@ -154,6 +155,7 @@ public class AttributeMaskSensitiveHelperTest { intercept.handleMessage(message); } inInterceptor.handleMessage(message); + consumeAndCloseInputStream(message); // Verify LogEvent event = logEventSender.getLogEvent(); @@ -235,6 +237,18 @@ public class AttributeMaskSensitiveHelperTest { return message; } + private static void consumeAndCloseInputStream(Message message) { + try { + InputStream is = message.getContent(InputStream.class); + if (is != null) { + is.transferTo(OutputStream.nullOutputStream()); + is.close(); + } + } catch (IOException e) { + throw new RuntimeException(e); + } + } + private Message prepareOutMessage() { Message message = new MessageImpl(); message.put(Message.CONTENT_TYPE, contentType); diff --git a/rt/features/logging/src/test/java/org/apache/cxf/ext/logging/MaskSensitiveHelperTest.java b/rt/features/logging/src/test/java/org/apache/cxf/ext/logging/MaskSensitiveHelperTest.java index 761409b3a6..17d700d875 100644 --- a/rt/features/logging/src/test/java/org/apache/cxf/ext/logging/MaskSensitiveHelperTest.java +++ b/rt/features/logging/src/test/java/org/apache/cxf/ext/logging/MaskSensitiveHelperTest.java @@ -156,6 +156,7 @@ public class MaskSensitiveHelperTest { intercept.handleMessage(message); } inInterceptor.handleMessage(message); + consumeAndCloseInputStream(message); // Verify LogEvent event = logEventSender.getLogEvent(); @@ -177,6 +178,7 @@ public class MaskSensitiveHelperTest { intercept.handleMessage(message); } inInterceptor.handleMessage(message); + consumeAndCloseInputStream(message); // Verify LogEvent event = logEventSender.getLogEvent(); @@ -256,6 +258,18 @@ public class MaskSensitiveHelperTest { return message; } + private static void consumeAndCloseInputStream(Message message) { + try { + InputStream is = message.getContent(InputStream.class); + if (is != null) { + is.transferTo(OutputStream.nullOutputStream()); + is.close(); + } + } catch (IOException e) { + throw new RuntimeException(e); + } + } + private Message prepareOutMessage() { Message message = new MessageImpl(); message.put(Message.CONTENT_TYPE, contentType); diff --git a/rt/features/logging/src/test/java/org/apache/cxf/ext/logging/TransformTest.java b/rt/features/logging/src/test/java/org/apache/cxf/ext/logging/TransformTest.java index 48d53b0274..4da27d0ade 100644 --- a/rt/features/logging/src/test/java/org/apache/cxf/ext/logging/TransformTest.java +++ b/rt/features/logging/src/test/java/org/apache/cxf/ext/logging/TransformTest.java @@ -136,10 +136,21 @@ public class TransformTest { } interceptor.handleMessage(message); + // Consume and close the stream to trigger deferred logging (CXF-8096) + try { + InputStream is = message.getContent(InputStream.class); + if (is != null) { + is.transferTo(OutputStream.nullOutputStream()); + is.close(); + } + } catch (IOException e) { + throw new RuntimeException(e); + } + // Verify LogEvent event = logEventSender.getLogEvent(); assertNotNull(event); - assertEquals(TRANSFORMED_LOGGING_CONTENT, event.getPayload()); // only the first byte is read! + assertEquals(TRANSFORMED_LOGGING_CONTENT, event.getPayload()); } @Test diff --git a/rt/features/logging/src/test/java/org/apache/cxf/ext/logging/TruncatedTest.java b/rt/features/logging/src/test/java/org/apache/cxf/ext/logging/TruncatedTest.java index a7567fc15f..18d12b64e3 100644 --- a/rt/features/logging/src/test/java/org/apache/cxf/ext/logging/TruncatedTest.java +++ b/rt/features/logging/src/test/java/org/apache/cxf/ext/logging/TruncatedTest.java @@ -23,12 +23,17 @@ import java.io.ByteArrayOutputStream; import java.io.IOException; import java.io.InputStream; import java.io.OutputStream; +import java.io.PipedInputStream; +import java.io.PipedOutputStream; import java.io.Reader; import java.io.StringReader; import java.io.StringWriter; import java.io.Writer; import java.nio.charset.StandardCharsets; import java.util.Collection; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicReference; import org.apache.cxf.ext.logging.event.LogEvent; import org.apache.cxf.message.Exchange; @@ -41,6 +46,7 @@ import org.junit.Test; import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertNotNull; +import static org.junit.Assert.assertNull; import static org.junit.Assert.assertTrue; @@ -112,6 +118,13 @@ public class TruncatedTest { interceptor.handleMessage(message); + // Consume and close the stream to trigger deferred logging (CXF-8096) + InputStream is = message.getContent(InputStream.class); + if (is != null) { + is.transferTo(OutputStream.nullOutputStream()); + is.close(); + } + LogEvent event = logEventSender.getLogEvent(); assertNotNull(event); assertEquals("T", event.getPayload()); // only the first byte is read! @@ -143,7 +156,53 @@ public class TruncatedTest { assertEquals("T", event.getPayload()); // only the first byte is read! assertTrue(event.isTruncated()); } - + /** + * CXF-8096: Verify that LoggingFeature does not block when reading from a streaming + * input source. The TeeInputStream approach allows the application to read data + * incrementally while logging is deferred until the stream is closed. + */ + @SuppressWarnings({ "rawtypes", "unchecked" }) + @Test + public void streamingInputShouldNotBlockRead() throws Exception { + // Use PipedInputStream to simulate a slow streaming response + PipedOutputStream producer = new PipedOutputStream(); + PipedInputStream slowStream = new PipedInputStream(producer); + + Message message = new MessageImpl(); + message.setContent(InputStream.class, slowStream); + Exchange exchange = new ExchangeImpl(); + message.setExchange(exchange); + LogEventSenderMock logEventSender = new LogEventSenderMock(); + LoggingInInterceptor interceptor = new LoggingInInterceptor(logEventSender); + + // Run WireTapIn + Collection<PhaseInterceptor<? extends Message>> interceptors = interceptor.getAdditionalInterceptors(); + for (PhaseInterceptor intercept : interceptors) { + intercept.handleMessage(message); + } + + // Run LoggingInInterceptor — should NOT block waiting for stream data + interceptor.handleMessage(message); + + // At this point, logging should be deferred — no event yet + assertNull("Logging should be deferred for streaming input", logEventSender.getLogEvent()); + + // Write some data and close the stream + producer.write("Hello streaming!".getBytes(StandardCharsets.UTF_8)); + producer.close(); + + // Now consume the stream from the application side + InputStream is = message.getContent(InputStream.class); + byte[] data = is.readAllBytes(); + is.close(); + + assertEquals("Hello streaming!", new String(data, StandardCharsets.UTF_8)); + + // After stream close, deferred logging should have fired + LogEvent event = logEventSender.getLogEvent(); + assertNotNull("Log event should be available after stream is closed", event); + assertEquals("Hello streaming!", event.getPayload()); + } }
