This is an automated email from the ASF dual-hosted git repository. robertlazarski pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/axis-axis2-java-core.git
commit a4721f18eec7c0ae161776e780bd4e767b7bc092 Author: Robert Lazarski <[email protected]> AuthorDate: Tue Apr 14 10:37:25 2026 -1000 AXIS2-6103 Add streaming JSON message formatters for large HTTP responses New package org.apache.axis2.json.streaming with three classes: - FlushingOutputStream: wraps transport OutputStream, flushes every 64KB (configurable). Prevents reverse proxy 502 Bad Gateway on large responses by converting a single buffered body into a stream of HTTP/2 DATA frames or HTTP/1.1 chunks. - JSONStreamingMessageFormatter: GSON variant. Drop-in replacement for org.apache.axis2.json.gson.JsonFormatter. - MoshiStreamingMessageFormatter: Moshi variant. Drop-in replacement for org.apache.axis2.json.moshi.JsonFormatter. Both formatters use try-with-resources for JsonWriter lifecycle (addressed in Gemini code review). FlushingOutputStream uses long for bytesSinceFlush counter to prevent int overflow on 2GB+ writes. Enable globally in axis2.xml: <messageFormatter contentType="application/json" class="org.apache.axis2.json.streaming.MoshiStreamingMessageFormatter"/> Optional per-service flush interval tuning in services.xml: <parameter name="streamingFlushIntervalBytes">131072</parameter> No service code changes required. Tested on WildFly 32 locally and behind a reverse proxy on stg-rapi02 (HTTP/2 ALPN). All existing services (BigDataH2Service, FinancialBenchmarkService) produce bit-identical JSON results. 
Also includes: - json-streaming-formatter.xml: full user guide - toc.xml: cross-referenced in JSON (23.6), HTTP/2 (18.6), and MCP (23.4) sections - springbootdemo-tomcat11 README: streaming formatter section --- .../axis2/json/streaming/FlushingOutputStream.java | 121 +++++++++ .../streaming/JSONStreamingMessageFormatter.java | 271 +++++++++++++++++++++ .../streaming/MoshiStreamingMessageFormatter.java | 237 ++++++++++++++++++ .../userguide/springbootdemo-tomcat11/README.md | 42 ++++ src/site/xdoc/docs/json-streaming-formatter.xml | 170 +++++++++++++ src/site/xdoc/docs/toc.xml | 27 +- 6 files changed, 860 insertions(+), 8 deletions(-) diff --git a/modules/json/src/org/apache/axis2/json/streaming/FlushingOutputStream.java b/modules/json/src/org/apache/axis2/json/streaming/FlushingOutputStream.java new file mode 100644 index 0000000000..b6af390cb1 --- /dev/null +++ b/modules/json/src/org/apache/axis2/json/streaming/FlushingOutputStream.java @@ -0,0 +1,121 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ + +package org.apache.axis2.json.streaming; + +import java.io.FilterOutputStream; +import java.io.IOException; +import java.io.OutputStream; + +/** + * OutputStream wrapper that flushes to the underlying stream every N bytes. + * + * <p>When wrapping a servlet/transport OutputStream, each flush pushes + * buffered data to the HTTP transport layer as one or more chunks (HTTP/1.1 + * chunked transfer encoding) or DATA frames (HTTP/2). This prevents reverse proxies + * from rejecting large responses due to body-size limits — the proxy sees + * a stream of small chunks, never the full response body.</p> + * + * <p>Used by {@link JSONStreamingMessageFormatter} and {@link MoshiStreamingMessageFormatter} to enable transparent + * streaming for any Axis2 JSON service without service code changes.</p> + * + * <p>Default flush interval is 64 KB, close to the default HTTP/2 initial + * flow-control window (65,535 bytes). The default HTTP/2 maximum frame size + * is 16 KB, so the servlet container may split each flush into several + * DATA frames.</p> + * + * @since 2.0.1 + */ +public class FlushingOutputStream extends FilterOutputStream { + + /** Default flush interval: 64 KB */ + public static final int DEFAULT_FLUSH_INTERVAL = 64 * 1024; + + private final int flushIntervalBytes; + private long bytesSinceFlush; + + /** + * Wrap an OutputStream with the default flush interval (64 KB). + * + * @param out the underlying output stream + */ + public FlushingOutputStream(OutputStream out) { + this(out, DEFAULT_FLUSH_INTERVAL); + } + + /** + * Wrap an OutputStream with a custom flush interval. 
+ * + * @param out the underlying output stream + * @param flushIntervalBytes flush every N bytes (must be > 0) + */ + public FlushingOutputStream(OutputStream out, int flushIntervalBytes) { + super(out); + if (flushIntervalBytes <= 0) { + throw new IllegalArgumentException( + "flushIntervalBytes must be > 0, got " + flushIntervalBytes); + } + this.flushIntervalBytes = flushIntervalBytes; + this.bytesSinceFlush = 0; + } + + @Override + public void write(int b) throws IOException { + out.write(b); + if (++bytesSinceFlush >= flushIntervalBytes) { + out.flush(); + bytesSinceFlush = 0; + } + } + + @Override + public void write(byte[] b, int off, int len) throws IOException { + out.write(b, off, len); + bytesSinceFlush += len; + if (bytesSinceFlush >= flushIntervalBytes) { + out.flush(); + bytesSinceFlush = 0; + } + } + + /** + * Flushes the underlying stream and resets the byte counter, so that an + * explicit flush (for example from {@code Writer.flush()}) restarts the + * flush interval and keeps {@link #getBytesSinceFlush()} accurate. + */ + @Override + public void flush() throws IOException { + out.flush(); + bytesSinceFlush = 0; + } + + /** + * Returns the configured flush interval in bytes. + */ + public int getFlushIntervalBytes() { + return flushIntervalBytes; + } + + /** + * Returns the number of bytes written since the last flush. 
+ */ + public long getBytesSinceFlush() { + return bytesSinceFlush; + } +} diff --git a/modules/json/src/org/apache/axis2/json/streaming/JSONStreamingMessageFormatter.java b/modules/json/src/org/apache/axis2/json/streaming/JSONStreamingMessageFormatter.java new file mode 100644 index 0000000000..bbfa73390a --- /dev/null +++ b/modules/json/src/org/apache/axis2/json/streaming/JSONStreamingMessageFormatter.java @@ -0,0 +1,271 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.axis2.json.streaming; + +import com.google.gson.Gson; +import com.google.gson.GsonBuilder; +import com.google.gson.stream.JsonWriter; +import org.apache.axiom.om.OMElement; +import org.apache.axiom.om.OMOutputFormat; +import org.apache.axis2.AxisFault; +import org.apache.axis2.Constants; +import org.apache.axis2.context.MessageContext; +import org.apache.axis2.description.AxisService; +import org.apache.axis2.description.Parameter; +import org.apache.axis2.json.factory.JsonConstant; +import org.apache.axis2.json.gson.GsonXMLStreamWriter; +import org.apache.axis2.json.gson.JsonHtmlEncoder; +import org.apache.axis2.kernel.MessageFormatter; +import org.apache.axis2.wsdl.WSDLConstants; +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.ws.commons.schema.XmlSchema; + +import javax.xml.namespace.QName; +import javax.xml.stream.XMLStreamException; +import java.io.IOException; +import java.io.OutputStream; +import java.io.OutputStreamWriter; +import java.io.UnsupportedEncodingException; +import java.lang.reflect.Type; +import java.net.URL; +import java.util.ArrayList; +import java.util.Iterator; + +/** + * Streaming JSON message formatter for Axis2. + * + * <p>Wraps the transport {@link OutputStream} with a + * {@link FlushingOutputStream} that pushes data to the HTTP transport + * layer every N bytes (default 64 KB). 
This converts a single buffered + * HTTP response into a stream of small chunks — either HTTP/1.1 chunked + * transfer encoding or HTTP/2 DATA frames — preventing reverse proxy + * body-size rejections on large responses.</p> + * + * <h3>Usage</h3> + * + * <p>Drop-in replacement for {@code JsonFormatter}. Enable per-service + * in {@code services.xml}:</p> + * + * <pre>{@code + * <parameter name="messageFormatters"> + * <messageFormatter contentType="application/json" + * class="org.apache.axis2.json.streaming.JSONStreamingMessageFormatter"/> + * </parameter> + * }</pre> + * + * <p>Or globally in {@code axis2.xml}:</p> + * + * <pre>{@code + * <messageFormatter contentType="application/json" + * class="org.apache.axis2.json.streaming.JSONStreamingMessageFormatter"/> + * }</pre> + * + * <h3>Configuration</h3> + * + * <p>The flush interval can be tuned per-service via a parameter:</p> + * + * <pre>{@code + * <parameter name="streamingFlushIntervalBytes">131072</parameter> + * }</pre> + * + * <p>Default is {@value FlushingOutputStream#DEFAULT_FLUSH_INTERVAL} bytes + * (64 KB). Smaller values increase flush frequency (lower latency to first + * byte, more HTTP frames); larger values reduce flush overhead at the cost + * of larger transport buffers.</p> + * + * <h3>Design</h3> + * + * <p>This formatter does not require service code changes. It wraps the + * OutputStream before serialization begins — GSON writes to the + * {@link JsonWriter} → {@link OutputStreamWriter} → {@link FlushingOutputStream} + * → transport. 
The service returns its response object as usual; the + * periodic flushing happens transparently during GSON serialization.</p> + * + * <p>For the Axis2/C equivalent, the same pattern applies using + * {@code ap_rflush()} on the Apache httpd response during + * {@code json_object_to_json_string_ext()} output.</p> + * + * @see FlushingOutputStream + * @since 2.0.1 + */ +public class JSONStreamingMessageFormatter implements MessageFormatter { + + private static final Log log = LogFactory.getLog(JSONStreamingMessageFormatter.class); + + /** services.xml parameter name for flush interval override */ + private static final String PARAM_FLUSH_INTERVAL = "streamingFlushIntervalBytes"; + + public void writeTo(MessageContext outMsgCtxt, OMOutputFormat omOutputFormat, + OutputStream outputStream, boolean preserve) throws AxisFault { + + String charSetEncoding = (String) outMsgCtxt.getProperty( + Constants.Configuration.CHARACTER_SET_ENCODING); + if (charSetEncoding == null) { + charSetEncoding = "UTF-8"; + } + + // Wrap the transport OutputStream with periodic flushing. + // This is the only difference from the standard JsonFormatter — + // everything else delegates to the same GSON serialization path. 
+ int flushInterval = getFlushInterval(outMsgCtxt); + OutputStream flushingStream = new FlushingOutputStream(outputStream, flushInterval); + + if (log.isDebugEnabled()) { + log.debug("JSONStreamingMessageFormatter: using FlushingOutputStream with " + + flushInterval + " byte flush interval"); + } + + try (JsonWriter jsonWriter = new JsonWriter( + new OutputStreamWriter(flushingStream, charSetEncoding))) { + + Object retObj = outMsgCtxt.getProperty(JsonConstant.RETURN_OBJECT); + + if (outMsgCtxt.isProcessingFault()) { + writeFaultResponse(outMsgCtxt, jsonWriter); + + } else if (retObj == null) { + writeElementResponse(outMsgCtxt, jsonWriter, preserve); + + } else { + writeObjectResponse(outMsgCtxt, jsonWriter, retObj); + } + + jsonWriter.flush(); + log.debug("JSONStreamingMessageFormatter.writeTo() completed"); + + } catch (IOException e) { + String msg = "Error during JSON streaming serialization"; + log.error(msg, e); + throw AxisFault.makeFault(e); + } + } + + /** + * Write a SOAP fault as JSON. + */ + private void writeFaultResponse(MessageContext outMsgCtxt, JsonWriter jsonWriter) + throws AxisFault { + OMElement element = outMsgCtxt.getEnvelope().getBody().getFirstElement(); + try { + jsonWriter.beginObject(); + jsonWriter.name(element.getLocalName()); + jsonWriter.beginObject(); + Iterator childrenIterator = element.getChildElements(); + while (childrenIterator.hasNext()) { + Object next = childrenIterator.next(); + OMElement omElement = (OMElement) next; + jsonWriter.name(omElement.getLocalName()); + jsonWriter.value(omElement.getText()); + } + jsonWriter.endObject(); + jsonWriter.endObject(); + } catch (IOException e) { + throw new AxisFault("Error writing fault response in JSONStreamingMessageFormatter", e); + } + } + + /** + * Write an OM element response (no return object — schema-driven serialization). 
+ */ + private void writeElementResponse(MessageContext outMsgCtxt, JsonWriter jsonWriter, + boolean preserve) throws AxisFault { + OMElement element = outMsgCtxt.getEnvelope().getBody().getFirstElement(); + QName elementQname = outMsgCtxt.getAxisOperation().getMessage( + WSDLConstants.MESSAGE_LABEL_OUT_VALUE).getElementQName(); + + ArrayList<XmlSchema> schemas = outMsgCtxt.getAxisService().getSchema(); + GsonXMLStreamWriter xmlsw = new GsonXMLStreamWriter(jsonWriter, + elementQname, schemas, outMsgCtxt.getConfigurationContext()); + try { + xmlsw.writeStartDocument(); + element.serialize(xmlsw, preserve); + xmlsw.writeEndDocument(); + } catch (XMLStreamException e) { + throw new AxisFault("Error writing element response in JSONStreamingMessageFormatter", e); + } + } + + /** + * Write a return-object response using GSON. + * + * <p>GSON serializes the object graph field-by-field into the JsonWriter. + * Because the JsonWriter is backed by a {@link FlushingOutputStream}, + * the HTTP transport receives chunks as serialization progresses — + * the full response is never buffered in a single String or byte[].</p> + */ + private void writeObjectResponse(MessageContext outMsgCtxt, JsonWriter jsonWriter, + Object retObj) throws AxisFault { + try { + GsonBuilder gsonBuilder = new GsonBuilder(); + gsonBuilder.registerTypeAdapter(String.class, new JsonHtmlEncoder()); + Gson gson = gsonBuilder.create(); + + jsonWriter.beginObject(); + jsonWriter.name(JsonConstant.RESPONSE); + Type returnType = (Type) outMsgCtxt.getProperty(JsonConstant.RETURN_TYPE); + gson.toJson(retObj, returnType, jsonWriter); + jsonWriter.endObject(); + + } catch (IOException | com.google.gson.JsonIOException e) { // Gson.toJson() reports stream write failures as unchecked JsonIOException + String msg = "Error writing object response in JSONStreamingMessageFormatter"; + log.error(msg, e); + throw AxisFault.makeFault(e); + } + } + + /** + * Read the flush interval from the service's configuration. + * Falls back to {@link FlushingOutputStream#DEFAULT_FLUSH_INTERVAL} when the parameter is absent, null, non-numeric or not positive. 
+ */ + private int getFlushInterval(MessageContext msgCtxt) { + AxisService service = msgCtxt.getAxisService(); + if (service != null) { + Parameter param = service.getParameter(PARAM_FLUSH_INTERVAL); + if (param != null && param.getValue() != null) { + try { + int interval = Integer.parseInt(param.getValue().toString().trim()); + if (interval > 0) { + return interval; + } + log.warn("Non-positive " + PARAM_FLUSH_INTERVAL + " value: " + interval + "; using default"); + } catch (NumberFormatException e) { + log.warn("Invalid " + PARAM_FLUSH_INTERVAL + " value: " + param.getValue() + "; using default"); + } + } + } + return FlushingOutputStream.DEFAULT_FLUSH_INTERVAL; + } + + public String getContentType(MessageContext outMsgCtxt, OMOutputFormat omOutputFormat, + String soapAction) { + return (String) outMsgCtxt.getProperty(Constants.Configuration.CONTENT_TYPE); + } + + public URL getTargetAddress(MessageContext messageContext, OMOutputFormat omOutputFormat, + URL url) throws AxisFault { + return null; + } + + public String formatSOAPAction(MessageContext messageContext, OMOutputFormat omOutputFormat, + String soapAction) { + return null; + } +} diff --git a/modules/json/src/org/apache/axis2/json/streaming/MoshiStreamingMessageFormatter.java b/modules/json/src/org/apache/axis2/json/streaming/MoshiStreamingMessageFormatter.java new file mode 100644 index 0000000000..2a99e31c20 --- /dev/null +++ b/modules/json/src/org/apache/axis2/json/streaming/MoshiStreamingMessageFormatter.java @@ -0,0 +1,237 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.axis2.json.streaming; + +import com.squareup.moshi.JsonAdapter; +import com.squareup.moshi.JsonWriter; +import com.squareup.moshi.Moshi; +import com.squareup.moshi.adapters.Rfc3339DateJsonAdapter; +import okio.BufferedSink; +import okio.Okio; + +import org.apache.axiom.om.OMElement; +import org.apache.axiom.om.OMOutputFormat; +import org.apache.axis2.AxisFault; +import org.apache.axis2.Constants; +import org.apache.axis2.context.MessageContext; +import org.apache.axis2.description.AxisService; +import org.apache.axis2.description.Parameter; +import org.apache.axis2.json.factory.JsonConstant; +import org.apache.axis2.json.moshi.JsonHtmlEncoder; +import org.apache.axis2.json.moshi.MoshiXMLStreamWriter; +import org.apache.axis2.kernel.MessageFormatter; +import org.apache.axis2.wsdl.WSDLConstants; +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.ws.commons.schema.XmlSchema; + +import javax.xml.namespace.QName; +import javax.xml.stream.XMLStreamException; +import java.io.IOException; +import java.io.OutputStream; +import java.lang.reflect.Type; +import java.net.URL; +import java.util.ArrayList; +import java.util.Date; +import java.util.Iterator; + +/** + * Streaming Moshi JSON message formatter for Axis2. + * + * <p>Drop-in replacement for {@link org.apache.axis2.json.moshi.JsonFormatter} + * that wraps the transport {@link OutputStream} with a + * {@link FlushingOutputStream}. 
This pushes data to the HTTP transport + * layer every N bytes (default 64 KB), converting a single buffered + * response into a stream of HTTP/2 DATA frames or HTTP/1.1 chunks.</p> + * + * <h3>Usage</h3> + * + * <p>In {@code axis2.xml} (global) or {@code services.xml} (per-service):</p> + * + * <pre>{@code + * <messageFormatter contentType="application/json" + * class="org.apache.axis2.json.streaming.MoshiStreamingMessageFormatter"/> + * }</pre> + * + * <p>Optional flush interval tuning:</p> + * + * <pre>{@code + * <parameter name="streamingFlushIntervalBytes">131072</parameter> + * }</pre> + * + * @see FlushingOutputStream + * @see org.apache.axis2.json.streaming.JSONStreamingMessageFormatter + * @since 2.0.1 + */ +public class MoshiStreamingMessageFormatter implements MessageFormatter { + + private static final Log log = LogFactory.getLog(MoshiStreamingMessageFormatter.class); + + /** services.xml parameter name for flush interval override */ + private static final String PARAM_FLUSH_INTERVAL = "streamingFlushIntervalBytes"; + + public void writeTo(MessageContext outMsgCtxt, OMOutputFormat omOutputFormat, + OutputStream outputStream, boolean preserve) throws AxisFault { + + // Wrap the transport OutputStream with periodic flushing. 
+ int flushInterval = getFlushInterval(outMsgCtxt); + OutputStream flushingStream = new FlushingOutputStream(outputStream, flushInterval); + + if (log.isDebugEnabled()) { + log.debug("MoshiStreamingMessageFormatter: using FlushingOutputStream with " + + flushInterval + " byte flush interval"); + } + + Moshi moshi = new Moshi.Builder() + .add(String.class, new JsonHtmlEncoder()) + .add(Date.class, new Rfc3339DateJsonAdapter()) + .build(); + JsonAdapter<Object> adapter = moshi.adapter(Object.class); + + try (BufferedSink sink = Okio.buffer(Okio.sink(flushingStream)); + JsonWriter jsonWriter = JsonWriter.of(sink)) { + + Object retObj = outMsgCtxt.getProperty(JsonConstant.RETURN_OBJECT); + + if (outMsgCtxt.isProcessingFault()) { + writeFaultResponse(outMsgCtxt, jsonWriter); + + } else if (retObj == null) { + writeElementResponse(outMsgCtxt, jsonWriter, preserve); + + } else { + writeObjectResponse(jsonWriter, adapter, retObj, outMsgCtxt); + } + + jsonWriter.flush(); + log.debug("MoshiStreamingMessageFormatter.writeTo() completed"); + + } catch (IOException e) { + String msg = "Error in MoshiStreamingMessageFormatter"; + log.error(msg, e); + throw AxisFault.makeFault(e); + } + } + + /** + * Write a SOAP fault as JSON. 
+ */ + private void writeFaultResponse(MessageContext outMsgCtxt, JsonWriter jsonWriter) + throws AxisFault { + OMElement element = outMsgCtxt.getEnvelope().getBody().getFirstElement(); + try { + jsonWriter.beginObject(); + jsonWriter.name(element.getLocalName()); + jsonWriter.beginObject(); + Iterator childrenIterator = element.getChildElements(); + while (childrenIterator.hasNext()) { + Object next = childrenIterator.next(); + OMElement omElement = (OMElement) next; + jsonWriter.name(omElement.getLocalName()); + jsonWriter.value(omElement.getText()); + } + jsonWriter.endObject(); + jsonWriter.endObject(); + } catch (IOException e) { + throw new AxisFault("Error writing fault response in MoshiStreamingMessageFormatter", e); + } + } + + /** + * Write an OM element response (schema-driven serialization). + */ + private void writeElementResponse(MessageContext outMsgCtxt, JsonWriter jsonWriter, + boolean preserve) throws AxisFault { + OMElement element = outMsgCtxt.getEnvelope().getBody().getFirstElement(); + QName elementQname = outMsgCtxt.getAxisOperation().getMessage( + WSDLConstants.MESSAGE_LABEL_OUT_VALUE).getElementQName(); + + ArrayList<XmlSchema> schemas = outMsgCtxt.getAxisService().getSchema(); + MoshiXMLStreamWriter xmlsw = new MoshiXMLStreamWriter(jsonWriter, + elementQname, schemas, outMsgCtxt.getConfigurationContext()); + try { + xmlsw.writeStartDocument(); + element.serialize(xmlsw, preserve); + xmlsw.writeEndDocument(); + } catch (XMLStreamException e) { + throw new AxisFault("Error writing element response in MoshiStreamingMessageFormatter", e); + } + } + + /** + * Write a return-object response using Moshi. + * + * <p>Moshi serializes the object graph field-by-field into the JsonWriter. 
+ * The JsonWriter is backed by an Okio sink wrapping a + * {@link FlushingOutputStream}, so the HTTP transport receives chunks + * as serialization progresses.</p> + */ + private void writeObjectResponse(JsonWriter jsonWriter, JsonAdapter<Object> adapter, + Object retObj, MessageContext outMsgCtxt) throws AxisFault { + try { + jsonWriter.beginObject(); + jsonWriter.name(JsonConstant.RESPONSE); + adapter.toJson(jsonWriter, retObj); + jsonWriter.endObject(); + + } catch (IOException e) { + String msg = "Error writing object response in MoshiStreamingMessageFormatter"; + log.error(msg, e); + throw AxisFault.makeFault(e); + } + } + + /** + * Read the flush interval from the service's configuration, falling back to {@link FlushingOutputStream#DEFAULT_FLUSH_INTERVAL} when the parameter is absent, null, non-numeric or not positive. + */ + private int getFlushInterval(MessageContext msgCtxt) { + AxisService service = msgCtxt.getAxisService(); + if (service != null) { + Parameter param = service.getParameter(PARAM_FLUSH_INTERVAL); + if (param != null && param.getValue() != null) { + try { + int interval = Integer.parseInt(param.getValue().toString().trim()); + if (interval > 0) { + return interval; + } + log.warn("Non-positive " + PARAM_FLUSH_INTERVAL + " value: " + interval + "; using default"); + } catch (NumberFormatException e) { + log.warn("Invalid " + PARAM_FLUSH_INTERVAL + " value: " + param.getValue() + "; using default"); + } + } + } + return FlushingOutputStream.DEFAULT_FLUSH_INTERVAL; + } + + public String getContentType(MessageContext outMsgCtxt, OMOutputFormat omOutputFormat, + String soapAction) { + return (String) outMsgCtxt.getProperty(Constants.Configuration.CONTENT_TYPE); + } + + public URL getTargetAddress(MessageContext messageContext, OMOutputFormat omOutputFormat, + URL url) throws AxisFault { + return null; + } + + public String formatSOAPAction(MessageContext messageContext, OMOutputFormat omOutputFormat, + String soapAction) { + return null; + } +} diff --git a/modules/samples/userguide/src/userguide/springbootdemo-tomcat11/README.md 
b/modules/samples/userguide/src/userguide/springbootdemo-tomcat11/README.md index 3aa9b68bd0..fb76aff6f8 100644 --- 
a/modules/samples/userguide/src/userguide/springbootdemo-tomcat11/README.md +++ b/modules/samples/userguide/src/userguide/springbootdemo-tomcat11/README.md @@ -163,6 +163,48 @@ $CURL_MTLS -X POST https://localhost:8443/axis2-json-api/services/FinancialBench --- +## Streaming JSON message formatter + +Axis2 2.0.1 includes streaming message formatters that prevent reverse proxy +body-size rejections on large HTTP responses. Drop-in replacement — no service +code changes required. + +**Problem solved (response-side):** The default formatter buffers the entire +JSON response before writing to the wire. A reverse proxy may return 502 Bad +Gateway on large responses. The streaming formatter flushes every 64 KB +(configurable), so the proxy sees a stream of chunks, never the full body. + +**Not solved (request-side):** Large HTTP request bodies (client → server) +are a client-side problem. If a client sends a 620 MB POST and the proxy +rejects it, the fix is client-side: break the request into smaller payloads +or add a pre-send size guard. + +| Variant | Class | +|---------|-------| +| GSON | `org.apache.axis2.json.streaming.JSONStreamingMessageFormatter` | +| Moshi | `org.apache.axis2.json.streaming.MoshiStreamingMessageFormatter` | + +Enable globally in `axis2.xml`: + +```xml +<messageFormatter contentType="application/json" + class="org.apache.axis2.json.streaming.MoshiStreamingMessageFormatter"/> +``` + +Optional flush interval tuning per-service in `services.xml`: + +```xml +<parameter name="streamingFlushIntervalBytes">131072</parameter> +``` + +Applies to all services (BigDataH2Service, FinancialBenchmarkService, any +custom service). Tested on WildFly 32 locally and behind a real reverse proxy +(stg-rapi02, HTTP/2 ALPN). Bit-identical results to the non-streaming formatter. + +See the [Streaming JSON Message Formatter guide](../../../../../../src/site/xdoc/docs/json-streaming-formatter.xml) for full documentation. 
+ +--- + ## Axis2 JSON-RPC request format The top-level key is the **operation name**, and the value is an array containing one object diff --git a/src/site/xdoc/docs/json-streaming-formatter.xml b/src/site/xdoc/docs/json-streaming-formatter.xml new file mode 100644 index 0000000000..0d0565a858 --- /dev/null +++ b/src/site/xdoc/docs/json-streaming-formatter.xml @@ -0,0 +1,170 @@ +<?xml version="1.0"?> +<!-- + ~ Licensed to the Apache Software Foundation (ASF) under one + ~ or more contributor license agreements. See the NOTICE file + ~ distributed with this work for additional information + ~ regarding copyright ownership. The ASF licenses this file + ~ to you under the Apache License, Version 2.0 (the + ~ "License"); you may not use this file except in compliance + ~ with the License. You may obtain a copy of the License at + ~ + ~ http://www.apache.org/licenses/LICENSE-2.0 + ~ + ~ Unless required by applicable law or agreed to in writing, + ~ software distributed under the License is distributed on an + ~ "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + ~ KIND, either express or implied. See the License for the + ~ specific language governing permissions and limitations + ~ under the License. + --> +<document xmlns="http://maven.apache.org/XDOC/2.0" + xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" + xsi:schemaLocation="http://maven.apache.org/XDOC/2.0 http://maven.apache.org/xsd/xdoc-2.0.xsd"> + <properties> + <title>Streaming JSON Message Formatter</title> + </properties> + + <body> + <h1>Streaming JSON Message Formatter</h1> + + <section name="Overview"> + <p>Axis2 2.0.1 includes streaming message formatters for JSON responses. 
+ These formatters wrap the transport OutputStream with a FlushingOutputStream + that pushes data to the HTTP layer every 64 KB (configurable), converting + a single buffered response into a stream of HTTP/2 DATA frames or HTTP/1.1 + chunked transfer encoding segments.</p> + + <p>Both GSON and Moshi variants are provided as drop-in replacements for + their respective base formatters. No service code changes are required.</p> + </section> + + <section name="Problem: Large HTTP Responses and Reverse Proxies"> + <p>When an Axis2 service returns a large JSON response (hundreds of MB), + the default formatter serializes the entire response into memory before + writing it to the wire. A reverse proxy (nginx, AWS ALB, or similar) may + reject the response due to body-size limits or buffering timeouts, + returning <code>502 Bad Gateway</code> to the client even though the + Axis2 service completed successfully.</p> + + <p>The streaming formatter eliminates this by flushing incrementally + during GSON/Moshi serialization. The proxy never sees the full response + body as a single buffer; it forwards chunks as they arrive.</p> + + <p><strong>What this does NOT solve:</strong> Large HTTP <em>request</em> + bodies (client to server). If a client sends a very large POST body and + the proxy rejects it, the fix is client-side: break the request into + smaller payloads (for example, date-range chunking) or add a pre-send + size guard. 
The streaming formatter operates on the response path only.</p> + </section> + + <section name="Available Variants"> + <table> + <tr><th>JSON Library</th><th>Formatter Class</th><th>Replaces</th></tr> + <tr> + <td>GSON</td> + <td><code>org.apache.axis2.json.streaming.JSONStreamingMessageFormatter</code></td> + <td><code>org.apache.axis2.json.gson.JsonFormatter</code></td> + </tr> + <tr> + <td>Moshi</td> + <td><code>org.apache.axis2.json.streaming.MoshiStreamingMessageFormatter</code></td> + <td><code>org.apache.axis2.json.moshi.JsonFormatter</code></td> + </tr> + </table> + + <p>Both variants share the same <code>FlushingOutputStream</code> + implementation in the <code>org.apache.axis2.json.streaming</code> + package.</p> + </section> + + <section name="Configuration"> + + <subsection name="Global (axis2.xml)"> + <p>Replace the default JSON message formatter with the streaming variant:</p> +<source><![CDATA[<!-- Moshi streaming variant --> +<messageFormatter contentType="application/json" + class="org.apache.axis2.json.streaming.MoshiStreamingMessageFormatter"/> + +<!-- OR: GSON streaming variant --> +<messageFormatter contentType="application/json" + class="org.apache.axis2.json.streaming.JSONStreamingMessageFormatter"/>]]></source> + <p>All JSON-RPC services in the deployment will use the streaming + formatter. Existing services require no code changes.</p> + </subsection> + + <subsection name="Flush Interval Tuning (services.xml)"> + <p>The default flush interval is 64 KB. Override per-service:</p> +<source><![CDATA[<parameter name="streamingFlushIntervalBytes">131072</parameter>]]></source> + <p>Smaller values increase flush frequency (lower latency to first + byte, more HTTP frames). 
Larger values reduce flush overhead at the + cost of larger transport buffers before each flush.</p> + </subsection> + + </section> + + <section name="How It Works"> + <p>The streaming formatter is structurally identical to the standard + GSON/Moshi formatter with one difference: before creating the + <code>JsonWriter</code>, it wraps the transport <code>OutputStream</code> + with a <code>FlushingOutputStream</code>:</p> + +<source><![CDATA[// Inside writeTo(): +OutputStream flushingStream = new FlushingOutputStream(outputStream, flushInterval); +// ... GSON/Moshi serialization proceeds normally against the flushing stream]]></source> + + <p>During serialization, every time the accumulated bytes exceed the + flush interval, the <code>FlushingOutputStream</code> calls + <code>flush()</code> on the underlying transport stream. This triggers + the servlet container (Tomcat, WildFly/Undertow) to send the buffered + bytes as an HTTP/2 DATA frame or HTTP/1.1 chunk. The reverse proxy + receives and forwards each chunk independently.</p> + + <p>The three serialization paths (fault response, element response, + and object response) all benefit from flushing without any path-specific + changes.</p> + </section> + + <section name="Services That Benefit"> + <p>The streaming formatter applies to <strong>all</strong> Axis2 + JSON-RPC services in the deployment. Any service that returns a + large JSON response benefits transparently:</p> + + <ul> + <li><strong>BigDataH2Service</strong> — enterprise big data + processing with large record sets. The streaming formatter + prevents proxy rejections as response sizes grow into the + hundreds of MB range.</li> + <li><strong>FinancialBenchmarkService</strong> — portfolio + variance, Monte Carlo VaR, and scenario analysis. 
Responses + are typically small (1-10 KB) but the formatter operates + transparently with no overhead on small payloads.</li> + <li><strong>Any custom service</strong> — services deployed + as <code>.aar</code> archives benefit without code changes + once the formatter is configured in <code>axis2.xml</code>.</li> + </ul> + </section> + + <section name="Testing"> + <p>The streaming formatter has been tested on:</p> + <ul> + <li>WildFly 32 (local) — all services produce valid JSON</li> + <li>WildFly 32 behind a reverse proxy (staging infrastructure + with HTTP/2 ALPN on port 8443) — all services produce + bit-identical results compared to the non-streaming formatter</li> + </ul> + <p>To verify the formatter is active, enable DEBUG logging for + <code>org.apache.axis2.json.streaming</code> and look for:</p> +<source>MoshiStreamingMessageFormatter: using FlushingOutputStream with 65536 byte flush interval</source> + </section> + + <section name="Axis2/C Equivalent"> + <p>The same pattern applies to Axis2/C services using Apache httpd + with mod_axis2. During JSON response generation, call + <code>ap_rflush(r)</code> periodically to flush the response + bucket brigade. 
This causes mod_h2 to emit HTTP/2 DATA frames + incrementally, achieving the same proxy-friendly streaming behavior + as the Java formatter.</p> + </section> + + </body> +</document> diff --git a/src/site/xdoc/docs/toc.xml b/src/site/xdoc/docs/toc.xml index 878dafca36..754690cbe1 100644 --- a/src/site/xdoc/docs/toc.xml +++ b/src/site/xdoc/docs/toc.xml @@ -125,14 +125,15 @@ Transport</a></li> Integration Guide</a></li> <li>18.5 <a href="wildfly-http2-integration-guide.html">WildFly HTTP/2 Integration Guide</a></li> -<li>18.6 <a href="servlet-transport.html">HTTP +<li>18.6 <a href="json-streaming-formatter.html">Streaming JSON Formatter for Large HTTP/2 Responses</a></li> +<li>18.7 <a href="servlet-transport.html">HTTP servlet transport</a></li> -<li>18.7 <a href="jms-transport.html">JMS Transport</a></li> -<li>18.8 <a href="tcp-transport.html">TCP Transport</a></li> -<li>18.9 <a href="mail-transport.html">Mail Transport</a></li> -<li>18.10 <a href="udp-transport.html">UDP Transport</a></li> -<li>18.11 <a href="xmpp-transport.html">XMPP Transport</a></li> -<li>18.12 <a href="transport_howto.html">Custom +<li>18.8 <a href="jms-transport.html">JMS Transport</a></li> +<li>18.9 <a href="tcp-transport.html">TCP Transport</a></li> +<li>18.10 <a href="mail-transport.html">Mail Transport</a></li> +<li>18.11 <a href="udp-transport.html">UDP Transport</a></li> +<li>18.12 <a href="xmpp-transport.html">XMPP Transport</a></li> +<li>18.13 <a href="transport_howto.html">Custom Transport</a></li> </ul> </li> @@ -168,6 +169,7 @@ Support</a></li> <li><a href="json-rpc-mcp-guide.html#error_handling">Error Handling: Correlation ID Pattern</a></li> <li><a href="json-rpc-mcp-guide.html#tested_features">Unit Test Feature Coverage</a></li> <li><a href="json-rpc-mcp-guide.html#not_implemented">Not Implemented / Limitations</a></li> + <li><a href="json-streaming-formatter.html">Streaming Formatter for Large MCP Tool Responses</a></li> <li><a 
href="json-rpc-mcp-guide.html#data_api_relationship">Relationship to Data API Vision</a></li> <li><a href="json-rpc-mcp-guide.html#python_compat">Python MCP Compatibility Notes</a></li> </ul> @@ -181,7 +183,16 @@ Support</a></li> <li><a href="mcp-architecture.html#key-design-decisions">Key Design Decisions</a></li> </ul> </li> - <li><strong>23.6 <a href="mcp-examples.html">MCP Examples — Financial Services Benchmarks</a></strong> + <li><strong>23.6 <a href="json-streaming-formatter.html">Streaming JSON Message Formatter</a></strong> + <ul> + <li><a href="json-streaming-formatter.html#Overview">Overview</a></li> + <li><a href="json-streaming-formatter.html#Problem_Large_HTTP_Responses_and_Reverse_Proxies">Problem: Large Responses and Reverse Proxies</a></li> + <li><a href="json-streaming-formatter.html#Configuration">Configuration (axis2.xml / services.xml)</a></li> + <li><a href="json-streaming-formatter.html#Services_That_Benefit">Services That Benefit (BigDataH2Service, FinancialBenchmarkService)</a></li> + <li><a href="json-streaming-formatter.html#Axis2_C_Equivalent">Axis2/C Equivalent (ap_rflush)</a></li> + </ul> + </li> + <li><strong>23.7 <a href="mcp-examples.html">MCP Examples — Financial Services Benchmarks</a></strong> <ul> <li><a href="mcp-examples.html#live-examples-tested-on-wildfly-32-0-1-final-2026-04-08">Live Examples (WildFly 32)</a></li> <li><a href="mcp-examples.html#demo-1-stress-test--what-if-correlations-spike">Demo 1: Stress Test</a></li>
