Author: markt Date: Tue Mar 31 17:07:43 2015 New Revision: 1670394 URL: http://svn.apache.org/r1670394 Log: Revert r1663324. The fix for the errors running the Autobahn test suite with perMessageDeflate was incorrect. The root cause was in the new handling for blocking writes r1663324 fixed blocking writes but broke non-blocking writes.
Modified: tomcat/trunk/java/org/apache/tomcat/websocket/PerMessageDeflate.java Modified: tomcat/trunk/java/org/apache/tomcat/websocket/PerMessageDeflate.java URL: http://svn.apache.org/viewvc/tomcat/trunk/java/org/apache/tomcat/websocket/PerMessageDeflate.java?rev=1670394&r1=1670393&r2=1670394&view=diff ============================================================================== --- tomcat/trunk/java/org/apache/tomcat/websocket/PerMessageDeflate.java (original) +++ tomcat/trunk/java/org/apache/tomcat/websocket/PerMessageDeflate.java Tue Mar 31 17:07:43 2015 @@ -316,13 +316,16 @@ public class PerMessageDeflate implement List<MessagePart> allCompressedParts = new ArrayList<>(); for (MessagePart uncompressedPart : uncompressedParts) { - if (Util.isControl(uncompressedPart.getOpCode())) { + byte opCode = uncompressedPart.getOpCode(); + if (Util.isControl(opCode)) { // Control messages can appear in the middle of other messages // and must not be compressed. Pass it straight through allCompressedParts.add(uncompressedPart); } else { List<MessagePart> compressedParts = new ArrayList<>(); ByteBuffer uncompressedPayload = uncompressedPart.getPayload(); + SendHandler uncompressedIntermediateHandler = + uncompressedPart.getIntermediateHandler(); deflater.setInput(uncompressedPayload.array(), uncompressedPayload.arrayOffset() + uncompressedPayload.position(), @@ -339,8 +342,7 @@ public class PerMessageDeflate implement compressedPayload.remaining(), flush); compressedPayload.position(compressedPayload.position() + written); - if (!uncompressedPart.isFin() && compressedPayload.hasRemaining() && - deflater.needsInput()) { + if (!uncompressedPart.isFin() && compressedPayload.hasRemaining() && deflater.needsInput()) { // This message part has been fully processed by the // deflater. Fire the send handler for this message part // and move on to the next message part. @@ -360,24 +362,28 @@ public class PerMessageDeflate implement boolean fin = uncompressedPart.isFin(); boolean full = compressedPayload.limit() == compressedPayload.capacity(); boolean needsInput = deflater.needsInput(); + long blockingWriteTimeoutExpiry = uncompressedPart.getBlockingWriteTimeoutExpiry(); if (fin && !full && needsInput) { // End of compressed message. Drop EOM bytes and output. compressedPayload.limit(compressedPayload.limit() - EOM_BYTES.length); - compressedPart = createNewCompressedMessagePart( - uncompressedPart, true, compressedPayload); + compressedPart = new MessagePart(true, getRsv(uncompressedPart), + opCode, compressedPayload, uncompressedIntermediateHandler, + uncompressedIntermediateHandler, blockingWriteTimeoutExpiry); deflateRequired = false; startNewMessage(); } else if (full && !needsInput) { // Write buffer full and input message not fully read. // Output and start new compressed part. - compressedPart = createNewCompressedMessagePart( - uncompressedPart, false, compressedPayload); + compressedPart = new MessagePart(false, getRsv(uncompressedPart), + opCode, compressedPayload, uncompressedIntermediateHandler, + uncompressedIntermediateHandler, blockingWriteTimeoutExpiry); } else if (!fin && full && needsInput) { // Write buffer full and input message not fully read. // Output and get more data. - compressedPart = createNewCompressedMessagePart( - uncompressedPart, false, compressedPayload); + compressedPart = new MessagePart(false, getRsv(uncompressedPart), + opCode, compressedPayload, uncompressedIntermediateHandler, + uncompressedIntermediateHandler, blockingWriteTimeoutExpiry); deflateRequired = false; } else if (fin && full && needsInput) { // Write buffer full. Input fully read. Deflater may be @@ -387,22 +393,24 @@ public class PerMessageDeflate implement // - in middle of EOM bytes // - about to write EOM bytes // - more data to write - int eomBufferWritten = deflater.deflate( - EOM_BUFFER, 0, EOM_BUFFER.length, Deflater.SYNC_FLUSH); + int eomBufferWritten = deflater.deflate(EOM_BUFFER, 0, EOM_BUFFER.length, Deflater.SYNC_FLUSH); if (eomBufferWritten < EOM_BUFFER.length) { // EOM has just been completed - compressedPayload.limit(compressedPayload.limit() - - EOM_BYTES.length + eomBufferWritten); - compressedPart = createNewCompressedMessagePart( - uncompressedPart, true, compressedPayload); + compressedPayload.limit(compressedPayload.limit() - EOM_BYTES.length + eomBufferWritten); + compressedPart = new MessagePart(true, + getRsv(uncompressedPart), opCode, compressedPayload, + uncompressedIntermediateHandler, uncompressedIntermediateHandler, + blockingWriteTimeoutExpiry); deflateRequired = false; startNewMessage(); } else { // More data to write // Copy bytes to new write buffer writeBuffer.put(EOM_BUFFER, 0, eomBufferWritten); - compressedPart = createNewCompressedMessagePart( - uncompressedPart, false, compressedPayload); + compressedPart = new MessagePart(false, + getRsv(uncompressedPart), opCode, compressedPayload, + uncompressedIntermediateHandler, uncompressedIntermediateHandler, + blockingWriteTimeoutExpiry); } } else { throw new IllegalStateException("Should never happen"); @@ -439,20 +447,12 @@ public class PerMessageDeflate implement } - private MessagePart createNewCompressedMessagePart(MessagePart uncompressedMessagePart, - boolean fin, ByteBuffer compressedPayload) { - int rsv = uncompressedMessagePart.getRsv(); - byte opCode = uncompressedMessagePart.getOpCode(); + private int getRsv(MessagePart uncompressedMessagePart) { + int result = uncompressedMessagePart.getRsv(); if (!firstCompressedFrameWritten) { - rsv += RSV_BITMASK; + result += RSV_BITMASK; firstCompressedFrameWritten = true; - } else { - // This must be a continuation frame - opCode = 0; } - return new MessagePart(fin, rsv, opCode, compressedPayload, - uncompressedMessagePart.getIntermediateHandler(), - uncompressedMessagePart.getIntermediateHandler(), - uncompressedMessagePart.getBlockingWriteTimeoutExpiry()); + return result; } } --------------------------------------------------------------------- To unsubscribe, e-mail: dev-unsubscr...@tomcat.apache.org For additional commands, e-mail: dev-h...@tomcat.apache.org