This is an automated email from the ASF dual-hosted git repository.
ffang pushed a commit to branch 4.1.x-fixes
in repository https://gitbox.apache.org/repos/asf/cxf.git
The following commit(s) were added to refs/heads/4.1.x-fixes by this push:
new dd13bb24dc5 [CXF-9129]Chunked attachment streaming not working when
using ws-security (#3153)
dd13bb24dc5 is described below
commit dd13bb24dc5785961409828378570312483aaacb
Author: Freeman(Yue) Fang <[email protected]>
AuthorDate: Fri May 29 10:50:21 2026 -0400
[CXF-9129]Chunked attachment streaming not working when using ws-security
(#3153)
(cherry picked from commit 2e26e6c976d4d49309cef7f942286abd3368adff)
---
.../cxf/binding/soap/saaj/SAAJInInterceptor.java | 2 +-
.../security/wss4j/AbstractWSS4JInterceptor.java | 24 +++
.../systest/ws/mtom/MTOMLargeStreamingImpl.java | 113 ++++++++++
.../cxf/systest/ws/mtom/MTOMStreamingImpl.java | 105 +++++++++
.../systest/ws/mtom/MTOMStreamingSecurityTest.java | 239 +++++++++++++++++++++
.../src/test/resources/DoubleItLogical.wsdl | 28 +++
.../apache/cxf/systest/ws/mtom/DoubleItMtom.wsdl | 29 ++-
.../org/apache/cxf/systest/ws/mtom/client.xml | 25 ++-
.../org/apache/cxf/systest/ws/mtom/server.xml | 76 ++++++-
.../cxf/systest/ws/mtom/streaming-client.xml | 81 +++++++
10 files changed, 718 insertions(+), 4 deletions(-)
diff --git
a/rt/bindings/soap/src/main/java/org/apache/cxf/binding/soap/saaj/SAAJInInterceptor.java
b/rt/bindings/soap/src/main/java/org/apache/cxf/binding/soap/saaj/SAAJInInterceptor.java
index 22101c60c04..759f5a3e927 100644
---
a/rt/bindings/soap/src/main/java/org/apache/cxf/binding/soap/saaj/SAAJInInterceptor.java
+++
b/rt/bindings/soap/src/main/java/org/apache/cxf/binding/soap/saaj/SAAJInInterceptor.java
@@ -205,7 +205,7 @@ public class SAAJInInterceptor extends
AbstractSoapInterceptor {
message.setContent(Node.class, soapMessage.getSOAPPart());
Collection<Attachment> atts = message.getAttachments();
- if (atts != null) {
+ if (atts != null &&
!"false".equals(message.get("expandXOPInclude"))) {
for (Attachment a : atts) {
if (a.getDataHandler().getDataSource() instanceof
AttachmentDataSource) {
try {
diff --git
a/rt/ws/security/src/main/java/org/apache/cxf/ws/security/wss4j/AbstractWSS4JInterceptor.java
b/rt/ws/security/src/main/java/org/apache/cxf/ws/security/wss4j/AbstractWSS4JInterceptor.java
index e66212bf892..4205553f389 100644
---
a/rt/ws/security/src/main/java/org/apache/cxf/ws/security/wss4j/AbstractWSS4JInterceptor.java
+++
b/rt/ws/security/src/main/java/org/apache/cxf/ws/security/wss4j/AbstractWSS4JInterceptor.java
@@ -207,6 +207,30 @@ public abstract class AbstractWSS4JInterceptor extends
WSHandler implements Soap
if (passwordEncryptor != null) {
msg.put(ConfigurationConstants.PASSWORD_ENCRYPTOR_INSTANCE,
passwordEncryptor);
}
+
+ Object expandXOP =
SecurityUtils.getSecurityPropertyValue(SecurityConstants.EXPAND_XOP_INCLUDE,
msg);
+ if (expandXOP != null) {
+ msg.put(ConfigurationConstants.EXPAND_XOP_INCLUDE,
+ Boolean.parseBoolean(expandXOP.toString()) ? "true" :
"false");
+ } else {
+ // Also propagate the WSS4J-level key from the options map so that SAAJInInterceptor
+ // can skip attachment caching when expandXOPInclude=false is configured directly.
+ String expandXOPOption = (String)
getOption(ConfigurationConstants.EXPAND_XOP_INCLUDE);
+ if (expandXOPOption != null) {
+ msg.put(ConfigurationConstants.EXPAND_XOP_INCLUDE,
expandXOPOption);
+ }
+ }
+ Object storeBytes =
+
SecurityUtils.getSecurityPropertyValue(SecurityConstants.STORE_BYTES_IN_ATTACHMENT,
msg);
+ if (storeBytes != null) {
+ msg.put(ConfigurationConstants.STORE_BYTES_IN_ATTACHMENT,
+ Boolean.parseBoolean(storeBytes.toString()) ? "true" :
"false");
+ } else {
+ String storeBytesOption = (String)
getOption(ConfigurationConstants.STORE_BYTES_IN_ATTACHMENT);
+ if (storeBytesOption != null) {
+ msg.put(ConfigurationConstants.STORE_BYTES_IN_ATTACHMENT,
storeBytesOption);
+ }
+ }
}
@Override
diff --git
a/systests/ws-security/src/test/java/org/apache/cxf/systest/ws/mtom/MTOMLargeStreamingImpl.java
b/systests/ws-security/src/test/java/org/apache/cxf/systest/ws/mtom/MTOMLargeStreamingImpl.java
new file mode 100644
index 00000000000..31800029cd4
--- /dev/null
+++
b/systests/ws-security/src/test/java/org/apache/cxf/systest/ws/mtom/MTOMLargeStreamingImpl.java
@@ -0,0 +1,113 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.cxf.systest.ws.mtom;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
+import java.io.PipedInputStream;
+import java.io.PipedOutputStream;
+import java.time.Instant;
+import java.util.Arrays;
+import java.util.concurrent.atomic.AtomicReference;
+
+import jakarta.activation.DataHandler;
+import jakarta.activation.DataSource;
+import jakarta.jws.WebService;
+import org.example.contract.doubleit.DoubleItFault;
+import org.example.contract.doubleit.DoubleItStreamingMtomPortType;
+
+/**
+ * Large-payload MTOM streaming implementation used by testStreamingMTOMTimingReproducer
+ * and testStreamingMTOMTimingReproducerWithSecurity.
+ *
+ * The 64 × 1 KB = 64 KB total payload exceeds Jetty's default 32 KB output buffer.
+ * Jetty auto-flushes to the client when its buffer fills, so the CXF client receives
+ * the complete SOAP body and obtains a lazy DataHandler before the server finishes
+ * writing the remaining attachment chunks.
+ */
+@WebService(targetNamespace = "http://www.example.org/contract/DoubleIt",
+ serviceName = "DoubleItService",
+ endpointInterface =
"org.example.contract.doubleit.DoubleItStreamingMtomPortType")
+public class MTOMLargeStreamingImpl implements DoubleItStreamingMtomPortType {
+
+ static final long CHUNK_DELAY_MS = 100L;
+ static final int CHUNK_COUNT = 64;
+ static final int CHUNK_SIZE_BYTES = 1024;
+
+ static final AtomicReference<Instant> STREAMING_FINISHED = new
AtomicReference<>();
+
+ public static void resetStreamingFinished() {
+ STREAMING_FINISHED.set(null);
+ }
+
+ public static Instant getStreamingFinished() {
+ return STREAMING_FINISHED.get();
+ }
+
+ @Override
+ public DataHandler doubleIt5(int numberToDouble) throws DoubleItFault {
+ if (numberToDouble == 0) {
+ throw new DoubleItFault("0 can't be doubled!");
+ }
+ try {
+ PipedInputStream pipedIn = new PipedInputStream(CHUNK_SIZE_BYTES *
2);
+ PipedOutputStream pipedOut = new PipedOutputStream(pipedIn);
+ Thread writerThread = new Thread(() -> {
+ byte[] chunk = new byte[CHUNK_SIZE_BYTES];
+ Arrays.fill(chunk, (byte) 'A');
+ try {
+ for (int i = 0; i < CHUNK_COUNT; i++) {
+ pipedOut.write(chunk);
+ pipedOut.flush();
+ Thread.sleep(CHUNK_DELAY_MS);
+ }
+ pipedOut.close();
+ STREAMING_FINISHED.set(Instant.now());
+ } catch (InterruptedException e) {
+ Thread.currentThread().interrupt();
+ } catch (IOException e) {
+ // pipe may be closed early by consumer; no further action needed
+ }
+ });
+ writerThread.setDaemon(true);
+ writerThread.start();
+ DataSource dataSource = new DataSource() {
+ @Override
+ public InputStream getInputStream() {
+ return pipedIn;
+ }
+ @Override
+ public OutputStream getOutputStream() {
+ throw new UnsupportedOperationException();
+ }
+ @Override
+ public String getContentType() {
+ return "application/octet-stream";
+ }
+ @Override
+ public String getName() {
+ return "streaming-data-large";
+ }
+ };
+ return new DataHandler(dataSource);
+ } catch (IOException e) {
+ throw new DoubleItFault("Error creating streaming response: " +
e.getMessage());
+ }
+ }
+}
diff --git
a/systests/ws-security/src/test/java/org/apache/cxf/systest/ws/mtom/MTOMStreamingImpl.java
b/systests/ws-security/src/test/java/org/apache/cxf/systest/ws/mtom/MTOMStreamingImpl.java
new file mode 100644
index 00000000000..d4a8cf1e4ad
--- /dev/null
+++
b/systests/ws-security/src/test/java/org/apache/cxf/systest/ws/mtom/MTOMStreamingImpl.java
@@ -0,0 +1,105 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.cxf.systest.ws.mtom;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
+import java.io.PipedInputStream;
+import java.io.PipedOutputStream;
+import java.time.Instant;
+import java.util.Arrays;
+import java.util.concurrent.atomic.AtomicReference;
+
+import jakarta.activation.DataHandler;
+import jakarta.activation.DataSource;
+import jakarta.jws.WebService;
+import org.example.contract.doubleit.DoubleItFault;
+import org.example.contract.doubleit.DoubleItStreamingMtomPortType;
+
+@WebService(targetNamespace = "http://www.example.org/contract/DoubleIt",
+ serviceName = "DoubleItService",
+ endpointInterface =
"org.example.contract.doubleit.DoubleItStreamingMtomPortType")
+public class MTOMStreamingImpl implements DoubleItStreamingMtomPortType {
+
+ static final long CHUNK_DELAY_MS = 500L;
+ static final int CHUNK_COUNT = 5;
+ static final int CHUNK_SIZE_BYTES = 1024;
+
+ static final AtomicReference<Instant> STREAMING_FINISHED = new
AtomicReference<>();
+
+ public static void resetStreamingFinished() {
+ STREAMING_FINISHED.set(null);
+ }
+
+ public static Instant getStreamingFinished() {
+ return STREAMING_FINISHED.get();
+ }
+
+ @Override
+ public DataHandler doubleIt5(int numberToDouble) throws DoubleItFault {
+ if (numberToDouble == 0) {
+ throw new DoubleItFault("0 can't be doubled!");
+ }
+ try {
+ PipedInputStream pipedIn = new PipedInputStream(CHUNK_SIZE_BYTES *
2);
+ PipedOutputStream pipedOut = new PipedOutputStream(pipedIn);
+ Thread writerThread = new Thread(() -> {
+ byte[] chunk = new byte[CHUNK_SIZE_BYTES];
+ Arrays.fill(chunk, (byte) 'A');
+ try {
+ for (int i = 0; i < CHUNK_COUNT; i++) {
+ pipedOut.write(chunk);
+ pipedOut.flush();
+ Thread.sleep(CHUNK_DELAY_MS);
+ }
+ pipedOut.close();
+ STREAMING_FINISHED.set(Instant.now());
+ } catch (InterruptedException e) {
+ Thread.currentThread().interrupt();
+ } catch (IOException e) {
+ // pipe may be closed early by consumer; no further action needed
+ }
+ });
+ writerThread.setDaemon(true);
+ writerThread.start();
+ DataSource dataSource = new DataSource() {
+ @Override
+ public InputStream getInputStream() {
+ return pipedIn;
+ }
+ @Override
+ public OutputStream getOutputStream() {
+ throw new UnsupportedOperationException();
+ }
+ @Override
+ public String getContentType() {
+ return "application/octet-stream";
+ }
+ @Override
+ public String getName() {
+ return "streaming-data";
+ }
+ };
+ return new DataHandler(dataSource);
+ } catch (IOException e) {
+ throw new DoubleItFault("Error creating streaming response: " +
e.getMessage());
+ }
+ }
+}
diff --git
a/systests/ws-security/src/test/java/org/apache/cxf/systest/ws/mtom/MTOMStreamingSecurityTest.java
b/systests/ws-security/src/test/java/org/apache/cxf/systest/ws/mtom/MTOMStreamingSecurityTest.java
new file mode 100644
index 00000000000..54648ee45cb
--- /dev/null
+++
b/systests/ws-security/src/test/java/org/apache/cxf/systest/ws/mtom/MTOMStreamingSecurityTest.java
@@ -0,0 +1,239 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.cxf.systest.ws.mtom;
+
+import java.io.InputStream;
+import java.net.URL;
+import java.time.Instant;
+
+import javax.xml.namespace.QName;
+
+import jakarta.activation.DataHandler;
+import jakarta.xml.ws.Service;
+import org.apache.cxf.Bus;
+import org.apache.cxf.BusFactory;
+import org.apache.cxf.bus.spring.SpringBusFactory;
+import org.apache.cxf.testutil.common.AbstractBusClientServerTestBase;
+import org.example.contract.doubleit.DoubleItStreamingMtomPortType;
+
+import org.junit.Before;
+import org.junit.BeforeClass;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertTrue;
+
+/**
+ * Tests for CXF-9129: MTOM attachment streaming with WS-Security.
+ *
+ * The root cause is that WSS4JOutInterceptor defaults to expandXOPInclude=true and
+ * storeBytesInAttachment=true when MTOM is enabled. With these defaults WSS4J reads every
+ * attachment byte during the POST_PROTOCOL security phase and processes attachments via
+ * SwA-style references, which prevents streaming DataHandlers from working correctly
+ * (the attachment stream is consumed before AttachmentOutEndingInterceptor can write it).
+ *
+ * The fix is to set expandXOPInclude=false and storeBytesInAttachment=false on
+ * WSS4JOutInterceptor. WS-Security then signs only the SOAP body (which contains the
+ * xop:Include reference), leaving the raw attachment bytes to flow through
+ * AttachmentOutEndingInterceptor at PRE_STREAM_ENDING without being consumed early.
+ */
+public class MTOMStreamingSecurityTest extends AbstractBusClientServerTestBase
{
+
+ public static final String PORT = allocatePort(MTOMServer.class);
+
+ private static final String NAMESPACE =
"http://www.example.org/contract/DoubleIt";
+ private static final QName SERVICE_QNAME = new QName(NAMESPACE,
"DoubleItService");
+
+ @Before
+ public void resetStreamingTracker() {
+ MTOMStreamingImpl.resetStreamingFinished();
+ MTOMLargeStreamingImpl.resetStreamingFinished();
+ }
+
+ @BeforeClass
+ public static void startServers() throws Exception {
+ assertTrue(
+ "Server failed to launch",
+ launchServer(MTOMServer.class, true)
+ );
+ }
+
+ @org.junit.AfterClass
+ public static void cleanup() throws Exception {
+ stopAllServers();
+ }
+
+ /**
+ * Baseline: without WS-Security, the MTOM endpoint returns all streamed bytes correctly.
+ */
+ @org.junit.Test
+ public void testStreamingMTOMNoSecurity() throws Exception {
+ SpringBusFactory bf = new SpringBusFactory();
+ URL busFile =
MTOMStreamingSecurityTest.class.getResource("streaming-client.xml");
+
+ Bus bus = bf.createBus(busFile.toString());
+ BusFactory.setDefaultBus(bus);
+ BusFactory.setThreadDefaultBus(bus);
+
+ URL wsdl =
MTOMStreamingSecurityTest.class.getResource("DoubleItMtom.wsdl");
+ Service service = Service.create(wsdl, SERVICE_QNAME);
+ QName portQName = new QName(NAMESPACE, "DoubleItStreamingMtomPort");
+ DoubleItStreamingMtomPortType port =
+ service.getPort(portQName, DoubleItStreamingMtomPortType.class);
+ updateAddressPort(port, PORT);
+
+ DataHandler response = port.doubleIt5(25);
+
+ int received = drain(response);
+ assertEquals("Should receive all streamed bytes",
+ MTOMStreamingImpl.CHUNK_COUNT *
MTOMStreamingImpl.CHUNK_SIZE_BYTES, received);
+
+ Instant serverFinished = MTOMStreamingImpl.getStreamingFinished();
+ assertNotNull("Streaming should have completed", serverFinished);
+
+ ((java.io.Closeable) port).close();
+ bus.shutdown(true);
+ }
+
+ /**
+ * Tests CXF-9129 fix: with expandXOPInclude=false and storeBytesInAttachment=false, the
+ * signed MTOM endpoint returns all streamed bytes correctly without errors.
+ */
+ @org.junit.Test
+ public void testStreamingMTOMWithExpandXopFix() throws Exception {
+ SpringBusFactory bf = new SpringBusFactory();
+ URL busFile =
MTOMStreamingSecurityTest.class.getResource("streaming-client.xml");
+
+ Bus bus = bf.createBus(busFile.toString());
+ BusFactory.setDefaultBus(bus);
+ BusFactory.setThreadDefaultBus(bus);
+
+ URL wsdl =
MTOMStreamingSecurityTest.class.getResource("DoubleItMtom.wsdl");
+ Service service = Service.create(wsdl, SERVICE_QNAME);
+ QName portQName = new QName(NAMESPACE,
"DoubleItStreamingMtomSignedPort");
+ DoubleItStreamingMtomPortType port =
+ service.getPort(portQName, DoubleItStreamingMtomPortType.class);
+ updateAddressPort(port, PORT);
+
+ DataHandler response = port.doubleIt5(25);
+
+ int received = drain(response);
+ assertEquals("Should receive all streamed bytes with WS-Security
(CXF-9129)",
+ MTOMStreamingImpl.CHUNK_COUNT *
MTOMStreamingImpl.CHUNK_SIZE_BYTES, received);
+
+ Instant serverFinished = MTOMStreamingImpl.getStreamingFinished();
+ assertNotNull("Streaming should have completed", serverFinished);
+
+ ((java.io.Closeable) port).close();
+ bus.shutdown(true);
+ }
+
+ /**
+ * Reproduces the original CXF-9129 bug report scenario without WS-Security:
+ * a large MTOM attachment (64 KB > Jetty's default 32 KB output buffer) forces
+ * Jetty to auto-flush HTTP chunks while the server is still writing.
+ * The CXF client receives the SOAP body early, creates a lazy DataHandler, and
+ * returns to the caller before the server finishes producing the remaining chunks.
+ */
+ @org.junit.Test
+ public void testStreamingMTOMTimingReproducer() throws Exception {
+ SpringBusFactory bf = new SpringBusFactory();
+ URL busFile =
MTOMStreamingSecurityTest.class.getResource("streaming-client.xml");
+
+ Bus bus = bf.createBus(busFile.toString());
+ BusFactory.setDefaultBus(bus);
+ BusFactory.setThreadDefaultBus(bus);
+
+ URL wsdl =
MTOMStreamingSecurityTest.class.getResource("DoubleItMtom.wsdl");
+ Service service = Service.create(wsdl, SERVICE_QNAME);
+ QName portQName = new QName(NAMESPACE,
"DoubleItStreamingMtomLargePort");
+ DoubleItStreamingMtomPortType port =
+ service.getPort(portQName, DoubleItStreamingMtomPortType.class);
+ updateAddressPort(port, PORT);
+
+ DataHandler response = port.doubleIt5(25);
+ Instant callReturnedInstant = Instant.now();
+
+ int received = drain(response);
+ assertEquals("Should receive all streamed bytes",
+ MTOMLargeStreamingImpl.CHUNK_COUNT *
MTOMLargeStreamingImpl.CHUNK_SIZE_BYTES, received);
+
+ Instant serverFinished = MTOMLargeStreamingImpl.getStreamingFinished();
+ assertNotNull("Streaming should have completed", serverFinished);
+ assertTrue("JAX-WS call returned before server finished streaming",
+ callReturnedInstant.isBefore(serverFinished));
+
+ ((java.io.Closeable) port).close();
+ bus.shutdown(true);
+ }
+
+ /**
+ * The full CXF-9129 reproducer: large MTOM attachment (64 KB) + WS-Security signing
+ * with expandXOPInclude=false and storeBytesInAttachment=false.
+ *
+ * Jetty auto-flushes the first ~32 KB (SOAP body + partial attachment) to the client
+ * while the server is still writing. Because WSS4J signs only the SOAP body (xop:Include
+ * reference) and never touches the raw attachment bytes, the PipedInputStream is not
+ * consumed early. The client therefore obtains a lazy DataHandler and the JAX-WS call
+ * returns before the server finishes streaming — exactly as in the original reproducer.
+ */
+ @org.junit.Test
+ public void testStreamingMTOMTimingReproducerWithSecurity() throws
Exception {
+ SpringBusFactory bf = new SpringBusFactory();
+ URL busFile =
MTOMStreamingSecurityTest.class.getResource("streaming-client.xml");
+
+ Bus bus = bf.createBus(busFile.toString());
+ BusFactory.setDefaultBus(bus);
+ BusFactory.setThreadDefaultBus(bus);
+
+ URL wsdl =
MTOMStreamingSecurityTest.class.getResource("DoubleItMtom.wsdl");
+ Service service = Service.create(wsdl, SERVICE_QNAME);
+ QName portQName = new QName(NAMESPACE,
"DoubleItStreamingMtomLargeSignedPort");
+ DoubleItStreamingMtomPortType port =
+ service.getPort(portQName, DoubleItStreamingMtomPortType.class);
+ updateAddressPort(port, PORT);
+
+ DataHandler response = port.doubleIt5(25);
+ Instant callReturnedInstant = Instant.now();
+
+ int received = drain(response);
+ assertEquals("Should receive all streamed bytes with WS-Security and
large payload",
+ MTOMLargeStreamingImpl.CHUNK_COUNT *
MTOMLargeStreamingImpl.CHUNK_SIZE_BYTES, received);
+
+ Instant serverFinished = MTOMLargeStreamingImpl.getStreamingFinished();
+ assertNotNull("Streaming should have completed", serverFinished);
+ assertTrue("JAX-WS call returned before server finished streaming ",
+ callReturnedInstant.isBefore(serverFinished));
+
+ ((java.io.Closeable) port).close();
+ bus.shutdown(true);
+ }
+
+ private static int drain(DataHandler dh) throws Exception {
+ int total = 0;
+ byte[] buf = new byte[4096];
+ try (InputStream is = dh.getInputStream()) {
+ int n;
+ while ((n = is.read(buf)) != -1) {
+ total += n;
+ }
+ }
+ return total;
+ }
+}
diff --git a/systests/ws-security/src/test/resources/DoubleItLogical.wsdl
b/systests/ws-security/src/test/resources/DoubleItLogical.wsdl
index 832852aad5e..ea213140d4a 100644
--- a/systests/ws-security/src/test/resources/DoubleItLogical.wsdl
+++ b/systests/ws-security/src/test/resources/DoubleItLogical.wsdl
@@ -51,6 +51,21 @@
</xsd:sequence>
</xsd:complexType>
</xsd:element>
+ <xsd:element name="DoubleIt5">
+ <xsd:complexType>
+ <xsd:sequence>
+ <xsd:element name="numberToDouble" type="xsd:int"/>
+ </xsd:sequence>
+ </xsd:complexType>
+ </xsd:element>
+ <xsd:element name="DoubleIt5Response">
+ <xsd:complexType>
+ <xsd:sequence>
+ <xsd:element name="streamData" type="xsd:base64Binary"
+
xmime:expectedContentTypes="application/octet-stream"/>
+ </xsd:sequence>
+ </xsd:complexType>
+ </xsd:element>
<xsd:element name="DoubleItResponse">
<xsd:complexType>
<xsd:sequence>
@@ -82,6 +97,12 @@
<wsdl:message name="DoubleIt4Request">
<wsdl:part element="di:DoubleIt4" name="parameters"/>
</wsdl:message>
+ <wsdl:message name="DoubleIt5Request">
+ <wsdl:part element="di:DoubleIt5" name="parameters"/>
+ </wsdl:message>
+ <wsdl:message name="DoubleIt5Response">
+ <wsdl:part element="di:DoubleIt5Response" name="parameters"/>
+ </wsdl:message>
<wsdl:message name="DoubleItRequestHeader">
<wsdl:part element="di:DoubleIt" name="parameters"/>
<wsdl:part element="di:DoubleItHeader" name="header"/>
@@ -139,6 +160,13 @@
<wsdl:fault message="tns:DoubleItFault" name="DoubleItFault"/>
</wsdl:operation>
</wsdl:portType>
+ <wsdl:portType name="DoubleItStreamingMtomPortType">
+ <wsdl:operation name="DoubleIt5">
+ <wsdl:input message="tns:DoubleIt5Request"/>
+ <wsdl:output message="tns:DoubleIt5Response"/>
+ <wsdl:fault message="tns:DoubleItFault" name="DoubleItFault"/>
+ </wsdl:operation>
+ </wsdl:portType>
<wsdl:portType name="DoubleItOneWayPortType">
<wsdl:operation name="DoubleIt">
<wsdl:input message="tns:DoubleItRequest"/>
diff --git
a/systests/ws-security/src/test/resources/org/apache/cxf/systest/ws/mtom/DoubleItMtom.wsdl
b/systests/ws-security/src/test/resources/org/apache/cxf/systest/ws/mtom/DoubleItMtom.wsdl
index b3a9a9d30f0..ffa9d816da4 100644
---
a/systests/ws-security/src/test/resources/org/apache/cxf/systest/ws/mtom/DoubleItMtom.wsdl
+++
b/systests/ws-security/src/test/resources/org/apache/cxf/systest/ws/mtom/DoubleItMtom.wsdl
@@ -139,7 +139,22 @@
</wsdl:fault>
</wsdl:operation>
</wsdl:binding>
-
+ <wsdl:binding name="DoubleItStreamingMtomBinding"
type="tns:DoubleItStreamingMtomPortType">
+ <soap:binding style="document"
transport="http://schemas.xmlsoap.org/soap/http"/>
+ <wsdl:operation name="DoubleIt5">
+ <soap:operation soapAction=""/>
+ <wsdl:input>
+ <soap:body use="literal"/>
+ </wsdl:input>
+ <wsdl:output>
+ <soap:body use="literal"/>
+ </wsdl:output>
+ <wsdl:fault name="DoubleItFault">
+ <soap:body use="literal" />
+ </wsdl:fault>
+ </wsdl:operation>
+ </wsdl:binding>
+
<wsdl:service name="DoubleItService">
<wsdl:port name="DoubleItSignedMTOMInlinePort"
binding="tns:DoubleItNoSecurityBinding">
<soap:address
location="http://localhost:9001/DoubleItSignedMTOMInline"/>
@@ -165,6 +180,18 @@
<wsdl:port name="DoubleItSymmetricBinaryPort"
binding="tns:DoubleItSymmetricBinaryBinding">
<soap:address
location="http://localhost:9001/DoubleItX509SymmetricBinary"/>
</wsdl:port>
+ <wsdl:port name="DoubleItStreamingMtomPort"
binding="tns:DoubleItStreamingMtomBinding">
+ <soap:address
location="http://localhost:9001/DoubleItStreamingMtom"/>
+ </wsdl:port>
+ <wsdl:port name="DoubleItStreamingMtomSignedPort"
binding="tns:DoubleItStreamingMtomBinding">
+ <soap:address
location="http://localhost:9001/DoubleItStreamingMtomSigned"/>
+ </wsdl:port>
+ <wsdl:port name="DoubleItStreamingMtomLargePort"
binding="tns:DoubleItStreamingMtomBinding">
+ <soap:address
location="http://localhost:9001/DoubleItStreamingMtomLarge"/>
+ </wsdl:port>
+ <wsdl:port name="DoubleItStreamingMtomLargeSignedPort"
binding="tns:DoubleItStreamingMtomBinding">
+ <soap:address
location="http://localhost:9001/DoubleItStreamingMtomLargeSigned"/>
+ </wsdl:port>
</wsdl:service>
<wsp:Policy wsu:Id="DoubleItAsymmetricPolicy">
diff --git
a/systests/ws-security/src/test/resources/org/apache/cxf/systest/ws/mtom/client.xml
b/systests/ws-security/src/test/resources/org/apache/cxf/systest/ws/mtom/client.xml
index bd40a88a225..8cf34ea2660 100644
---
a/systests/ws-security/src/test/resources/org/apache/cxf/systest/ws/mtom/client.xml
+++
b/systests/ws-security/src/test/resources/org/apache/cxf/systest/ws/mtom/client.xml
@@ -170,5 +170,28 @@
<entry key="mtom-enabled" value="true"/>
</jaxws:properties>
</jaxws:client>
-
+
+ <jaxws:client
name="{http://www.example.org/contract/DoubleIt}DoubleItStreamingMtomPort"
createdFromAPI="true">
+ <jaxws:properties>
+ <entry key="mtom-enabled" value="true"/>
+ </jaxws:properties>
+ </jaxws:client>
+
+ <jaxws:client
name="{http://www.example.org/contract/DoubleIt}DoubleItStreamingMtomSignedPort"
createdFromAPI="true">
+ <jaxws:inInterceptors>
+ <bean class="org.apache.cxf.ws.security.wss4j.WSS4JInInterceptor">
+ <constructor-arg>
+ <map>
+ <entry key="action" value="Signature"/>
+ <entry key="signatureVerificationPropFile"
value="bob.properties"/>
+ <entry key="expandXOPInclude" value="false"/>
+ </map>
+ </constructor-arg>
+ </bean>
+ </jaxws:inInterceptors>
+ <jaxws:properties>
+ <entry key="mtom-enabled" value="true"/>
+ </jaxws:properties>
+ </jaxws:client>
+
</beans>
\ No newline at end of file
diff --git
a/systests/ws-security/src/test/resources/org/apache/cxf/systest/ws/mtom/server.xml
b/systests/ws-security/src/test/resources/org/apache/cxf/systest/ws/mtom/server.xml
index 9b946e36c8d..6a8b1727917 100644
---
a/systests/ws-security/src/test/resources/org/apache/cxf/systest/ws/mtom/server.xml
+++
b/systests/ws-security/src/test/resources/org/apache/cxf/systest/ws/mtom/server.xml
@@ -39,6 +39,9 @@
<property name="enforcePrincipal" value="false"/>
</bean>
+ <bean id="mtomStreamingImpl"
class="org.apache.cxf.systest.ws.mtom.MTOMStreamingImpl"/>
+ <bean id="mtomLargeStreamingImpl"
class="org.apache.cxf.systest.ws.mtom.MTOMLargeStreamingImpl"/>
+
<jaxws:endpoint xmlns:s="http://www.example.org/contract/DoubleIt"
id="SignedMTOM"
address="http://localhost:${testutil.ports.mtom.MTOMServer}/DoubleItSignedMTOMInline"
serviceName="s:DoubleItService"
endpointName="s:DoubleItSignedMTOMInlinePort"
@@ -180,5 +183,76 @@
<entry key="mtom-enabled" value="true"/>
</jaxws:properties>
</jaxws:endpoint>
-
+
+ <jaxws:endpoint xmlns:s="http://www.example.org/contract/DoubleIt"
id="StreamingMtom"
+
address="http://localhost:${testutil.ports.mtom.MTOMServer}/DoubleItStreamingMtom"
+ serviceName="s:DoubleItService"
endpointName="s:DoubleItStreamingMtomPort"
+ implementor="#mtomStreamingImpl"
+ wsdlLocation="org/apache/cxf/systest/ws/mtom/DoubleItMtom.wsdl">
+ <jaxws:properties>
+ <entry key="mtom-enabled" value="true"/>
+ </jaxws:properties>
+ </jaxws:endpoint>
+
+ <jaxws:endpoint xmlns:s="http://www.example.org/contract/DoubleIt"
id="StreamingMtomSigned"
+
address="http://localhost:${testutil.ports.mtom.MTOMServer}/DoubleItStreamingMtomSigned"
+ serviceName="s:DoubleItService"
endpointName="s:DoubleItStreamingMtomSignedPort"
+ implementor="#mtomStreamingImpl"
+ wsdlLocation="org/apache/cxf/systest/ws/mtom/DoubleItMtom.wsdl">
+ <jaxws:outInterceptors>
+ <bean class="org.apache.cxf.ws.security.wss4j.WSS4JOutInterceptor">
+ <constructor-arg>
+ <map>
+ <entry key="action" value="Signature"/>
+ <entry key="signatureUser" value="bob"/>
+ <entry key="signaturePropFile" value="bob.properties"/>
+ <entry key="signatureKeyIdentifier"
value="DirectReference"/>
+ <entry key="passwordCallbackClass"
value="org.apache.cxf.systest.ws.common.KeystorePasswordCallback"/>
+ <entry key="expandXOPInclude" value="false"/>
+ <entry key="storeBytesInAttachment" value="false"/>
+ </map>
+ </constructor-arg>
+ </bean>
+ </jaxws:outInterceptors>
+ <jaxws:properties>
+ <entry key="mtom-enabled" value="true"/>
+ </jaxws:properties>
+ </jaxws:endpoint>
+
+
+ <jaxws:endpoint xmlns:s="http://www.example.org/contract/DoubleIt"
id="StreamingMtomLarge"
+
address="http://localhost:${testutil.ports.mtom.MTOMServer}/DoubleItStreamingMtomLarge"
+ serviceName="s:DoubleItService"
endpointName="s:DoubleItStreamingMtomLargePort"
+ implementor="#mtomLargeStreamingImpl"
+ wsdlLocation="org/apache/cxf/systest/ws/mtom/DoubleItMtom.wsdl">
+ <jaxws:properties>
+ <entry key="mtom-enabled" value="true"/>
+ </jaxws:properties>
+ </jaxws:endpoint>
+
+ <jaxws:endpoint xmlns:s="http://www.example.org/contract/DoubleIt"
id="StreamingMtomLargeSigned"
+
address="http://localhost:${testutil.ports.mtom.MTOMServer}/DoubleItStreamingMtomLargeSigned"
+ serviceName="s:DoubleItService"
endpointName="s:DoubleItStreamingMtomLargeSignedPort"
+ implementor="#mtomLargeStreamingImpl"
+ wsdlLocation="org/apache/cxf/systest/ws/mtom/DoubleItMtom.wsdl">
+ <jaxws:outInterceptors>
+ <bean class="org.apache.cxf.ws.security.wss4j.WSS4JOutInterceptor">
+ <constructor-arg>
+ <map>
+ <entry key="action" value="Signature"/>
+ <entry key="signatureUser" value="bob"/>
+ <entry key="signaturePropFile" value="bob.properties"/>
+ <entry key="signatureKeyIdentifier"
value="DirectReference"/>
+ <entry key="passwordCallbackClass"
value="org.apache.cxf.systest.ws.common.KeystorePasswordCallback"/>
+ <entry key="expandXOPInclude" value="false"/>
+ <entry key="storeBytesInAttachment" value="false"/>
+ </map>
+ </constructor-arg>
+ </bean>
+ </jaxws:outInterceptors>
+ <jaxws:properties>
+ <entry key="mtom-enabled" value="true"/>
+ </jaxws:properties>
+ </jaxws:endpoint>
+
</beans>
\ No newline at end of file
diff --git
a/systests/ws-security/src/test/resources/org/apache/cxf/systest/ws/mtom/streaming-client.xml
b/systests/ws-security/src/test/resources/org/apache/cxf/systest/ws/mtom/streaming-client.xml
new file mode 100644
index 00000000000..f79dfeacc83
--- /dev/null
+++
b/systests/ws-security/src/test/resources/org/apache/cxf/systest/ws/mtom/streaming-client.xml
@@ -0,0 +1,81 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+ Licensed to the Apache Software Foundation (ASF) under one
+ or more contributor license agreements. See the NOTICE file
+ distributed with this work for additional information
+ regarding copyright ownership. The ASF licenses this file
+ to you under the Apache License, Version 2.0 (the
+ "License"); you may not use this file except in compliance
+ with the License. You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+ Unless required by applicable law or agreed to in writing,
+ software distributed under the License is distributed on an
+ "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ KIND, either express or implied. See the License for the
+ specific language governing permissions and limitations
+ under the License.
+-->
+<beans xmlns="http://www.springframework.org/schema/beans"
+ xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+ xmlns:http="http://cxf.apache.org/transports/http/configuration"
+ xmlns:jaxws="http://cxf.apache.org/jaxws"
+ xmlns:cxf="http://cxf.apache.org/core"
+ xmlns:p="http://cxf.apache.org/policy"
+ xmlns:sec="http://cxf.apache.org/configuration/security"
+ xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans.xsd http://cxf.apache.org/jaxws http://cxf.apache.org/schemas/jaxws.xsd http://cxf.apache.org/transports/http/configuration http://cxf.apache.org/schemas/configuration/http-conf.xsd http://cxf.apache.org/configuration/security http://cxf.apache.org/schemas/configuration/security.xsd http://cxf.apache.org/core http://cxf.apache.org/schemas/core.xsd http://cxf.apache.org/policy http://cxf.apache.org/schemas/policy.xsd">
+ <cxf:bus>
+ <cxf:features>
+ <p:policies/>
+ </cxf:features>
+ </cxf:bus>
+
+ <jaxws:client
name="{http://www.example.org/contract/DoubleIt}DoubleItStreamingMtomPort"
createdFromAPI="true">
+ <jaxws:properties>
+ <entry key="mtom-enabled" value="true"/>
+ </jaxws:properties>
+ </jaxws:client>
+
+ <jaxws:client
name="{http://www.example.org/contract/DoubleIt}DoubleItStreamingMtomSignedPort"
createdFromAPI="true">
+ <jaxws:inInterceptors>
+ <bean class="org.apache.cxf.ws.security.wss4j.WSS4JInInterceptor">
+ <constructor-arg>
+ <map>
+ <entry key="action" value="Signature"/>
+ <entry key="signatureVerificationPropFile"
value="bob.properties"/>
+ <entry key="expandXOPInclude" value="false"/>
+ </map>
+ </constructor-arg>
+ </bean>
+ </jaxws:inInterceptors>
+ <jaxws:properties>
+ <entry key="mtom-enabled" value="true"/>
+ </jaxws:properties>
+ </jaxws:client>
+
+
+ <jaxws:client
name="{http://www.example.org/contract/DoubleIt}DoubleItStreamingMtomLargePort"
createdFromAPI="true">
+ <jaxws:properties>
+ <entry key="mtom-enabled" value="true"/>
+ </jaxws:properties>
+ </jaxws:client>
+
+ <jaxws:client
name="{http://www.example.org/contract/DoubleIt}DoubleItStreamingMtomLargeSignedPort"
createdFromAPI="true">
+ <jaxws:inInterceptors>
+ <bean class="org.apache.cxf.ws.security.wss4j.WSS4JInInterceptor">
+ <constructor-arg>
+ <map>
+ <entry key="action" value="Signature"/>
+ <entry key="signatureVerificationPropFile"
value="bob.properties"/>
+ <entry key="expandXOPInclude" value="false"/>
+ </map>
+ </constructor-arg>
+ </bean>
+ </jaxws:inInterceptors>
+ <jaxws:properties>
+ <entry key="mtom-enabled" value="true"/>
+ </jaxws:properties>
+ </jaxws:client>
+
+</beans>