This is an automated email from the ASF dual-hosted git repository.

ffang pushed a commit to branch 4.1.x-fixes
in repository https://gitbox.apache.org/repos/asf/cxf.git


The following commit(s) were added to refs/heads/4.1.x-fixes by this push:
     new dd13bb24dc5 [CXF-9129]Chunked attachment streaming not working when 
using ws-security (#3153)
dd13bb24dc5 is described below

commit dd13bb24dc5785961409828378570312483aaacb
Author: Freeman(Yue) Fang <[email protected]>
AuthorDate: Fri May 29 10:50:21 2026 -0400

    [CXF-9129]Chunked attachment streaming not working when using ws-security 
(#3153)
    
    (cherry picked from commit 2e26e6c976d4d49309cef7f942286abd3368adff)
---
 .../cxf/binding/soap/saaj/SAAJInInterceptor.java   |   2 +-
 .../security/wss4j/AbstractWSS4JInterceptor.java   |  24 +++
 .../systest/ws/mtom/MTOMLargeStreamingImpl.java    | 113 ++++++++++
 .../cxf/systest/ws/mtom/MTOMStreamingImpl.java     | 105 +++++++++
 .../systest/ws/mtom/MTOMStreamingSecurityTest.java | 239 +++++++++++++++++++++
 .../src/test/resources/DoubleItLogical.wsdl        |  28 +++
 .../apache/cxf/systest/ws/mtom/DoubleItMtom.wsdl   |  29 ++-
 .../org/apache/cxf/systest/ws/mtom/client.xml      |  25 ++-
 .../org/apache/cxf/systest/ws/mtom/server.xml      |  76 ++++++-
 .../cxf/systest/ws/mtom/streaming-client.xml       |  81 +++++++
 10 files changed, 718 insertions(+), 4 deletions(-)

diff --git 
a/rt/bindings/soap/src/main/java/org/apache/cxf/binding/soap/saaj/SAAJInInterceptor.java
 
b/rt/bindings/soap/src/main/java/org/apache/cxf/binding/soap/saaj/SAAJInInterceptor.java
index 22101c60c04..759f5a3e927 100644
--- 
a/rt/bindings/soap/src/main/java/org/apache/cxf/binding/soap/saaj/SAAJInInterceptor.java
+++ 
b/rt/bindings/soap/src/main/java/org/apache/cxf/binding/soap/saaj/SAAJInInterceptor.java
@@ -205,7 +205,7 @@ public class SAAJInInterceptor extends 
AbstractSoapInterceptor {
             message.setContent(Node.class, soapMessage.getSOAPPart());
 
             Collection<Attachment> atts = message.getAttachments();
-            if (atts != null) {
+            if (atts != null && 
!"false".equals(message.get("expandXOPInclude"))) {
                 for (Attachment a : atts) {
                     if (a.getDataHandler().getDataSource() instanceof 
AttachmentDataSource) {
                         try {
diff --git 
a/rt/ws/security/src/main/java/org/apache/cxf/ws/security/wss4j/AbstractWSS4JInterceptor.java
 
b/rt/ws/security/src/main/java/org/apache/cxf/ws/security/wss4j/AbstractWSS4JInterceptor.java
index e66212bf892..4205553f389 100644
--- 
a/rt/ws/security/src/main/java/org/apache/cxf/ws/security/wss4j/AbstractWSS4JInterceptor.java
+++ 
b/rt/ws/security/src/main/java/org/apache/cxf/ws/security/wss4j/AbstractWSS4JInterceptor.java
@@ -207,6 +207,30 @@ public abstract class AbstractWSS4JInterceptor extends 
WSHandler implements Soap
         if (passwordEncryptor != null) {
             msg.put(ConfigurationConstants.PASSWORD_ENCRYPTOR_INSTANCE, 
passwordEncryptor);
         }
+
+        Object expandXOP = 
SecurityUtils.getSecurityPropertyValue(SecurityConstants.EXPAND_XOP_INCLUDE, 
msg);
+        if (expandXOP != null) {
+            msg.put(ConfigurationConstants.EXPAND_XOP_INCLUDE,
+                    Boolean.parseBoolean(expandXOP.toString()) ? "true" : 
"false");
+        } else {
+            // Also propagate the WSS4J-level key from the options map so that 
SAAJInInterceptor
+            // can skip attachment caching when expandXOPInclude=false is 
configured directly.
+            String expandXOPOption = (String) 
getOption(ConfigurationConstants.EXPAND_XOP_INCLUDE);
+            if (expandXOPOption != null) {
+                msg.put(ConfigurationConstants.EXPAND_XOP_INCLUDE, 
expandXOPOption);
+            }
+        }
+        Object storeBytes =
+            
SecurityUtils.getSecurityPropertyValue(SecurityConstants.STORE_BYTES_IN_ATTACHMENT,
 msg);
+        if (storeBytes != null) {
+            msg.put(ConfigurationConstants.STORE_BYTES_IN_ATTACHMENT,
+                    Boolean.parseBoolean(storeBytes.toString()) ? "true" : 
"false");
+        } else {
+            String storeBytesOption = (String) 
getOption(ConfigurationConstants.STORE_BYTES_IN_ATTACHMENT);
+            if (storeBytesOption != null) {
+                msg.put(ConfigurationConstants.STORE_BYTES_IN_ATTACHMENT, 
storeBytesOption);
+            }
+        }
     }
 
     @Override
diff --git 
a/systests/ws-security/src/test/java/org/apache/cxf/systest/ws/mtom/MTOMLargeStreamingImpl.java
 
b/systests/ws-security/src/test/java/org/apache/cxf/systest/ws/mtom/MTOMLargeStreamingImpl.java
new file mode 100644
index 00000000000..31800029cd4
--- /dev/null
+++ 
b/systests/ws-security/src/test/java/org/apache/cxf/systest/ws/mtom/MTOMLargeStreamingImpl.java
@@ -0,0 +1,113 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.cxf.systest.ws.mtom;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
+import java.io.PipedInputStream;
+import java.io.PipedOutputStream;
+import java.time.Instant;
+import java.util.Arrays;
+import java.util.concurrent.atomic.AtomicReference;
+
+import jakarta.activation.DataHandler;
+import jakarta.activation.DataSource;
+import jakarta.jws.WebService;
+import org.example.contract.doubleit.DoubleItFault;
+import org.example.contract.doubleit.DoubleItStreamingMtomPortType;
+
+/**
+ * Large-payload MTOM streaming implementation used by 
testStreamingMTOMTimingReproducer.
+ *
+ * The 64 × 1 KB = 64 KB total payload exceeds Jetty's default 32 KB output 
buffer.
+ * Jetty auto-flushes to the client when its buffer fills, so the CXF client 
receives
+ * the complete SOAP body and obtains a lazy DataHandler before the server 
finishes
+ * writing the remaining attachment chunks.
+ */
+@WebService(targetNamespace = "http://www.example.org/contract/DoubleIt";,
+            serviceName = "DoubleItService",
+            endpointInterface = 
"org.example.contract.doubleit.DoubleItStreamingMtomPortType")
+public class MTOMLargeStreamingImpl implements DoubleItStreamingMtomPortType {
+
+    static final long CHUNK_DELAY_MS = 100L;
+    static final int CHUNK_COUNT = 64;
+    static final int CHUNK_SIZE_BYTES = 1024;
+
+    static final AtomicReference<Instant> STREAMING_FINISHED = new 
AtomicReference<>();
+
+    public static void resetStreamingFinished() {
+        STREAMING_FINISHED.set(null);
+    }
+
+    public static Instant getStreamingFinished() {
+        return STREAMING_FINISHED.get();
+    }
+
+    @Override
+    public DataHandler doubleIt5(int numberToDouble) throws DoubleItFault {
+        if (numberToDouble == 0) {
+            throw new DoubleItFault("0 can't be doubled!");
+        }
+        try {
+            PipedInputStream pipedIn = new PipedInputStream(CHUNK_SIZE_BYTES * 
2);
+            PipedOutputStream pipedOut = new PipedOutputStream(pipedIn);
+            Thread writerThread = new Thread(() -> {
+                byte[] chunk = new byte[CHUNK_SIZE_BYTES];
+                Arrays.fill(chunk, (byte) 'A');
+                try {
+                    for (int i = 0; i < CHUNK_COUNT; i++) {
+                        pipedOut.write(chunk);
+                        pipedOut.flush();
+                        Thread.sleep(CHUNK_DELAY_MS);
+                    }
+                    pipedOut.close();
+                    STREAMING_FINISHED.set(Instant.now());
+                } catch (InterruptedException e) {
+                    Thread.currentThread().interrupt();
+                } catch (IOException e) {
+                    // pipe may be closed early by consumer; no further action 
needed
+                }
+            });
+            writerThread.setDaemon(true);
+            writerThread.start();
+            DataSource dataSource = new DataSource() {
+                @Override
+                public InputStream getInputStream() {
+                    return pipedIn;
+                }
+                @Override
+                public OutputStream getOutputStream() {
+                    throw new UnsupportedOperationException();
+                }
+                @Override
+                public String getContentType() {
+                    return "application/octet-stream";
+                }
+                @Override
+                public String getName() {
+                    return "streaming-data-large";
+                }
+            };
+            return new DataHandler(dataSource);
+        } catch (IOException e) {
+            throw new DoubleItFault("Error creating streaming response: " + 
e.getMessage());
+        }
+    }
+}
diff --git 
a/systests/ws-security/src/test/java/org/apache/cxf/systest/ws/mtom/MTOMStreamingImpl.java
 
b/systests/ws-security/src/test/java/org/apache/cxf/systest/ws/mtom/MTOMStreamingImpl.java
new file mode 100644
index 00000000000..d4a8cf1e4ad
--- /dev/null
+++ 
b/systests/ws-security/src/test/java/org/apache/cxf/systest/ws/mtom/MTOMStreamingImpl.java
@@ -0,0 +1,105 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.cxf.systest.ws.mtom;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
+import java.io.PipedInputStream;
+import java.io.PipedOutputStream;
+import java.time.Instant;
+import java.util.Arrays;
+import java.util.concurrent.atomic.AtomicReference;
+
+import jakarta.activation.DataHandler;
+import jakarta.activation.DataSource;
+import jakarta.jws.WebService;
+import org.example.contract.doubleit.DoubleItFault;
+import org.example.contract.doubleit.DoubleItStreamingMtomPortType;
+
+@WebService(targetNamespace = "http://www.example.org/contract/DoubleIt";,
+            serviceName = "DoubleItService",
+            endpointInterface = 
"org.example.contract.doubleit.DoubleItStreamingMtomPortType")
+public class MTOMStreamingImpl implements DoubleItStreamingMtomPortType {
+
+    static final long CHUNK_DELAY_MS = 500L;
+    static final int CHUNK_COUNT = 5;
+    static final int CHUNK_SIZE_BYTES = 1024;
+
+    static final AtomicReference<Instant> STREAMING_FINISHED = new 
AtomicReference<>();
+
+    public static void resetStreamingFinished() {
+        STREAMING_FINISHED.set(null);
+    }
+
+    public static Instant getStreamingFinished() {
+        return STREAMING_FINISHED.get();
+    }
+
+    @Override
+    public DataHandler doubleIt5(int numberToDouble) throws DoubleItFault {
+        if (numberToDouble == 0) {
+            throw new DoubleItFault("0 can't be doubled!");
+        }
+        try {
+            PipedInputStream pipedIn = new PipedInputStream(CHUNK_SIZE_BYTES * 
2);
+            PipedOutputStream pipedOut = new PipedOutputStream(pipedIn);
+            Thread writerThread = new Thread(() -> {
+                byte[] chunk = new byte[CHUNK_SIZE_BYTES];
+                Arrays.fill(chunk, (byte) 'A');
+                try {
+                    for (int i = 0; i < CHUNK_COUNT; i++) {
+                        pipedOut.write(chunk);
+                        pipedOut.flush();
+                        Thread.sleep(CHUNK_DELAY_MS);
+                    }
+                    pipedOut.close();
+                    STREAMING_FINISHED.set(Instant.now());
+                } catch (InterruptedException e) {
+                    Thread.currentThread().interrupt();
+                } catch (IOException e) {
+                    // pipe may be closed early by consumer; no further action 
needed
+                }
+            });
+            writerThread.setDaemon(true);
+            writerThread.start();
+            DataSource dataSource = new DataSource() {
+                @Override
+                public InputStream getInputStream() {
+                    return pipedIn;
+                }
+                @Override
+                public OutputStream getOutputStream() {
+                    throw new UnsupportedOperationException();
+                }
+                @Override
+                public String getContentType() {
+                    return "application/octet-stream";
+                }
+                @Override
+                public String getName() {
+                    return "streaming-data";
+                }
+            };
+            return new DataHandler(dataSource);
+        } catch (IOException e) {
+            throw new DoubleItFault("Error creating streaming response: " + 
e.getMessage());
+        }
+    }
+}
diff --git 
a/systests/ws-security/src/test/java/org/apache/cxf/systest/ws/mtom/MTOMStreamingSecurityTest.java
 
b/systests/ws-security/src/test/java/org/apache/cxf/systest/ws/mtom/MTOMStreamingSecurityTest.java
new file mode 100644
index 00000000000..54648ee45cb
--- /dev/null
+++ 
b/systests/ws-security/src/test/java/org/apache/cxf/systest/ws/mtom/MTOMStreamingSecurityTest.java
@@ -0,0 +1,239 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.cxf.systest.ws.mtom;
+
+import java.io.InputStream;
+import java.net.URL;
+import java.time.Instant;
+
+import javax.xml.namespace.QName;
+
+import jakarta.activation.DataHandler;
+import jakarta.xml.ws.Service;
+import org.apache.cxf.Bus;
+import org.apache.cxf.BusFactory;
+import org.apache.cxf.bus.spring.SpringBusFactory;
+import org.apache.cxf.testutil.common.AbstractBusClientServerTestBase;
+import org.example.contract.doubleit.DoubleItStreamingMtomPortType;
+
+import org.junit.Before;
+import org.junit.BeforeClass;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertTrue;
+
+/**
+ * Tests for CXF-9129: MTOM attachment streaming with WS-Security.
+ *
+ * The root cause is that WSS4JOutInterceptor defaults to 
expandXopInclude=true and
+ * storeBytesInAttachment=true when MTOM is enabled. With these defaults WSS4J 
reads every
+ * attachment byte during the POST_PROTOCOL security phase and processes 
attachments via
+ * SwA-style references, which prevents streaming DataHandlers from working 
correctly
+ * (the attachment stream is consumed before AttachmentOutEndingInterceptor 
can write it).
+ *
+ * The fix is to set expandXopInclude=false and storeBytesInAttachment=false on
+ * WSS4JOutInterceptor. WS-Security then signs only the SOAP body (which 
contains the
+ * xop:Include reference), leaving the raw attachment bytes to flow through
+ * AttachmentOutEndingInterceptor at PRE_STREAM_ENDING without being consumed 
early.
+ */
+public class MTOMStreamingSecurityTest extends AbstractBusClientServerTestBase 
{
+
+    public static final String PORT = allocatePort(MTOMServer.class);
+
+    private static final String NAMESPACE = 
"http://www.example.org/contract/DoubleIt";;
+    private static final QName SERVICE_QNAME = new QName(NAMESPACE, 
"DoubleItService");
+
+    @Before
+    public void resetStreamingTracker() {
+        MTOMStreamingImpl.resetStreamingFinished();
+        MTOMLargeStreamingImpl.resetStreamingFinished();
+    }
+
+    @BeforeClass
+    public static void startServers() throws Exception {
+        assertTrue(
+            "Server failed to launch",
+            launchServer(MTOMServer.class, true)
+        );
+    }
+
+    @org.junit.AfterClass
+    public static void cleanup() throws Exception {
+        stopAllServers();
+    }
+
+    /**
+     * Baseline: without WS-Security, the MTOM endpoint returns all streamed 
bytes correctly.
+     */
+    @org.junit.Test
+    public void testStreamingMTOMNoSecurity() throws Exception {
+        SpringBusFactory bf = new SpringBusFactory();
+        URL busFile = 
MTOMStreamingSecurityTest.class.getResource("streaming-client.xml");
+
+        Bus bus = bf.createBus(busFile.toString());
+        BusFactory.setDefaultBus(bus);
+        BusFactory.setThreadDefaultBus(bus);
+
+        URL wsdl = 
MTOMStreamingSecurityTest.class.getResource("DoubleItMtom.wsdl");
+        Service service = Service.create(wsdl, SERVICE_QNAME);
+        QName portQName = new QName(NAMESPACE, "DoubleItStreamingMtomPort");
+        DoubleItStreamingMtomPortType port =
+            service.getPort(portQName, DoubleItStreamingMtomPortType.class);
+        updateAddressPort(port, PORT);
+
+        DataHandler response = port.doubleIt5(25);
+
+        int received = drain(response);
+        assertEquals("Should receive all streamed bytes",
+            MTOMStreamingImpl.CHUNK_COUNT * 
MTOMStreamingImpl.CHUNK_SIZE_BYTES, received);
+
+        Instant serverFinished = MTOMStreamingImpl.getStreamingFinished();
+        assertNotNull("Streaming should have completed", serverFinished);
+
+        ((java.io.Closeable) port).close();
+        bus.shutdown(true);
+    }
+
+    /**
+     * Tests CXF-9129 fix: with expandXopInclude=false and 
storeBytesInAttachment=false, the
+     * signed MTOM endpoint returns all streamed bytes correctly without 
errors.
+     */
+    @org.junit.Test
+    public void testStreamingMTOMWithExpandXopFix() throws Exception {
+        SpringBusFactory bf = new SpringBusFactory();
+        URL busFile = 
MTOMStreamingSecurityTest.class.getResource("streaming-client.xml");
+
+        Bus bus = bf.createBus(busFile.toString());
+        BusFactory.setDefaultBus(bus);
+        BusFactory.setThreadDefaultBus(bus);
+
+        URL wsdl = 
MTOMStreamingSecurityTest.class.getResource("DoubleItMtom.wsdl");
+        Service service = Service.create(wsdl, SERVICE_QNAME);
+        QName portQName = new QName(NAMESPACE, 
"DoubleItStreamingMtomSignedPort");
+        DoubleItStreamingMtomPortType port =
+            service.getPort(portQName, DoubleItStreamingMtomPortType.class);
+        updateAddressPort(port, PORT);
+
+        DataHandler response = port.doubleIt5(25);
+
+        int received = drain(response);
+        assertEquals("Should receive all streamed bytes with WS-Security 
(CXF-9129)",
+            MTOMStreamingImpl.CHUNK_COUNT * 
MTOMStreamingImpl.CHUNK_SIZE_BYTES, received);
+
+        Instant serverFinished = MTOMStreamingImpl.getStreamingFinished();
+        assertNotNull("Streaming should have completed", serverFinished);
+
+        ((java.io.Closeable) port).close();
+        bus.shutdown(true);
+    }
+
+    /**
+     * Reproduces the original CXF-9129 bug report scenario without 
WS-Security:
+     * a large MTOM attachment (64 KB &gt; Jetty's default 32 KB output 
buffer) forces
+     * Jetty to auto-flush HTTP chunks while the server is still writing.
+     * The CXF client receives the SOAP body early, creates a lazy 
DataHandler, and
+     * returns to the caller before the server finishes producing the 
remaining chunks.
+     */
+    @org.junit.Test
+    public void testStreamingMTOMTimingReproducer() throws Exception {
+        SpringBusFactory bf = new SpringBusFactory();
+        URL busFile = 
MTOMStreamingSecurityTest.class.getResource("streaming-client.xml");
+
+        Bus bus = bf.createBus(busFile.toString());
+        BusFactory.setDefaultBus(bus);
+        BusFactory.setThreadDefaultBus(bus);
+
+        URL wsdl = 
MTOMStreamingSecurityTest.class.getResource("DoubleItMtom.wsdl");
+        Service service = Service.create(wsdl, SERVICE_QNAME);
+        QName portQName = new QName(NAMESPACE, 
"DoubleItStreamingMtomLargePort");
+        DoubleItStreamingMtomPortType port =
+            service.getPort(portQName, DoubleItStreamingMtomPortType.class);
+        updateAddressPort(port, PORT);
+
+        DataHandler response = port.doubleIt5(25);
+        Instant callReturnedInstant = Instant.now();
+
+        int received = drain(response);
+        assertEquals("Should receive all streamed bytes",
+            MTOMLargeStreamingImpl.CHUNK_COUNT * 
MTOMLargeStreamingImpl.CHUNK_SIZE_BYTES, received);
+
+        Instant serverFinished = MTOMLargeStreamingImpl.getStreamingFinished();
+        assertNotNull("Streaming should have completed", serverFinished);
+        assertTrue("JAX-WS call returned before server finished streaming",
+            callReturnedInstant.isBefore(serverFinished));
+
+        ((java.io.Closeable) port).close();
+        bus.shutdown(true);
+    }
+
+    /**
+     * The full CXF-9129 reproducer: large MTOM attachment (64 KB) + 
WS-Security signing
+     * with expandXopInclude=false and storeBytesInAttachment=false.
+     *
+     * Jetty auto-flushes the first ~32 KB (SOAP body + partial attachment) to 
the client
+     * while the server is still writing. Because WSS4J signs only the SOAP 
body (xop:Include
+     * reference) and never touches the raw attachment bytes, the 
PipedInputStream is not
+     * consumed early. The client therefore obtains a lazy DataHandler and the 
JAX-WS call
+     * returns before the server finishes streaming — exactly as in the 
original reproducer.
+     */
+    @org.junit.Test
+    public void testStreamingMTOMTimingReproducerWithSecurity() throws 
Exception {
+        SpringBusFactory bf = new SpringBusFactory();
+        URL busFile = 
MTOMStreamingSecurityTest.class.getResource("streaming-client.xml");
+
+        Bus bus = bf.createBus(busFile.toString());
+        BusFactory.setDefaultBus(bus);
+        BusFactory.setThreadDefaultBus(bus);
+
+        URL wsdl = 
MTOMStreamingSecurityTest.class.getResource("DoubleItMtom.wsdl");
+        Service service = Service.create(wsdl, SERVICE_QNAME);
+        QName portQName = new QName(NAMESPACE, 
"DoubleItStreamingMtomLargeSignedPort");
+        DoubleItStreamingMtomPortType port =
+            service.getPort(portQName, DoubleItStreamingMtomPortType.class);
+        updateAddressPort(port, PORT);
+
+        DataHandler response = port.doubleIt5(25);
+        Instant callReturnedInstant = Instant.now();
+
+        int received = drain(response);
+        assertEquals("Should receive all streamed bytes with WS-Security and 
large payload",
+            MTOMLargeStreamingImpl.CHUNK_COUNT * 
MTOMLargeStreamingImpl.CHUNK_SIZE_BYTES, received);
+
+        Instant serverFinished = MTOMLargeStreamingImpl.getStreamingFinished();
+        assertNotNull("Streaming should have completed", serverFinished);
+        assertTrue("JAX-WS call returned before server finished streaming ",
+            callReturnedInstant.isBefore(serverFinished));
+
+        ((java.io.Closeable) port).close();
+        bus.shutdown(true);
+    }
+
+    private static int drain(DataHandler dh) throws Exception {
+        int total = 0;
+        byte[] buf = new byte[4096];
+        try (InputStream is = dh.getInputStream()) {
+            int n;
+            while ((n = is.read(buf)) != -1) {
+                total += n;
+            }
+        }
+        return total;
+    }
+}
diff --git a/systests/ws-security/src/test/resources/DoubleItLogical.wsdl 
b/systests/ws-security/src/test/resources/DoubleItLogical.wsdl
index 832852aad5e..ea213140d4a 100644
--- a/systests/ws-security/src/test/resources/DoubleItLogical.wsdl
+++ b/systests/ws-security/src/test/resources/DoubleItLogical.wsdl
@@ -51,6 +51,21 @@
                     </xsd:sequence>
                 </xsd:complexType>
             </xsd:element>
+            <xsd:element name="DoubleIt5">
+                <xsd:complexType>
+                    <xsd:sequence>
+                        <xsd:element name="numberToDouble" type="xsd:int"/>
+                    </xsd:sequence>
+                </xsd:complexType>
+            </xsd:element>
+            <xsd:element name="DoubleIt5Response">
+                <xsd:complexType>
+                    <xsd:sequence>
+                        <xsd:element name="streamData" type="xsd:base64Binary"
+                            
xmime:expectedContentTypes="application/octet-stream"/>
+                    </xsd:sequence>
+                </xsd:complexType>
+            </xsd:element>
             <xsd:element name="DoubleItResponse">
                 <xsd:complexType>
                     <xsd:sequence>
@@ -82,6 +97,12 @@
     <wsdl:message name="DoubleIt4Request">
         <wsdl:part element="di:DoubleIt4" name="parameters"/>
     </wsdl:message>
+    <wsdl:message name="DoubleIt5Request">
+        <wsdl:part element="di:DoubleIt5" name="parameters"/>
+    </wsdl:message>
+    <wsdl:message name="DoubleIt5Response">
+        <wsdl:part element="di:DoubleIt5Response" name="parameters"/>
+    </wsdl:message>
     <wsdl:message name="DoubleItRequestHeader">
         <wsdl:part element="di:DoubleIt" name="parameters"/>
         <wsdl:part element="di:DoubleItHeader" name="header"/>
@@ -139,6 +160,13 @@
             <wsdl:fault message="tns:DoubleItFault" name="DoubleItFault"/>
         </wsdl:operation>
     </wsdl:portType>
+    <wsdl:portType name="DoubleItStreamingMtomPortType">
+        <wsdl:operation name="DoubleIt5">
+            <wsdl:input message="tns:DoubleIt5Request"/>
+            <wsdl:output message="tns:DoubleIt5Response"/>
+            <wsdl:fault message="tns:DoubleItFault" name="DoubleItFault"/>
+        </wsdl:operation>
+    </wsdl:portType>
     <wsdl:portType name="DoubleItOneWayPortType">
         <wsdl:operation name="DoubleIt">
             <wsdl:input message="tns:DoubleItRequest"/>
diff --git 
a/systests/ws-security/src/test/resources/org/apache/cxf/systest/ws/mtom/DoubleItMtom.wsdl
 
b/systests/ws-security/src/test/resources/org/apache/cxf/systest/ws/mtom/DoubleItMtom.wsdl
index b3a9a9d30f0..ffa9d816da4 100644
--- 
a/systests/ws-security/src/test/resources/org/apache/cxf/systest/ws/mtom/DoubleItMtom.wsdl
+++ 
b/systests/ws-security/src/test/resources/org/apache/cxf/systest/ws/mtom/DoubleItMtom.wsdl
@@ -139,7 +139,22 @@
             </wsdl:fault>
         </wsdl:operation>
     </wsdl:binding>
-    
+    <wsdl:binding name="DoubleItStreamingMtomBinding" 
type="tns:DoubleItStreamingMtomPortType">
+        <soap:binding style="document" 
transport="http://schemas.xmlsoap.org/soap/http"/>
+        <wsdl:operation name="DoubleIt5">
+            <soap:operation soapAction=""/>
+            <wsdl:input>
+                <soap:body use="literal"/>
+            </wsdl:input>
+            <wsdl:output>
+                <soap:body use="literal"/>
+            </wsdl:output>
+            <wsdl:fault name="DoubleItFault">
+                <soap:body use="literal" />
+            </wsdl:fault>
+        </wsdl:operation>
+    </wsdl:binding>
+
     <wsdl:service name="DoubleItService">
         <wsdl:port name="DoubleItSignedMTOMInlinePort" 
binding="tns:DoubleItNoSecurityBinding">
             <soap:address 
location="http://localhost:9001/DoubleItSignedMTOMInline"/>
@@ -165,6 +180,18 @@
         <wsdl:port name="DoubleItSymmetricBinaryPort" 
binding="tns:DoubleItSymmetricBinaryBinding">
             <soap:address 
location="http://localhost:9001/DoubleItX509SymmetricBinary"/>
         </wsdl:port>
+        <wsdl:port name="DoubleItStreamingMtomPort" 
binding="tns:DoubleItStreamingMtomBinding">
+            <soap:address 
location="http://localhost:9001/DoubleItStreamingMtom"/>
+        </wsdl:port>
+        <wsdl:port name="DoubleItStreamingMtomSignedPort" 
binding="tns:DoubleItStreamingMtomBinding">
+            <soap:address 
location="http://localhost:9001/DoubleItStreamingMtomSigned"/>
+        </wsdl:port>
+        <wsdl:port name="DoubleItStreamingMtomLargePort" 
binding="tns:DoubleItStreamingMtomBinding">
+            <soap:address 
location="http://localhost:9001/DoubleItStreamingMtomLarge"/>
+        </wsdl:port>
+        <wsdl:port name="DoubleItStreamingMtomLargeSignedPort" 
binding="tns:DoubleItStreamingMtomBinding">
+            <soap:address 
location="http://localhost:9001/DoubleItStreamingMtomLargeSigned"/>
+        </wsdl:port>
     </wsdl:service>
     
     <wsp:Policy wsu:Id="DoubleItAsymmetricPolicy">
diff --git 
a/systests/ws-security/src/test/resources/org/apache/cxf/systest/ws/mtom/client.xml
 
b/systests/ws-security/src/test/resources/org/apache/cxf/systest/ws/mtom/client.xml
index bd40a88a225..8cf34ea2660 100644
--- 
a/systests/ws-security/src/test/resources/org/apache/cxf/systest/ws/mtom/client.xml
+++ 
b/systests/ws-security/src/test/resources/org/apache/cxf/systest/ws/mtom/client.xml
@@ -170,5 +170,28 @@
             <entry key="mtom-enabled" value="true"/>
         </jaxws:properties>
     </jaxws:client>
-    
+
+    <jaxws:client 
name="{http://www.example.org/contract/DoubleIt}DoubleItStreamingMtomPort"; 
createdFromAPI="true">
+        <jaxws:properties>
+            <entry key="mtom-enabled" value="true"/>
+        </jaxws:properties>
+    </jaxws:client>
+
+    <jaxws:client 
name="{http://www.example.org/contract/DoubleIt}DoubleItStreamingMtomSignedPort";
 createdFromAPI="true">
+        <jaxws:inInterceptors>
+            <bean class="org.apache.cxf.ws.security.wss4j.WSS4JInInterceptor">
+                <constructor-arg>
+                    <map>
+                        <entry key="action" value="Signature"/>
+                        <entry key="signatureVerificationPropFile" 
value="bob.properties"/>
+                        <entry key="expandXOPInclude" value="false"/>
+                    </map>
+                </constructor-arg>
+            </bean>
+        </jaxws:inInterceptors>
+        <jaxws:properties>
+            <entry key="mtom-enabled" value="true"/>
+        </jaxws:properties>
+    </jaxws:client>
+
 </beans>
\ No newline at end of file
diff --git 
a/systests/ws-security/src/test/resources/org/apache/cxf/systest/ws/mtom/server.xml
 
b/systests/ws-security/src/test/resources/org/apache/cxf/systest/ws/mtom/server.xml
index 9b946e36c8d..6a8b1727917 100644
--- 
a/systests/ws-security/src/test/resources/org/apache/cxf/systest/ws/mtom/server.xml
+++ 
b/systests/ws-security/src/test/resources/org/apache/cxf/systest/ws/mtom/server.xml
@@ -39,6 +39,9 @@
         <property name="enforcePrincipal" value="false"/>
     </bean>
 
+    <bean id="mtomStreamingImpl" 
class="org.apache.cxf.systest.ws.mtom.MTOMStreamingImpl"/>
+    <bean id="mtomLargeStreamingImpl" 
class="org.apache.cxf.systest.ws.mtom.MTOMLargeStreamingImpl"/>
+
     <jaxws:endpoint xmlns:s="http://www.example.org/contract/DoubleIt"; 
id="SignedMTOM" 
         
address="http://localhost:${testutil.ports.mtom.MTOMServer}/DoubleItSignedMTOMInline";
 
         serviceName="s:DoubleItService" 
endpointName="s:DoubleItSignedMTOMInlinePort" 
@@ -180,5 +183,76 @@
             <entry key="mtom-enabled" value="true"/>
         </jaxws:properties>
     </jaxws:endpoint>
-    
+
+    <jaxws:endpoint xmlns:s="http://www.example.org/contract/DoubleIt"; 
id="StreamingMtom"
+        
address="http://localhost:${testutil.ports.mtom.MTOMServer}/DoubleItStreamingMtom";
+        serviceName="s:DoubleItService" 
endpointName="s:DoubleItStreamingMtomPort"
+        implementor="#mtomStreamingImpl"
+        wsdlLocation="org/apache/cxf/systest/ws/mtom/DoubleItMtom.wsdl">
+        <jaxws:properties>
+            <entry key="mtom-enabled" value="true"/>
+        </jaxws:properties>
+    </jaxws:endpoint>
+
+    <jaxws:endpoint xmlns:s="http://www.example.org/contract/DoubleIt"; 
id="StreamingMtomSigned"
+        
address="http://localhost:${testutil.ports.mtom.MTOMServer}/DoubleItStreamingMtomSigned";
+        serviceName="s:DoubleItService" 
endpointName="s:DoubleItStreamingMtomSignedPort"
+        implementor="#mtomStreamingImpl"
+        wsdlLocation="org/apache/cxf/systest/ws/mtom/DoubleItMtom.wsdl">
+        <jaxws:outInterceptors>
+            <bean class="org.apache.cxf.ws.security.wss4j.WSS4JOutInterceptor">
+                <constructor-arg>
+                    <map>
+                        <entry key="action" value="Signature"/>
+                        <entry key="signatureUser" value="bob"/>
+                        <entry key="signaturePropFile" value="bob.properties"/>
+                        <entry key="signatureKeyIdentifier" 
value="DirectReference"/>
+                        <entry key="passwordCallbackClass" 
value="org.apache.cxf.systest.ws.common.KeystorePasswordCallback"/>
+                        <entry key="expandXOPInclude" value="false"/>
+                        <entry key="storeBytesInAttachment" value="false"/>
+                    </map>
+                </constructor-arg>
+            </bean>
+        </jaxws:outInterceptors>
+        <jaxws:properties>
+            <entry key="mtom-enabled" value="true"/>
+        </jaxws:properties>
+    </jaxws:endpoint>
+
+
+    <jaxws:endpoint xmlns:s="http://www.example.org/contract/DoubleIt"; 
id="StreamingMtomLarge"
+        
address="http://localhost:${testutil.ports.mtom.MTOMServer}/DoubleItStreamingMtomLarge";
+        serviceName="s:DoubleItService" 
endpointName="s:DoubleItStreamingMtomLargePort"
+        implementor="#mtomLargeStreamingImpl"
+        wsdlLocation="org/apache/cxf/systest/ws/mtom/DoubleItMtom.wsdl">
+        <jaxws:properties>
+            <entry key="mtom-enabled" value="true"/>
+        </jaxws:properties>
+    </jaxws:endpoint>
+
+    <jaxws:endpoint xmlns:s="http://www.example.org/contract/DoubleIt"; 
id="StreamingMtomLargeSigned"
+        
address="http://localhost:${testutil.ports.mtom.MTOMServer}/DoubleItStreamingMtomLargeSigned";
+        serviceName="s:DoubleItService" 
endpointName="s:DoubleItStreamingMtomLargeSignedPort"
+        implementor="#mtomLargeStreamingImpl"
+        wsdlLocation="org/apache/cxf/systest/ws/mtom/DoubleItMtom.wsdl">
+        <jaxws:outInterceptors>
+            <bean class="org.apache.cxf.ws.security.wss4j.WSS4JOutInterceptor">
+                <constructor-arg>
+                    <map>
+                        <entry key="action" value="Signature"/>
+                        <entry key="signatureUser" value="bob"/>
+                        <entry key="signaturePropFile" value="bob.properties"/>
+                        <entry key="signatureKeyIdentifier" 
value="DirectReference"/>
+                        <entry key="passwordCallbackClass" 
value="org.apache.cxf.systest.ws.common.KeystorePasswordCallback"/>
+                        <entry key="expandXOPInclude" value="false"/>
+                        <entry key="storeBytesInAttachment" value="false"/>
+                    </map>
+                </constructor-arg>
+            </bean>
+        </jaxws:outInterceptors>
+        <jaxws:properties>
+            <entry key="mtom-enabled" value="true"/>
+        </jaxws:properties>
+    </jaxws:endpoint>
+
 </beans>
\ No newline at end of file
diff --git 
a/systests/ws-security/src/test/resources/org/apache/cxf/systest/ws/mtom/streaming-client.xml
 
b/systests/ws-security/src/test/resources/org/apache/cxf/systest/ws/mtom/streaming-client.xml
new file mode 100644
index 00000000000..f79dfeacc83
--- /dev/null
+++ 
b/systests/ws-security/src/test/resources/org/apache/cxf/systest/ws/mtom/streaming-client.xml
@@ -0,0 +1,81 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+ Licensed to the Apache Software Foundation (ASF) under one
+ or more contributor license agreements. See the NOTICE file
+ distributed with this work for additional information
+ regarding copyright ownership. The ASF licenses this file
+ to you under the Apache License, Version 2.0 (the
+ "License"); you may not use this file except in compliance
+ with the License. You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+ Unless required by applicable law or agreed to in writing,
+ software distributed under the License is distributed on an
+ "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ KIND, either express or implied. See the License for the
+ specific language governing permissions and limitations
+ under the License.
+-->
+<beans xmlns="http://www.springframework.org/schema/beans";
+    xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance";
+    xmlns:http="http://cxf.apache.org/transports/http/configuration";
+    xmlns:jaxws="http://cxf.apache.org/jaxws";
+    xmlns:cxf="http://cxf.apache.org/core";
+    xmlns:p="http://cxf.apache.org/policy";
+    xmlns:sec="http://cxf.apache.org/configuration/security";
+    xsi:schemaLocation="http://www.springframework.org/schema/beans 
http://www.springframework.org/schema/beans/spring-beans.xsd 
http://cxf.apache.org/jaxws http://cxf.apache.org/schemas/jaxws.xsd 
http://cxf.apache.org/transports/http/configuration 
http://cxf.apache.org/schemas/configuration/http-conf.xsd 
http://cxf.apache.org/configuration/security 
http://cxf.apache.org/schemas/configuration/security.xsd 
http://cxf.apache.org/core http://cxf.apache.org/schemas/core.xsd 
http://cxf.apache [...]
+    <cxf:bus>
+        <cxf:features>
+            <p:policies/>
+        </cxf:features>
+    </cxf:bus>
+
+    <jaxws:client 
name="{http://www.example.org/contract/DoubleIt}DoubleItStreamingMtomPort"; 
createdFromAPI="true">
+        <jaxws:properties>
+            <entry key="mtom-enabled" value="true"/>
+        </jaxws:properties>
+    </jaxws:client>
+
+    <jaxws:client 
name="{http://www.example.org/contract/DoubleIt}DoubleItStreamingMtomSignedPort";
 createdFromAPI="true">
+        <jaxws:inInterceptors>
+            <bean class="org.apache.cxf.ws.security.wss4j.WSS4JInInterceptor">
+                <constructor-arg>
+                    <map>
+                        <entry key="action" value="Signature"/>
+                        <entry key="signatureVerificationPropFile" 
value="bob.properties"/>
+                        <entry key="expandXOPInclude" value="false"/>
+                    </map>
+                </constructor-arg>
+            </bean>
+        </jaxws:inInterceptors>
+        <jaxws:properties>
+            <entry key="mtom-enabled" value="true"/>
+        </jaxws:properties>
+    </jaxws:client>
+
+
+    <jaxws:client 
name="{http://www.example.org/contract/DoubleIt}DoubleItStreamingMtomLargePort"; 
createdFromAPI="true">
+        <jaxws:properties>
+            <entry key="mtom-enabled" value="true"/>
+        </jaxws:properties>
+    </jaxws:client>
+
+    <jaxws:client 
name="{http://www.example.org/contract/DoubleIt}DoubleItStreamingMtomLargeSignedPort";
 createdFromAPI="true">
+        <jaxws:inInterceptors>
+            <bean class="org.apache.cxf.ws.security.wss4j.WSS4JInInterceptor">
+                <constructor-arg>
+                    <map>
+                        <entry key="action" value="Signature"/>
+                        <entry key="signatureVerificationPropFile" 
value="bob.properties"/>
+                        <entry key="expandXOPInclude" value="false"/>
+                    </map>
+                </constructor-arg>
+            </bean>
+        </jaxws:inInterceptors>
+        <jaxws:properties>
+            <entry key="mtom-enabled" value="true"/>
+        </jaxws:properties>
+    </jaxws:client>
+
+</beans>


Reply via email to