[
https://issues.apache.org/jira/browse/CXF-6776?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
]
avidd updated CXF-6776:
-----------------------
Attachment: CXF-6776-example.zip
Example maven project producing several errors sporadically.
> MTOM client receives bogus
> --------------------------
>
> Key: CXF-6776
> URL: https://issues.apache.org/jira/browse/CXF-6776
> Project: CXF
> Issue Type: Bug
> Affects Versions: 3.1.1, 3.1.5
> Environment: Windows 7 Professional 64 bit, java 1.8.0_45
> Reporter: avidd
> Attachments: CXF-6776-example.zip
>
>
> We have a SOAP web service that transmits large structured results sets (a
> list of rows each of which is a List<String>) using MTOM. We have now
> migrated to CXF due to some other bug in jaxws-spring. This (part of the)
> code seemed to work fine in jaxws-spring but we had strange problems there
> which we debugged to the lower layers as well. Now we have the situation that
> some single bytes as received by the client are not the bytes that were sent
> by the server. This can show as a byte having a completely wrong value or as
> apparently a byte added *and* the byte beside having the wrong value.
> There are test cases that return result sets of several thousand rows each of
> which has several columns in it. The result set is "streamed" via MTOM. These
> tests fail sporadically every 10th or 20th time.
> This is what I found out until now:
> * The error is reproducible on different machines.
> * I first saw this error with CXF 3.1.1. Then I upgraded to 3.1.5 and it
> still occurs. As said before there were similar bugs with jaxws-spring which
> showed different failures.
> * The server cleanly writes all results into the output and flushes the
> output stream before closing it
> * Both, client (a JUnit Test) and server (a Jetty server started by that
> test) are running on the same machine, actually in the same VM.
> * If I enable some of the logging in the below class (QueryResponse), this
> seems to reduce the probability of the error. I was not able to reproduce the
> error with the logging enabled. This is very frustrating and I think it hints
> at some race condition.
> Our streaming result encodes rows like this:
> * 4 bytes stating the number of columns in that row as an int (trailing nulls
> are cut off)
> * for each column: 4 bytes stating the number of bytes in that column's value
> as an int
> * the said number of bytes, will be converted to a string
> *The test*
> * If the test runs fine, the client receives 2077 containing 15 columns each.
> * Each column is a double or a string, it's basically the result of a simple
> SQL query. (Of course, if it actually were SQL we would use JDBC. It's
> actually a SAP data warehouse.)
> * If the test fails, which it only does sporadically, it is always the 14th
> row failing.
> *Example 1 of failure*
> When it is correctly received at the client side, then this is the bytes of
> row 14 of test 2.
> * first 4 bytes: we have 15 columns
> * next 4 bytes: the first column has 2 bytes
> * next 2 bytes: "BA"
> {code}
> [0, 0, 0, 15, 0, 0, 0, 2, 66, 65, 0, 0, 0, 1, 49, 0, 0, 0, 4, ...]
> {code}
> This is the erroneous bytes as received on the client side
> * first 4 bytes: we have 15 columns
> * next 4 bytes: the first columns has 2 bytes
> * next 2 bytes: "BM"
> {code}
> [0, 0, 0, 15, 0, 0, 0, 2, 66, 77, 71, 0, 0, 0, ...]
> {code}
> At first glance, it looks like a byte was added (because of the three
> consecutive 0s). So I thought it may be an off-by-one error. But when looking
> at the 2's complement I see:
> {code}
> good: 65: 01000001
> bad: 77, 71: 01001101 0100111
> {code}
> I can't see where a byte would have been inserted. It also doesn't look like
> only bits were flipped. It rather looks like total bogus to me.
> *Example 2 of Failure*
> * The bytes sent
> {code}
> [0, 0, 0, 15, 0, 0, 0, 2, 66, 65, 0, 0, 0, 1, 49, 0, 0, 0, 4, 48, 48, 51, 51,
> 0, 0, 0, 2, ...]
> {code}
> * The bytes received
> {code}
> XX
> [0, 0, 0, 15, 0, 0, 0, 2, 66, 65, 0, 0, 0, 1, 49, 0, 0, 0, 4, 48, 48, 51, 48,
> 0, 0, 0, 2,...]
> {code}
> Similar but different problems first occurred when we upgraded from Java 7 to
> Java 8 a year ago. Before, everything was fine. At that time we were using
> jaxws-spring and had the situation that sometimes the mime-boundary at the
> end of the attachment was missing. We hoped that an upgrade to CXF would fix
> the problem but now we even have issues with the data.
> To me it looks like something is totally broken in the JRE but this only
> shows up when using MTOM, so it may be some integration problem. I wonder
> whether we are the only ones seeing this behavior.
> This is my "streaming result set" class:
> {code}
> @XmlAccessorType ( XmlAccessType.NONE )
> @XmlRootElement ( name = "streamResponse" )
> @XmlType ( name = "streamResponseType" )
> public class QueryResponse {
> private static final String MIME_TYPE_OCTET_STREAM =
> "application/octet-stream";
> private static final Logger LOG =
> LoggerFactory.getLogger(QueryResponse.class);
> private static final Marker FATAL = MarkerFactory.getMarker("FATAL");
> private static final ObjectPool<String> STRINGS = ObjectPool.stringPool();
> private static final int BUFFER_SIZE = 100;
> private static final int PREMATURE_END_OF_RESULT = -1;
> private static final byte[] PREMATURE_END_OF_RESULT_BYTES =
> ByteBuffer.allocate(4).putInt(PREMATURE_END_OF_RESULT).array();
> private volatile int totalBytesRead = 0;
> private volatile int rowsRead = 0;
> private volatile int rowsWritten = 0;
> private DataHandler results;
> private BlockingQueue<List<String>> resultSet;
> private ResultSetIterator<String> resultSetIterator;
> private boolean receiving = true;
> /** Create a new response. This constructor is used by for unmarshalling
> the response. */
> QueryResponse() { }
>
> /**
> * Create a new result set encapsulating the given components.
> * @param aResults the result iterator
> */
> QueryResponse(AutoCloseableIterator<List<String>> aResults) {
> results = encode(aResults);
> }
>
> @XmlElement ( required = true )
> @XmlMimeType ( MIME_TYPE_OCTET_STREAM )
> DataHandler getResults() {
> return results;
> }
>
> /**
> * Set the result set, called by JAXB.
> * @param aDataHandler the data handler
> */
> void setResults(DataHandler aDataHandler) {
> if ( aDataHandler == null ) { throw new
> NullPointerException("aDataHandler"); }
> if ( resultSet != null ) { throw new IllegalStateException("Result set
> already exists."); }
> results = aDataHandler;
> // Pipelining
> /* parse results and fill queue while loading from the network */
> resultSet = new ArrayBlockingQueue<List<String>>(BUFFER_SIZE);
> resultSetIterator = new ResultSetIterator<String>(resultSet);
> DataHandler dataHandler = results;
> try {
> decode(dataHandler, resultSet, resultSetIterator);
> } catch ( InterruptedException e ) {
> Thread.currentThread().interrupt();
> }
> }
>
> /**
> * Called on the client side to get a streaming and blocking iterator.
> * @return the result set as a blocking iterator
> */
> public Iterator<List<String>> getResultSet() {
> return resultSetIterator;
> }
>
> private int fill(byte[] bytes, InputStream in) throws IOException {
> int off = 0;
> int readCount = 0;
> do {
> // reads at least one byte if ( readCount > 0 ) if ( bytes.length - off
> ) > 0 and not EOF
> readCount = in.read(bytes, off, bytes.length - off);
> off += readCount;
> if ( readCount > 0 ) {
> totalBytesRead += readCount;
> }
> } while ( readCount > 0 && off < bytes.length );
> if ( off > 0 && off < bytes.length ) { // end of stream is a correct
> termination
> try {
> readCount = in.read(bytes, off, bytes.length - off);
> String exception = readException(in);
> throw new RuntimeException(exception);
> } catch ( EOFException e ) {
> // There was no exception written by the server, just a premature end
> of the stream causing the client-side EOF.
> throw new RuntimeException("Premature end of stream, total bytes
> read: " + totalBytesRead);
> }
> }
> return off;
> }
> private static void checkException(int len, InputStream in) throws
> ClassNotFoundException, IOException {
> if ( len == PREMATURE_END_OF_RESULT ) {
> String exception = readException(in);
> throw new RuntimeException(exception);
> }
> }
> private static String readException(InputStream in) throws IOException {
> ObjectInputStream objIn = new ObjectInputStream(in);
> try {
> Object object = objIn.readObject();
> if ( object != null ) {
> return object.toString();
> }
> } catch ( ClassNotFoundException e ) {
> throw new RuntimeException("Could not read exception.", e);
> }
> return "No exception received from service after premature end of result";
> }
> private DataHandler encode(AutoCloseableIterator<List<String>> aResults) {
> assert ( aResults != null );
> final PipedOutputStream out = new PipedOutputStream();
> DataHandler dh = new DataHandler(new StreamDataSource(out,
> MIME_TYPE_OCTET_STREAM));
> Encoder encoder = new Encoder(out, aResults, new
> ServerExceptionHandler());
> new Thread(encoder).start();
> return dh;
> }
>
> private void decode(
> DataHandler dataHandler, final BlockingQueue<List<String>> aResultSet,
> ExceptionHandler exceptionHandler)
> throws InterruptedException {
> Decoder decoder = new Decoder(dataHandler, aResultSet, exceptionHandler);
> new Thread(decoder).start();
> }
>
> private void awaitIteratorBufferNotFull() throws InterruptedException {
> while ( resultSet.remainingCapacity() == 0 ) { resultSet.wait(); }
> }
> private void awaitElements() throws InterruptedException {
> while ( receiving && resultSet.isEmpty() ) { resultSet.wait(); }
> }
> private static final class StreamDataSource implements DataSource {
> private final String name = UUID.randomUUID().toString();
> private final InputStream in;
> private final String mimeType;
> private StreamDataSource(PipedOutputStream aOut, String aMimeType) {
> ArgumentChecks.checkNotNull(aOut, "aOut");
> ArgumentChecks.checkNotNull(aMimeType, "aMimeType");
> try {
> in = new PipedInputStream(aOut);
> } catch ( IOException e ) {
> throw new RuntimeException("Could not create input stream.", e);
> }
> mimeType = aMimeType;
> }
> @Override public String getName() { return name; }
> @Override public String getContentType() { return mimeType; }
> /**
> * {@inheritDoc}
> *
> * This implementation violates the specification in that it is
> destructive. Only the first call will return an
> * appropriate input stream.
> */
> @Override public InputStream getInputStream() { return in; }
> @Override public OutputStream getOutputStream() { throw new
> UnsupportedOperationException(); }
> }
>
> /**
> * Decodes the contents of an input stream as written by the {@link
> com.tn_ag.sap.QueryResponse.Encoder} and writes
> * parsed rows to a {@link java.util.Queue}.
> */
> private class Decoder implements Runnable {
> private final DataHandler dataHandler;
> private final BlockingQueue<List<String>> resultSet;
> private final ExceptionHandler exceptionHandler;
> private final byte[] lenBytes = new byte[4];
> Decoder(DataHandler aDataHandler, BlockingQueue<List<String>> aResultSet,
> ExceptionHandler aHandler) {
> ArgumentChecks.checkNotNull(aDataHandler, "aDataHandler");
> ArgumentChecks.checkNotNull(aResultSet, "aResultSet");
> ArgumentChecks.checkNotNull(aHandler, "aHandler");
> dataHandler = aDataHandler;
> resultSet = aResultSet;
> exceptionHandler = aHandler;
> }
>
> @Override
> public void run() {
> InputStream in = null;
> List<String> row;
> int len;
> try {
> in = dataHandler.getInputStream();
>
> while ( receiving ) {
> synchronized ( resultSet ) {
> receiving = fill(lenBytes, in) > 0; // read next row's
> length in number of columns
> len = ByteBuffer.wrap(lenBytes).getInt(); // convert row length
> to integer
> if ( !receiving || len == 0 ) { break; }
> checkException(len, in); // -1 signals an
> exception
> row = readRow(in, len);
> awaitIteratorBufferNotFull();
> resultSet.put(row);
> rowsRead++;
> if ( rowsRead % 1000 == 0 ) {
> LOG.debug("already received {} rows", rowsRead);
> }
> resultSet.notifyAll(); // notify waiting
> consumer threads
> }
> }
> stopReception();
> LOG.debug("received a total of {} rows.", rowsRead);
> } catch ( InterruptedException e ) {
> LOG.info("Result reception interrupted.");
> } catch ( Exception e ) {
> exceptionHandler.handle(e);
> stopReception();
> } finally {
> receiving = false;
> try {
> if ( in != null ) { in.close(); }
> } catch ( IOException e ) {
> exceptionHandler.handle(e);
> }
> }
> }
> private List<String> readRow(InputStream in, int len) throws IOException {
> List<String> row = new ArrayList<>(len); // create list of
> appropriate length
> for ( int col = 0; col < len; col++ ) { // for each column
>
> fill(lenBytes, in); // read the value
> length (fixed for some types if schema known)
> int valLen = ByteBuffer.wrap(lenBytes).getInt(); // convert the
> length of the value as bytes to an int
> final byte[] bytes = new byte[valLen]; // allocate a buffer
> of exactly the required size
> fill(bytes, in);
> row.add(STRINGS.internalize(new String(bytes)));
> }
> return row;
> }
> private void stopReception() {
> synchronized ( resultSet ) {
> receiving = false; // we will stop parsing now
> resultSet.notifyAll(); // consumers can now consume the
> remaining elements from the queue
> LOG.debug("Read " + rowsRead + " rows from binary stream");
> }
> }
> }
>
> /**
> * Encodes the given result set and writes the result to an output stream.
> */
> private class Encoder implements Runnable {
> private final OutputStream out;
> private final AutoCloseableIterator<List<String>> iterator;
> private final ExceptionHandler handler;
> Encoder(OutputStream aOut, AutoCloseableIterator<List<String>> aResults,
> ExceptionHandler aHandler) {
> ArgumentChecks.checkNotNull(aOut, "aOut");
> ArgumentChecks.checkNotNull(aResults, "aResults");
> ArgumentChecks.checkNotNull(aHandler, "aHandler");
> out = aOut;
> iterator = aResults;
> handler = aHandler;
> }
> @Override
> public void run() {
> try ( AutoCloseableIterator<List<String>> iter = this.iterator;
> OutputStream out = this.out ) {
> writeResultSet(iter, out);
> out.flush();
> } catch ( Exception e ) {
> handler.handle(e);
> }
> }
> private void writeResultSet(AutoCloseableIterator<List<String>> iter,
> OutputStream out2) throws IOException {
> List<String> row = null;
> while ( this.iterator.hasNext() ) {
> try {
> row = this.iterator.next();
> writeRow(row);
> rowsWritten++;
> } catch ( Exception e ) {
> out.write(PREMATURE_END_OF_RESULT_BYTES); // write -1, signaling an
> exception instead of row size
> writeException(out, e); // send exception to
> client for rethrowing it there
> throw e; // stop transmission,
> handle exception on server-side (logging)
> }
> }
> LOG.info("wrote {} rows to binary stream, closing output stream",
> rowsWritten);
> }
> private void writeRow(List<String> row) throws IOException {
> out.write(ByteBuffer.allocate(4).putInt(row.size()).array()); //
> write row size (in number of columns)
> for ( String s : row ) {
> if ( s == null ) { s = ""; }
> byte[] bytes = s.getBytes();
> out.write(ByteBuffer.allocate(4).putInt(bytes.length).array()); //
> write size of column in bytes
> out.write(bytes); //
> write column value
> }
> }
> private void writeException(OutputStream output, Exception e) throws
> IOException {
> ObjectOutputStream objOut = new ObjectOutputStream(output);
> objOut.writeObject(toString(e));
> }
> private String toString(Throwable e) {
> if ( e instanceof UncheckedConnectionException && e.getCause() != null
> ) {
> e = e.getCause();
> }
> ByteArrayOutputStream stackTraceOut = new ByteArrayOutputStream();
> PrintWriter writer = new PrintWriter(stackTraceOut);
> e.printStackTrace(writer);
> writer.flush();
> return new String(stackTraceOut.toByteArray());
> }
> }
>
> private final class ResultSetIterator<T> extends ExceptionHandler
> implements Iterator<List<T>> {
> private final BlockingQueue<List<T>> queue;
> private Exception exception;
> private int returned = 0;
> private ResultSetIterator(BlockingQueue<List<T>> aQueue) {
> queue = aQueue;
> }
> /**
> * {@inheritDoc}
> *
> * This implementation marks the current thread as interrupted if
> streaming could not be commenced.
> */
> @Override
> public boolean hasNext() {
> if ( exception != null ) { throw new
> UncheckedConnectionException("Exception while reading results", exception); }
> try {
> synchronized ( resultSet ) {
> awaitElements();
> if ( exception != null ) {
> throw new UncheckedConnectionException("Exception while reading
> results", exception);
> }
> if ( resultSet.isEmpty() ) {
> LOG.debug("iterator returned " + returned + " rows");
> }
> return !resultSet.isEmpty();
> }
> } catch ( InterruptedException e ) {
> Thread.currentThread().interrupt();
> return false;
> }
>
> }
> /**
> * {@inheritDoc}
> *
> * If the current thread is interrupted during this call, it is marked as
> interrupted and the method returns
> * {@code null}.
> *
> * @throws NoSuchElementException if called while {@link #hasNext()}
> returns {@code false}
> */
> @Override
> public List<T> next() throws NoSuchElementException {
> if ( exception != null ) { throw new IllegalStateException("Exception
> while reading results", exception); }
> if ( !hasNext() ) { throw new NoSuchElementException(); }
> try {
> List<T> result = queue.take();
> synchronized ( resultSet ) { resultSet.notify(); }
> returned++;
> return result;
> } catch ( InterruptedException e ) {
> Thread.currentThread().interrupt();
> return null;
> }
> }
> /**
> * This method is not supported.
> */
> @Override public void remove() { throw new
> UnsupportedOperationException("RTFM"); }
> @Override void handle(Exception aException) {
> if ( exception != null ) {
> LOG.warn("There was another exception.", aException);
> }
> exception = aException;
> }
> }
>
> private abstract static class ExceptionHandler {
> abstract void handle(Exception exception);
> }
>
> private static class ServerExceptionHandler extends ExceptionHandler {
> @Override
> void handle(Exception aException) {
> LOG.error(FATAL, "Exception while writing response.", aException);
> }
> }
> }
> {code}
--
This message was sent by Atlassian JIRA
(v6.3.4#6332)