http://git-wip-us.apache.org/repos/asf/cassandra/blob/16499ca9/src/java/org/apache/cassandra/io/util/UnbufferedDataOutputStreamPlus.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/io/util/UnbufferedDataOutputStreamPlus.java b/src/java/org/apache/cassandra/io/util/UnbufferedDataOutputStreamPlus.java new file mode 100644 index 0000000..31abfa8 --- /dev/null +++ b/src/java/org/apache/cassandra/io/util/UnbufferedDataOutputStreamPlus.java @@ -0,0 +1,374 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.cassandra.io.util; + +import java.io.DataOutput; +import java.io.IOException; +import java.io.UTFDataFormatException; +import java.nio.ByteBuffer; +import java.nio.channels.WritableByteChannel; + +import org.apache.cassandra.config.Config; +import org.apache.cassandra.utils.memory.MemoryUtil; + +import com.google.common.base.Function; + +/** + * Base class for DataOutput implementations that do not have optimized implementations of Plus methods + * and do no buffering. + * <p/> + * Unlike BufferedDataOutputStreamPlus this is capable of operating as an unbuffered output stream. 
+ * Currently necessary because SequentialWriter implements its own buffering along with mark/reset/truncate. + */ +public abstract class UnbufferedDataOutputStreamPlus extends DataOutputStreamPlus +{ + protected UnbufferedDataOutputStreamPlus() + { + super(); + } + + protected UnbufferedDataOutputStreamPlus(WritableByteChannel channel) + { + super(channel); + } + + /* + !! DataOutput methods below are copied from the implementation in Apache Harmony RandomAccessFile. + */ + + /** + * Writes the entire contents of the byte array <code>buffer</code> to + * this RandomAccessFile starting at the current file pointer. + * + * @param buffer the buffer to be written. + * @throws IOException If an error occurs trying to write to this RandomAccessFile. + */ + public void write(byte[] buffer) throws IOException + { + write(buffer, 0, buffer.length); + } + + /** + * Writes <code>count</code> bytes from the byte array <code>buffer</code> + * starting at <code>offset</code> to this RandomAccessFile starting at + * the current file pointer.. + * + * @param buffer the bytes to be written + * @param offset offset in buffer to get bytes + * @param count number of bytes in buffer to write + * @throws IOException If an error occurs attempting to write to this + * RandomAccessFile. + * @throws IndexOutOfBoundsException If offset or count are outside of bounds. + */ + public abstract void write(byte[] buffer, int offset, int count) throws IOException; + + /** + * Writes the specified byte <code>oneByte</code> to this RandomAccessFile + * starting at the current file pointer. Only the low order byte of + * <code>oneByte</code> is written. + * + * @param oneByte the byte to be written + * @throws IOException If an error occurs attempting to write to this + * RandomAccessFile. + */ + public abstract void write(int oneByte) throws IOException; + + /** + * Writes a boolean to this output stream. 
+ * + * @param val the boolean value to write to the OutputStream + * @throws IOException If an error occurs attempting to write to this + * DataOutputStream. + */ + public final void writeBoolean(boolean val) throws IOException + { + write(val ? 1 : 0); + } + + /** + * Writes a 8-bit byte to this output stream. + * + * @param val the byte value to write to the OutputStream + * @throws java.io.IOException If an error occurs attempting to write to this + * DataOutputStream. + */ + public final void writeByte(int val) throws IOException + { + write(val & 0xFF); + } + + /** + * Writes the low order 8-bit bytes from a String to this output stream. + * + * @param str the String containing the bytes to write to the OutputStream + * @throws IOException If an error occurs attempting to write to this + * DataOutputStream. + */ + public final void writeBytes(String str) throws IOException + { + byte bytes[] = new byte[str.length()]; + for (int index = 0; index < str.length(); index++) + { + bytes[index] = (byte) (str.charAt(index) & 0xFF); + } + write(bytes); + } + + /** + * Writes the specified 16-bit character to the OutputStream. Only the lower + * 2 bytes are written with the higher of the 2 bytes written first. This + * represents the Unicode value of val. + * + * @param val the character to be written + * @throws IOException If an error occurs attempting to write to this + * DataOutputStream. + */ + public final void writeChar(int val) throws IOException + { + write((val >>> 8) & 0xFF); + write((val >>> 0) & 0xFF); + } + + /** + * Writes the specified 16-bit characters contained in str to the + * OutputStream. Only the lower 2 bytes of each character are written with + * the higher of the 2 bytes written first. This represents the Unicode + * value of each character in str. + * + * @param str the String whose characters are to be written. + * @throws IOException If an error occurs attempting to write to this + * DataOutputStream. 
+ */ + public final void writeChars(String str) throws IOException + { + byte newBytes[] = new byte[str.length() * 2]; + for (int index = 0; index < str.length(); index++) + { + int newIndex = index == 0 ? index : index * 2; + newBytes[newIndex] = (byte) ((str.charAt(index) >> 8) & 0xFF); + newBytes[newIndex + 1] = (byte) (str.charAt(index) & 0xFF); + } + write(newBytes); + } + + /** + * Writes a 64-bit double to this output stream. The resulting output is the + * 8 bytes resulting from calling Double.doubleToLongBits(). + * + * @param val the double to be written. + * @throws IOException If an error occurs attempting to write to this + * DataOutputStream. + */ + public final void writeDouble(double val) throws IOException + { + writeLong(Double.doubleToLongBits(val)); + } + + /** + * Writes a 32-bit float to this output stream. The resulting output is the + * 4 bytes resulting from calling Float.floatToIntBits(). + * + * @param val the float to be written. + * @throws IOException If an error occurs attempting to write to this + * DataOutputStream. + */ + public final void writeFloat(float val) throws IOException + { + writeInt(Float.floatToIntBits(val)); + } + + /** + * Writes a 32-bit int to this output stream. The resulting output is the 4 + * bytes, highest order first, of val. + * + * @param val the int to be written. + * @throws IOException If an error occurs attempting to write to this + * DataOutputStream. + */ + public void writeInt(int val) throws IOException + { + write((val >>> 24) & 0xFF); + write((val >>> 16) & 0xFF); + write((val >>> 8) & 0xFF); + write((val >>> 0) & 0xFF); + } + + /** + * Writes a 64-bit long to this output stream. The resulting output is the 8 + * bytes, highest order first, of val. + * + * @param val the long to be written. + * @throws IOException If an error occurs attempting to write to this + * DataOutputStream. 
+ */ + public void writeLong(long val) throws IOException + { + write((int) (val >>> 56) & 0xFF); + write((int) (val >>> 48) & 0xFF); + write((int) (val >>> 40) & 0xFF); + write((int) (val >>> 32) & 0xFF); + write((int) (val >>> 24) & 0xFF); + write((int) (val >>> 16) & 0xFF); + write((int) (val >>> 8) & 0xFF); + write((int) (val >>> 0) & 0xFF); + } + + /** + * Writes the specified 16-bit short to the OutputStream. Only the lower 2 + * bytes are written with the higher of the 2 bytes written first. + * + * @param val the short to be written + * @throws IOException If an error occurs attempting to write to this + * DataOutputStream. + */ + public void writeShort(int val) throws IOException + { + writeChar(val); + } + + /** + * Writes the specified String out in UTF format to the provided DataOutput + * + * @param str the String to be written in UTF format. + * @param out the DataOutput to write the UTF encoded string to + * @throws IOException If an error occurs attempting to write to this + * DataOutputStream. 
+ */ + public static void writeUTF(String str, DataOutput out) throws IOException + { + int length = str.length(); + int utfCount = calculateUTFLength(str, length); + + if (utfCount > 65535) + throw new UTFDataFormatException(); //$NON-NLS-1$ + + byte[] utfBytes = retrieveTemporaryBuffer(utfCount + 2); + + int utfIndex = 2; + utfBytes[0] = (byte) (utfCount >> 8); + utfBytes[1] = (byte) utfCount; + int bufferLength = utfBytes.length; + + if (utfCount == length && utfCount + utfIndex < bufferLength) + { + for (int charIndex = 0 ; charIndex < length ; charIndex++) + utfBytes[utfIndex++] = (byte) str.charAt(charIndex); + } + else + { + int charIndex = 0; + while (charIndex < length) + { + char ch = str.charAt(charIndex); + int sizeOfChar = sizeOfChar(ch); + if (utfIndex + sizeOfChar > bufferLength) + { + out.write(utfBytes, 0, utfIndex); + utfIndex = 0; + } + + switch (sizeOfChar) + { + case 3: + utfBytes[utfIndex] = (byte) (0xe0 | (0x0f & (ch >> 12))); + utfBytes[utfIndex + 1] = (byte) (0x80 | (0x3f & (ch >> 6))); + utfBytes[utfIndex + 2] = (byte) (0x80 | (0x3f & ch)); + break; + case 2: + utfBytes[utfIndex] = (byte) (0xc0 | (0x1f & (ch >> 6))); + utfBytes[utfIndex + 1] = (byte) (0x80 | (0x3f & ch)); + break; + case 1: + utfBytes[utfIndex] = (byte) ch; + break; + default: + throw new IllegalStateException(); + } + utfIndex += sizeOfChar; + charIndex++; + } + } + out.write(utfBytes, 0, utfIndex); + } + + /* + * Factored out into separate method to create more flexibility around inlining + */ + private static int calculateUTFLength(String str, int length) + { + int utfCount = 0; + for (int i = 0; i < length; i++) + utfCount += sizeOfChar(str.charAt(i)); + return utfCount; + } + + private static int sizeOfChar(int ch) + { + // wrap 0 around to max, because it requires 2 bytes + return 1 + // if >= 128, we need an extra byte, so we divide by 128 and check the value is > 0 + // (by negating it and taking the sign bit) + + (-(ch / 128) >>> 31) + // if >= 2048, or == 0, we 
need another extra byte; we subtract one and wrap around, + // so we only then need to confirm it is greater than 2047 + + (-(((ch - 1) & 0xffff) / 2047) >>> 31); + } + + /** + * Writes the specified String out in UTF format. + * + * @param str the String to be written in UTF format. + * @throws IOException If an error occurs attempting to write to this + * DataOutputStream. + */ + public final void writeUTF(String str) throws IOException + { + writeUTF(str, this); + } + + // ByteBuffer to use for defensive copies + private final ByteBuffer hollowBufferD = MemoryUtil.getHollowDirectByteBuffer(); + + @Override + public void write(ByteBuffer buf) throws IOException + { + if (buf.hasArray()) + { + write(buf.array(), buf.arrayOffset() + buf.position(), buf.remaining()); + } + else + { + assert buf.isDirect(); + MemoryUtil.duplicateDirectByteBuffer(buf, hollowBufferD); + while (hollowBufferD.hasRemaining()) + channel.write(hollowBufferD); + } + } + + public void write(Memory memory, long offset, long length) throws IOException + { + for (ByteBuffer buffer : memory.asByteBuffers(offset, length)) + write(buffer); + } + + @Override + public <R> R applyToChannel(Function<WritableByteChannel, R> f) throws IOException + { + return f.apply(channel); + } +}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/16499ca9/src/java/org/apache/cassandra/io/util/WrappedDataOutputStreamPlus.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/io/util/WrappedDataOutputStreamPlus.java b/src/java/org/apache/cassandra/io/util/WrappedDataOutputStreamPlus.java new file mode 100644 index 0000000..d8c8f0c --- /dev/null +++ b/src/java/org/apache/cassandra/io/util/WrappedDataOutputStreamPlus.java @@ -0,0 +1,68 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.cassandra.io.util; + +import java.io.IOException; +import java.io.OutputStream; +import java.nio.channels.WritableByteChannel; + +/** + * When possible use {@link BufferedDataOutputStreamPlus} instead of this class, as it will + * be more efficient when using Plus methods. This class is only for situations where it cannot be used. + * + * The channel provided by this class is just a wrapper around the output stream. 
+ */ +public class WrappedDataOutputStreamPlus extends UnbufferedDataOutputStreamPlus +{ + protected final OutputStream out; + public WrappedDataOutputStreamPlus(OutputStream out) + { + super(); + this.out = out; + } + + public WrappedDataOutputStreamPlus(OutputStream out, WritableByteChannel channel) + { + super(channel); + this.out = out; + } + + @Override + public void write(byte[] buffer, int offset, int count) throws IOException + { + out.write(buffer, offset, count); + } + + @Override + public void write(int oneByte) throws IOException + { + out.write(oneByte); + } + + @Override + public void close() throws IOException + { + out.close(); + } + + @Override + public void flush() throws IOException + { + out.flush(); + } +} http://git-wip-us.apache.org/repos/asf/cassandra/blob/16499ca9/src/java/org/apache/cassandra/net/IncomingTcpConnection.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/net/IncomingTcpConnection.java b/src/java/org/apache/cassandra/net/IncomingTcpConnection.java index e7d434b..e94f15f 100644 --- a/src/java/org/apache/cassandra/net/IncomingTcpConnection.java +++ b/src/java/org/apache/cassandra/net/IncomingTcpConnection.java @@ -30,12 +30,13 @@ import net.jpountz.lz4.LZ4BlockInputStream; import net.jpountz.lz4.LZ4FastDecompressor; import net.jpountz.lz4.LZ4Factory; import net.jpountz.xxhash.XXHashFactory; -import org.xerial.snappy.SnappyInputStream; import org.apache.cassandra.config.Config; +import org.xerial.snappy.SnappyInputStream; import org.apache.cassandra.config.DatabaseDescriptor; import org.apache.cassandra.db.UnknownColumnFamilyException; import org.apache.cassandra.gms.Gossiper; +import org.apache.cassandra.io.util.NIODataInputStream; public class IncomingTcpConnection extends Thread { @@ -109,7 +110,7 @@ public class IncomingTcpConnection extends Thread DataOutputStream out = new DataOutputStream(socket.getOutputStream()); 
out.writeInt(MessagingService.current_version); out.flush(); - DataInputStream in = new DataInputStream(socket.getInputStream()); + DataInput in = new DataInputStream(socket.getInputStream()); int maxVersion = in.readInt(); from = CompactEndpointSerializationHelper.deserialize(in); @@ -135,7 +136,7 @@ public class IncomingTcpConnection extends Thread } else { - in = new DataInputStream(new BufferedInputStream(socket.getInputStream(), BUFFER_SIZE)); + in = new NIODataInputStream(socket.getChannel(), BUFFER_SIZE); } if (version > MessagingService.current_version) @@ -154,7 +155,7 @@ public class IncomingTcpConnection extends Thread } } - private InetAddress receiveMessage(DataInputStream input, int version) throws IOException + private InetAddress receiveMessage(DataInput input, int version) throws IOException { int id; if (version < MessagingService.VERSION_20) http://git-wip-us.apache.org/repos/asf/cassandra/blob/16499ca9/src/java/org/apache/cassandra/net/OutboundTcpConnection.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/net/OutboundTcpConnection.java b/src/java/org/apache/cassandra/net/OutboundTcpConnection.java index dc43106..18ad6c1 100644 --- a/src/java/org/apache/cassandra/net/OutboundTcpConnection.java +++ b/src/java/org/apache/cassandra/net/OutboundTcpConnection.java @@ -17,7 +17,6 @@ */ package org.apache.cassandra.net; -import java.io.BufferedOutputStream; import java.io.DataInputStream; import java.io.DataOutput; import java.io.IOException; @@ -43,6 +42,8 @@ import net.jpountz.lz4.LZ4Factory; import net.jpountz.xxhash.XXHashFactory; import org.apache.cassandra.io.util.DataOutputStreamPlus; +import org.apache.cassandra.io.util.BufferedDataOutputStreamPlus; +import org.apache.cassandra.io.util.WrappedDataOutputStreamPlus; import org.apache.cassandra.tracing.TraceState; import org.apache.cassandra.tracing.Tracing; import org.apache.cassandra.utils.CoalescingStrategies; @@ -398,7 +399,8 
@@ public class OutboundTcpConnection extends Thread logger.warn("Failed to set send buffer size on internode socket.", se); } } - out = new DataOutputStreamPlus(new BufferedOutputStream(socket.getOutputStream(), BUFFER_SIZE)); + + out = new BufferedDataOutputStreamPlus(socket.getChannel(), BUFFER_SIZE); out.writeInt(MessagingService.PROTOCOL_MAGIC); writeHeader(out, targetVersion, shouldCompressConnection()); @@ -445,14 +447,14 @@ public class OutboundTcpConnection extends Thread if (targetVersion < MessagingService.VERSION_21) { // Snappy is buffered, so no need for extra buffering output stream - out = new DataOutputStreamPlus(new SnappyOutputStream(socket.getOutputStream())); + out = new WrappedDataOutputStreamPlus(new SnappyOutputStream(socket.getOutputStream())); } else { // TODO: custom LZ4 OS that supports BB write methods LZ4Compressor compressor = LZ4Factory.fastestInstance().fastCompressor(); Checksum checksum = XXHashFactory.fastestInstance().newStreamingHash32(LZ4_HASH_SEED).asChecksum(); - out = new DataOutputStreamPlus(new LZ4BlockOutputStream(socket.getOutputStream(), + out = new WrappedDataOutputStreamPlus(new LZ4BlockOutputStream(socket.getOutputStream(), 1 << 14, // 16k block size compressor, checksum, http://git-wip-us.apache.org/repos/asf/cassandra/blob/16499ca9/src/java/org/apache/cassandra/service/GCInspector.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/service/GCInspector.java b/src/java/org/apache/cassandra/service/GCInspector.java index c4bffac..ef7f1e2 100644 --- a/src/java/org/apache/cassandra/service/GCInspector.java +++ b/src/java/org/apache/cassandra/service/GCInspector.java @@ -19,11 +19,14 @@ package org.apache.cassandra.service; import java.lang.management.ManagementFactory; import java.lang.management.MemoryUsage; +import java.lang.reflect.Field; +import java.nio.ByteBuffer; import java.util.ArrayList; import java.util.Collections; import java.util.List; 
import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicReference; + import javax.management.MBeanServer; import javax.management.Notification; import javax.management.NotificationListener; @@ -34,6 +37,7 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; import com.sun.management.GarbageCollectionNotificationInfo; + import org.apache.cassandra.io.sstable.SSTableDeletingTask; import org.apache.cassandra.utils.StatusLogger; @@ -43,6 +47,29 @@ public class GCInspector implements NotificationListener, GCInspectorMXBean private static final Logger logger = LoggerFactory.getLogger(GCInspector.class); final static long MIN_LOG_DURATION = 200; final static long MIN_LOG_DURATION_TPSTATS = 1000; + /* + * The field from java.nio.Bits that tracks the total number of allocated + * bytes of direct memory requested via ByteBuffer.allocateDirect that have not been GCed. + */ + final static Field BITS_TOTAL_CAPACITY; + + static + { + Field temp = null; + try + { + Class<?> bitsClass = Class.forName("java.nio.Bits"); + Field f = bitsClass.getDeclaredField("totalCapacity"); + f.setAccessible(true); + temp = f; + } + catch (Throwable t) + { + logger.debug("Error accessing field of java.nio.Bits", t); + //Don't care, will just return the dummy value -1 if we can't get at the field in this JVM + } + BITS_TOTAL_CAPACITY = temp; + } static final class State { @@ -160,13 +187,30 @@ public class GCInspector implements NotificationListener, GCInspectorMXBean public double[] getAndResetStats() { State state = getTotalSinceLastCheck(); - double[] r = new double[6]; + double[] r = new double[7]; r[0] = TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - state.startNanos); r[1] = state.maxRealTimeElapsed; r[2] = state.totalRealTimeElapsed; r[3] = state.sumSquaresRealTimeElapsed; r[4] = state.totalBytesReclaimed; r[5] = state.count; + r[6] = getAllocatedDirectMemory(); + return r; } + + private static long getAllocatedDirectMemory() + { + if (BITS_TOTAL_CAPACITY == 
null) return -1; + try + { + return BITS_TOTAL_CAPACITY.getLong(null); + } + catch (Throwable t) + { + logger.trace("Error accessing field of java.nio.Bits", t); + //Don't care how or why we failed to get the value in this JVM. Return -1 to indicate failure + return -1; + } + } } http://git-wip-us.apache.org/repos/asf/cassandra/blob/16499ca9/src/java/org/apache/cassandra/service/pager/PagingState.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/service/pager/PagingState.java b/src/java/org/apache/cassandra/service/pager/PagingState.java index bbae921..43d3cb8 100644 --- a/src/java/org/apache/cassandra/service/pager/PagingState.java +++ b/src/java/org/apache/cassandra/service/pager/PagingState.java @@ -65,7 +65,7 @@ public class PagingState ByteBufferUtil.writeWithShortLength(partitionKey, out); ByteBufferUtil.writeWithShortLength(cellName, out); out.writeInt(remaining); - return out.asByteBuffer(); + return out.buffer(); } catch (IOException e) { http://git-wip-us.apache.org/repos/asf/cassandra/blob/16499ca9/src/java/org/apache/cassandra/streaming/ConnectionHandler.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/streaming/ConnectionHandler.java b/src/java/org/apache/cassandra/streaming/ConnectionHandler.java index 7a7ccbf..780018c 100644 --- a/src/java/org/apache/cassandra/streaming/ConnectionHandler.java +++ b/src/java/org/apache/cassandra/streaming/ConnectionHandler.java @@ -17,6 +17,7 @@ */ package org.apache.cassandra.streaming; +import java.io.BufferedOutputStream; import java.io.IOException; import java.net.Socket; import java.net.SocketException; @@ -33,10 +34,12 @@ import java.util.concurrent.atomic.AtomicReference; import com.google.common.util.concurrent.Futures; import com.google.common.util.concurrent.ListenableFuture; import com.google.common.util.concurrent.SettableFuture; + import org.slf4j.Logger; import 
org.slf4j.LoggerFactory; - -import org.apache.cassandra.io.util.DataOutputStreamAndChannel; +import org.apache.cassandra.io.util.DataOutputStreamPlus; +import org.apache.cassandra.io.util.BufferedDataOutputStreamPlus; +import org.apache.cassandra.io.util.WrappedDataOutputStreamPlus; import org.apache.cassandra.streaming.messages.StreamInitMessage; import org.apache.cassandra.streaming.messages.StreamMessage; import org.apache.cassandra.utils.FBUtilities; @@ -154,13 +157,13 @@ public class ConnectionHandler protected abstract String name(); - protected static DataOutputStreamAndChannel getWriteChannel(Socket socket) throws IOException + protected static DataOutputStreamPlus getWriteChannel(Socket socket) throws IOException { WritableByteChannel out = socket.getChannel(); // socket channel is null when encrypted(SSL) if (out == null) - out = Channels.newChannel(socket.getOutputStream()); - return new DataOutputStreamAndChannel(socket.getOutputStream(), out); + return new WrappedDataOutputStreamPlus(new BufferedOutputStream(socket.getOutputStream())); + return new BufferedDataOutputStreamPlus(out); } protected static ReadableByteChannel getReadChannel(Socket socket) throws IOException @@ -182,7 +185,9 @@ public class ConnectionHandler isForOutgoing, session.keepSSTableLevel()); ByteBuffer messageBuf = message.createMessage(false, protocolVersion); - getWriteChannel(socket).write(messageBuf); + DataOutputStreamPlus out = getWriteChannel(socket); + out.write(messageBuf); + out.flush(); } public void start(Socket socket, int protocolVersion) @@ -308,7 +313,7 @@ public class ConnectionHandler { try { - DataOutputStreamAndChannel out = getWriteChannel(socket); + DataOutputStreamPlus out = getWriteChannel(socket); StreamMessage next; while (!isClosed()) @@ -340,11 +345,12 @@ public class ConnectionHandler } } - private void sendMessage(DataOutputStreamAndChannel out, StreamMessage message) + private void sendMessage(DataOutputStreamPlus out, StreamMessage message) { try { 
StreamMessage.serialize(message, out, protocolVersion, session); + out.flush(); } catch (SocketException e) { http://git-wip-us.apache.org/repos/asf/cassandra/blob/16499ca9/src/java/org/apache/cassandra/streaming/StreamWriter.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/streaming/StreamWriter.java b/src/java/org/apache/cassandra/streaming/StreamWriter.java index 93903a7..392dccd 100644 --- a/src/java/org/apache/cassandra/streaming/StreamWriter.java +++ b/src/java/org/apache/cassandra/streaming/StreamWriter.java @@ -20,8 +20,6 @@ package org.apache.cassandra.streaming; import java.io.File; import java.io.IOException; import java.io.OutputStream; -import java.nio.channels.Channels; -import java.nio.channels.WritableByteChannel; import java.util.Collection; import com.ning.compress.lzf.LZFOutputStream; @@ -30,6 +28,7 @@ import org.apache.cassandra.io.sstable.Component; import org.apache.cassandra.io.sstable.format.SSTableReader; import org.apache.cassandra.io.util.DataIntegrityMetadata; import org.apache.cassandra.io.util.DataIntegrityMetadata.ChecksumValidator; +import org.apache.cassandra.io.util.DataOutputStreamPlus; import org.apache.cassandra.io.util.FileUtils; import org.apache.cassandra.io.util.RandomAccessReader; import org.apache.cassandra.streaming.StreamManager.StreamRateLimiter; @@ -65,10 +64,10 @@ public class StreamWriter * * StreamWriter uses LZF compression on wire to decrease size to transfer. * - * @param channel where this writes data to + * @param output where this writes data to * @throws IOException on any I/O error */ - public void write(WritableByteChannel channel) throws IOException + public void write(DataOutputStreamPlus output) throws IOException { long totalSize = totalSize(); RandomAccessReader file = sstable.openDataReader(); @@ -78,7 +77,7 @@ public class StreamWriter transferBuffer = validator == null ? 
new byte[DEFAULT_CHUNK_SIZE] : new byte[validator.chunkSize]; // setting up data compression stream - compressedOutput = new LZFOutputStream(Channels.newOutputStream(channel)); + compressedOutput = new LZFOutputStream(output); long progress = 0L; try @@ -106,7 +105,7 @@ public class StreamWriter readOffset = 0; } - // make sure that current section is send + // make sure that current section is sent compressedOutput.flush(); } } http://git-wip-us.apache.org/repos/asf/cassandra/blob/16499ca9/src/java/org/apache/cassandra/streaming/compress/CompressedStreamWriter.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/streaming/compress/CompressedStreamWriter.java b/src/java/org/apache/cassandra/streaming/compress/CompressedStreamWriter.java index 786ff23..063a49a 100644 --- a/src/java/org/apache/cassandra/streaming/compress/CompressedStreamWriter.java +++ b/src/java/org/apache/cassandra/streaming/compress/CompressedStreamWriter.java @@ -24,8 +24,12 @@ import java.util.ArrayList; import java.util.Collection; import java.util.List; +import com.google.common.base.Function; + +import org.apache.cassandra.io.FSWriteError; import org.apache.cassandra.io.compress.CompressionMetadata; import org.apache.cassandra.io.sstable.format.SSTableReader; +import org.apache.cassandra.io.util.DataOutputStreamPlus; import org.apache.cassandra.io.util.FileUtils; import org.apache.cassandra.io.util.RandomAccessReader; import org.apache.cassandra.streaming.ProgressInfo; @@ -49,11 +53,11 @@ public class CompressedStreamWriter extends StreamWriter } @Override - public void write(WritableByteChannel channel) throws IOException + public void write(DataOutputStreamPlus out) throws IOException { long totalSize = totalSize(); RandomAccessReader file = sstable.openDataReader(); - FileChannel fc = file.getChannel(); + final FileChannel fc = file.getChannel(); long progress = 0L; // calculate chunks to transfer. 
we want to send continuous chunks altogether. @@ -61,7 +65,7 @@ public class CompressedStreamWriter extends StreamWriter try { // stream each of the required sections of the file - for (Pair<Long, Long> section : sections) + for (final Pair<Long, Long> section : sections) { // length of the section to stream long length = section.right - section.left; @@ -69,9 +73,23 @@ public class CompressedStreamWriter extends StreamWriter long bytesTransferred = 0; while (bytesTransferred < length) { - int toTransfer = (int) Math.min(CHUNK_SIZE, length - bytesTransferred); + final long bytesTransferredFinal = bytesTransferred; + final int toTransfer = (int) Math.min(CHUNK_SIZE, length - bytesTransferred); limiter.acquire(toTransfer); - long lastWrite = fc.transferTo(section.left + bytesTransferred, toTransfer, channel); + long lastWrite = out.applyToChannel( new Function<WritableByteChannel, Long>() + { + public Long apply(WritableByteChannel wbc) + { + try + { + return fc.transferTo(section.left + bytesTransferredFinal, toTransfer, wbc); + } + catch (IOException e) + { + throw new FSWriteError(e, sstable.getFilename()); + } + } + }); bytesTransferred += lastWrite; progress += lastWrite; session.progress(sstable.descriptor, ProgressInfo.Direction.OUT, progress, totalSize); http://git-wip-us.apache.org/repos/asf/cassandra/blob/16499ca9/src/java/org/apache/cassandra/streaming/messages/CompleteMessage.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/streaming/messages/CompleteMessage.java b/src/java/org/apache/cassandra/streaming/messages/CompleteMessage.java index ec9c66c..b555f64 100644 --- a/src/java/org/apache/cassandra/streaming/messages/CompleteMessage.java +++ b/src/java/org/apache/cassandra/streaming/messages/CompleteMessage.java @@ -20,7 +20,7 @@ package org.apache.cassandra.streaming.messages; import java.io.IOException; import java.nio.channels.ReadableByteChannel; -import 
org.apache.cassandra.io.util.DataOutputStreamAndChannel; +import org.apache.cassandra.io.util.DataOutputStreamPlus; import org.apache.cassandra.streaming.StreamSession; public class CompleteMessage extends StreamMessage @@ -32,7 +32,7 @@ public class CompleteMessage extends StreamMessage return new CompleteMessage(); } - public void serialize(CompleteMessage message, DataOutputStreamAndChannel out, int version, StreamSession session) throws IOException {} + public void serialize(CompleteMessage message, DataOutputStreamPlus out, int version, StreamSession session) throws IOException {} }; public CompleteMessage() http://git-wip-us.apache.org/repos/asf/cassandra/blob/16499ca9/src/java/org/apache/cassandra/streaming/messages/IncomingFileMessage.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/streaming/messages/IncomingFileMessage.java b/src/java/org/apache/cassandra/streaming/messages/IncomingFileMessage.java index 237fb70..33298bf 100644 --- a/src/java/org/apache/cassandra/streaming/messages/IncomingFileMessage.java +++ b/src/java/org/apache/cassandra/streaming/messages/IncomingFileMessage.java @@ -23,7 +23,7 @@ import java.nio.channels.Channels; import java.nio.channels.ReadableByteChannel; import org.apache.cassandra.io.sstable.format.SSTableWriter; -import org.apache.cassandra.io.util.DataOutputStreamAndChannel; +import org.apache.cassandra.io.util.DataOutputStreamPlus; import org.apache.cassandra.streaming.StreamReader; import org.apache.cassandra.streaming.StreamSession; import org.apache.cassandra.streaming.compress.CompressedStreamReader; @@ -55,7 +55,7 @@ public class IncomingFileMessage extends StreamMessage } } - public void serialize(IncomingFileMessage message, DataOutputStreamAndChannel out, int version, StreamSession session) throws IOException + public void serialize(IncomingFileMessage message, DataOutputStreamPlus out, int version, StreamSession session) throws IOException { 
throw new UnsupportedOperationException("Not allowed to call serialize on an incoming file"); } http://git-wip-us.apache.org/repos/asf/cassandra/blob/16499ca9/src/java/org/apache/cassandra/streaming/messages/OutgoingFileMessage.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/streaming/messages/OutgoingFileMessage.java b/src/java/org/apache/cassandra/streaming/messages/OutgoingFileMessage.java index 7047c84..bfa02fa 100644 --- a/src/java/org/apache/cassandra/streaming/messages/OutgoingFileMessage.java +++ b/src/java/org/apache/cassandra/streaming/messages/OutgoingFileMessage.java @@ -23,13 +23,12 @@ import java.util.List; import org.apache.cassandra.io.compress.CompressionMetadata; import org.apache.cassandra.io.sstable.format.SSTableReader; -import org.apache.cassandra.io.util.DataOutputStreamAndChannel; +import org.apache.cassandra.io.util.DataOutputStreamPlus; import org.apache.cassandra.streaming.StreamSession; import org.apache.cassandra.streaming.StreamWriter; import org.apache.cassandra.streaming.compress.CompressedStreamWriter; import org.apache.cassandra.streaming.compress.CompressionInfo; import org.apache.cassandra.utils.Pair; - import org.apache.cassandra.utils.concurrent.Ref; /** @@ -44,7 +43,7 @@ public class OutgoingFileMessage extends StreamMessage throw new UnsupportedOperationException("Not allowed to call deserialize on an outgoing file"); } - public void serialize(OutgoingFileMessage message, DataOutputStreamAndChannel out, int version, StreamSession session) throws IOException + public void serialize(OutgoingFileMessage message, DataOutputStreamPlus out, int version, StreamSession session) throws IOException { FileMessageHeader.serializer.serialize(message.header, out, version); @@ -54,7 +53,7 @@ public class OutgoingFileMessage extends StreamMessage new CompressedStreamWriter(reader, message.header.sections, message.header.compressionInfo, session); - 
writer.write(out.getChannel()); + writer.write(out); session.fileSent(message.header); } }; http://git-wip-us.apache.org/repos/asf/cassandra/blob/16499ca9/src/java/org/apache/cassandra/streaming/messages/PrepareMessage.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/streaming/messages/PrepareMessage.java b/src/java/org/apache/cassandra/streaming/messages/PrepareMessage.java index 7efe075..004df18 100644 --- a/src/java/org/apache/cassandra/streaming/messages/PrepareMessage.java +++ b/src/java/org/apache/cassandra/streaming/messages/PrepareMessage.java @@ -23,7 +23,7 @@ import java.nio.channels.ReadableByteChannel; import java.util.ArrayList; import java.util.Collection; -import org.apache.cassandra.io.util.DataOutputStreamAndChannel; +import org.apache.cassandra.io.util.DataOutputStreamPlus; import org.apache.cassandra.streaming.StreamRequest; import org.apache.cassandra.streaming.StreamSession; import org.apache.cassandra.streaming.StreamSummary; @@ -47,7 +47,7 @@ public class PrepareMessage extends StreamMessage return message; } - public void serialize(PrepareMessage message, DataOutputStreamAndChannel out, int version, StreamSession session) throws IOException + public void serialize(PrepareMessage message, DataOutputStreamPlus out, int version, StreamSession session) throws IOException { // requests out.writeInt(message.requests.size()); http://git-wip-us.apache.org/repos/asf/cassandra/blob/16499ca9/src/java/org/apache/cassandra/streaming/messages/ReceivedMessage.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/streaming/messages/ReceivedMessage.java b/src/java/org/apache/cassandra/streaming/messages/ReceivedMessage.java index f206d0d..1255947 100644 --- a/src/java/org/apache/cassandra/streaming/messages/ReceivedMessage.java +++ b/src/java/org/apache/cassandra/streaming/messages/ReceivedMessage.java @@ -22,7 +22,7 @@ 
import java.nio.channels.Channels; import java.nio.channels.ReadableByteChannel; import java.util.UUID; -import org.apache.cassandra.io.util.DataOutputStreamAndChannel; +import org.apache.cassandra.io.util.DataOutputStreamPlus; import org.apache.cassandra.net.MessagingService; import org.apache.cassandra.streaming.StreamSession; import org.apache.cassandra.utils.UUIDSerializer; @@ -37,7 +37,7 @@ public class ReceivedMessage extends StreamMessage return new ReceivedMessage(UUIDSerializer.serializer.deserialize(input, MessagingService.current_version), input.readInt()); } - public void serialize(ReceivedMessage message, DataOutputStreamAndChannel out, int version, StreamSession session) throws IOException + public void serialize(ReceivedMessage message, DataOutputStreamPlus out, int version, StreamSession session) throws IOException { UUIDSerializer.serializer.serialize(message.cfId, out, MessagingService.current_version); out.writeInt(message.sequenceNumber); http://git-wip-us.apache.org/repos/asf/cassandra/blob/16499ca9/src/java/org/apache/cassandra/streaming/messages/RetryMessage.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/streaming/messages/RetryMessage.java b/src/java/org/apache/cassandra/streaming/messages/RetryMessage.java index 8d5707a..29e84bf 100644 --- a/src/java/org/apache/cassandra/streaming/messages/RetryMessage.java +++ b/src/java/org/apache/cassandra/streaming/messages/RetryMessage.java @@ -22,7 +22,7 @@ import java.nio.channels.Channels; import java.nio.channels.ReadableByteChannel; import java.util.UUID; -import org.apache.cassandra.io.util.DataOutputStreamAndChannel; +import org.apache.cassandra.io.util.DataOutputStreamPlus; import org.apache.cassandra.net.MessagingService; import org.apache.cassandra.streaming.StreamSession; import org.apache.cassandra.utils.UUIDSerializer; @@ -37,7 +37,7 @@ public class RetryMessage extends StreamMessage return new 
RetryMessage(UUIDSerializer.serializer.deserialize(input, MessagingService.current_version), input.readInt()); } - public void serialize(RetryMessage message, DataOutputStreamAndChannel out, int version, StreamSession session) throws IOException + public void serialize(RetryMessage message, DataOutputStreamPlus out, int version, StreamSession session) throws IOException { UUIDSerializer.serializer.serialize(message.cfId, out, MessagingService.current_version); out.writeInt(message.sequenceNumber); http://git-wip-us.apache.org/repos/asf/cassandra/blob/16499ca9/src/java/org/apache/cassandra/streaming/messages/SessionFailedMessage.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/streaming/messages/SessionFailedMessage.java b/src/java/org/apache/cassandra/streaming/messages/SessionFailedMessage.java index ae15620..46f49d6 100644 --- a/src/java/org/apache/cassandra/streaming/messages/SessionFailedMessage.java +++ b/src/java/org/apache/cassandra/streaming/messages/SessionFailedMessage.java @@ -20,7 +20,7 @@ package org.apache.cassandra.streaming.messages; import java.io.IOException; import java.nio.channels.ReadableByteChannel; -import org.apache.cassandra.io.util.DataOutputStreamAndChannel; +import org.apache.cassandra.io.util.DataOutputStreamPlus; import org.apache.cassandra.streaming.StreamSession; public class SessionFailedMessage extends StreamMessage @@ -32,7 +32,7 @@ public class SessionFailedMessage extends StreamMessage return new SessionFailedMessage(); } - public void serialize(SessionFailedMessage message, DataOutputStreamAndChannel out, int version, StreamSession session) throws IOException {} + public void serialize(SessionFailedMessage message, DataOutputStreamPlus out, int version, StreamSession session) throws IOException {} }; public SessionFailedMessage() http://git-wip-us.apache.org/repos/asf/cassandra/blob/16499ca9/src/java/org/apache/cassandra/streaming/messages/StreamMessage.java 
---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/streaming/messages/StreamMessage.java b/src/java/org/apache/cassandra/streaming/messages/StreamMessage.java index 20490db..8e3eeef 100644 --- a/src/java/org/apache/cassandra/streaming/messages/StreamMessage.java +++ b/src/java/org/apache/cassandra/streaming/messages/StreamMessage.java @@ -21,7 +21,7 @@ import java.io.IOException; import java.nio.ByteBuffer; import java.nio.channels.ReadableByteChannel; -import org.apache.cassandra.io.util.DataOutputStreamAndChannel; +import org.apache.cassandra.io.util.DataOutputStreamPlus; import org.apache.cassandra.streaming.StreamSession; /** @@ -36,7 +36,7 @@ public abstract class StreamMessage public static final int VERSION_30 = 3; public static final int CURRENT_VERSION = VERSION_30; - public static void serialize(StreamMessage message, DataOutputStreamAndChannel out, int version, StreamSession session) throws IOException + public static void serialize(StreamMessage message, DataOutputStreamPlus out, int version, StreamSession session) throws IOException { ByteBuffer buff = ByteBuffer.allocate(1); // message type @@ -67,7 +67,7 @@ public abstract class StreamMessage public static interface Serializer<V extends StreamMessage> { V deserialize(ReadableByteChannel in, int version, StreamSession session) throws IOException; - void serialize(V message, DataOutputStreamAndChannel out, int version, StreamSession session) throws IOException; + void serialize(V message, DataOutputStreamPlus out, int version, StreamSession session) throws IOException; } /** StreamMessage types */ http://git-wip-us.apache.org/repos/asf/cassandra/blob/16499ca9/src/java/org/apache/cassandra/thrift/CassandraServer.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/thrift/CassandraServer.java b/src/java/org/apache/cassandra/thrift/CassandraServer.java index 5b49ae3..e92e0c6 
100644 --- a/src/java/org/apache/cassandra/thrift/CassandraServer.java +++ b/src/java/org/apache/cassandra/thrift/CassandraServer.java @@ -1867,6 +1867,10 @@ public class CassandraServer implements Cassandra.Iface { throw new InvalidRequestException("Error deflating query string."); } + catch (IOException e) + { + throw new AssertionError(e); + } return queryString; } http://git-wip-us.apache.org/repos/asf/cassandra/blob/16499ca9/src/java/org/apache/cassandra/tools/NodeProbe.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/tools/NodeProbe.java b/src/java/org/apache/cassandra/tools/NodeProbe.java index 5a1d6b4..2805c52 100644 --- a/src/java/org/apache/cassandra/tools/NodeProbe.java +++ b/src/java/org/apache/cassandra/tools/NodeProbe.java @@ -1186,7 +1186,7 @@ public class NodeProbe implements AutoCloseable } catch (Exception e) { - throw new RuntimeException("Error setting log for " + classQualifier +" on level " + level +". Please check logback configuration and ensure to have <jmxConfigurator /> set", e); + throw new RuntimeException("Error setting log for " + classQualifier +" on level " + level +". 
Please check logback configuration and ensure to have <jmxConfigurator /> set", e); } } http://git-wip-us.apache.org/repos/asf/cassandra/blob/16499ca9/src/java/org/apache/cassandra/tools/NodeTool.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/tools/NodeTool.java b/src/java/org/apache/cassandra/tools/NodeTool.java index e6d4df6..fa6966c 100644 --- a/src/java/org/apache/cassandra/tools/NodeTool.java +++ b/src/java/org/apache/cassandra/tools/NodeTool.java @@ -2690,8 +2690,8 @@ public class NodeTool double[] stats = probe.getAndResetGCStats(); double mean = stats[2] / stats[5]; double stdev = Math.sqrt((stats[3] / stats[5]) - (mean * mean)); - System.out.printf("%20s%20s%20s%20s%20s%20s%n", "Interval (ms)", "Max GC Elapsed (ms)", "Total GC Elapsed (ms)", "Stdev GC Elapsed (ms)", "GC Reclaimed (MB)", "Collections"); - System.out.printf("%20.0f%20.0f%20.0f%20.0f%20.0f%20.0f%n", stats[0], stats[1], stats[2], stdev, stats[4], stats[5]); + System.out.printf("%20s%20s%20s%20s%20s%20s%25s%n", "Interval (ms)", "Max GC Elapsed (ms)", "Total GC Elapsed (ms)", "Stdev GC Elapsed (ms)", "GC Reclaimed (MB)", "Collections", "Direct Memory Bytes"); + System.out.printf("%20.0f%20.0f%20.0f%20.0f%20.0f%20.0f%25d%n", stats[0], stats[1], stats[2], stdev, stats[4], stats[5], (long)stats[6]); } } http://git-wip-us.apache.org/repos/asf/cassandra/blob/16499ca9/src/java/org/apache/cassandra/transport/CBUtil.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/transport/CBUtil.java b/src/java/org/apache/cassandra/transport/CBUtil.java index b37e0da..8f0dee0 100644 --- a/src/java/org/apache/cassandra/transport/CBUtil.java +++ b/src/java/org/apache/cassandra/transport/CBUtil.java @@ -36,6 +36,7 @@ import java.util.UUID; import io.netty.buffer.*; import io.netty.util.CharsetUtil; +import org.apache.cassandra.config.Config; import 
org.apache.cassandra.db.ConsistencyLevel; import org.apache.cassandra.db.TypeSizes; import org.apache.cassandra.utils.Pair; @@ -51,7 +52,8 @@ import org.apache.cassandra.utils.ByteBufferUtil; */ public abstract class CBUtil { - public static final ByteBufAllocator allocator = new PooledByteBufAllocator(true); + public static final boolean USE_HEAP_ALLOCATOR = Boolean.getBoolean(Config.PROPERTY_PREFIX + "netty_use_heap_allocator"); + public static final ByteBufAllocator allocator = USE_HEAP_ALLOCATOR ? new UnpooledByteBufAllocator(false) : new PooledByteBufAllocator(true); private CBUtil() {} http://git-wip-us.apache.org/repos/asf/cassandra/blob/16499ca9/src/java/org/apache/cassandra/utils/memory/MemoryUtil.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/utils/memory/MemoryUtil.java b/src/java/org/apache/cassandra/utils/memory/MemoryUtil.java index 8304bd5..d2b2879 100644 --- a/src/java/org/apache/cassandra/utils/memory/MemoryUtil.java +++ b/src/java/org/apache/cassandra/utils/memory/MemoryUtil.java @@ -35,6 +35,10 @@ public abstract class MemoryUtil private static final long DIRECT_BYTE_BUFFER_ADDRESS_OFFSET; private static final long DIRECT_BYTE_BUFFER_CAPACITY_OFFSET; private static final long DIRECT_BYTE_BUFFER_LIMIT_OFFSET; + private static final long DIRECT_BYTE_BUFFER_POSITION_OFFSET; + private static final Class<?> BYTE_BUFFER_CLASS; + private static final long BYTE_BUFFER_OFFSET_OFFSET; + private static final long BYTE_BUFFER_HB_OFFSET; private static final long BYTE_ARRAY_BASE_OFFSET; private static final boolean BIG_ENDIAN = ByteOrder.nativeOrder().equals(ByteOrder.BIG_ENDIAN); @@ -57,7 +61,14 @@ public abstract class MemoryUtil DIRECT_BYTE_BUFFER_ADDRESS_OFFSET = unsafe.objectFieldOffset(Buffer.class.getDeclaredField("address")); DIRECT_BYTE_BUFFER_CAPACITY_OFFSET = unsafe.objectFieldOffset(Buffer.class.getDeclaredField("capacity")); DIRECT_BYTE_BUFFER_LIMIT_OFFSET = 
unsafe.objectFieldOffset(Buffer.class.getDeclaredField("limit")); + DIRECT_BYTE_BUFFER_POSITION_OFFSET = unsafe.objectFieldOffset(Buffer.class.getDeclaredField("position")); DIRECT_BYTE_BUFFER_CLASS = clazz; + + clazz = ByteBuffer.allocate(0).getClass(); + BYTE_BUFFER_OFFSET_OFFSET = unsafe.objectFieldOffset(ByteBuffer.class.getDeclaredField("offset")); + BYTE_BUFFER_HB_OFFSET = unsafe.objectFieldOffset(ByteBuffer.class.getDeclaredField("hb")); + BYTE_BUFFER_CLASS = clazz; + BYTE_ARRAY_BASE_OFFSET = unsafe.arrayBaseOffset(byte[].class); } catch (Exception e) @@ -144,6 +155,21 @@ public abstract class MemoryUtil return instance; } + public static ByteBuffer getHollowByteBuffer() + { + ByteBuffer instance; + try + { + instance = (ByteBuffer) unsafe.allocateInstance(BYTE_BUFFER_CLASS); + } + catch (InstantiationException e) + { + throw new AssertionError(e); + } + instance.order(ByteOrder.nativeOrder()); + return instance; + } + public static void setByteBuffer(ByteBuffer instance, long address, int length) { unsafe.putLong(instance, DIRECT_BYTE_BUFFER_ADDRESS_OFFSET, address); @@ -151,6 +177,27 @@ public abstract class MemoryUtil unsafe.putInt(instance, DIRECT_BYTE_BUFFER_LIMIT_OFFSET, length); } + public static ByteBuffer duplicateDirectByteBuffer(ByteBuffer source, ByteBuffer hollowBuffer) + { + assert(source.isDirect()); + unsafe.putLong(hollowBuffer, DIRECT_BYTE_BUFFER_ADDRESS_OFFSET, unsafe.getLong(source, DIRECT_BYTE_BUFFER_ADDRESS_OFFSET)); + unsafe.putInt(hollowBuffer, DIRECT_BYTE_BUFFER_POSITION_OFFSET, unsafe.getInt(source, DIRECT_BYTE_BUFFER_POSITION_OFFSET)); + unsafe.putInt(hollowBuffer, DIRECT_BYTE_BUFFER_LIMIT_OFFSET, unsafe.getInt(source, DIRECT_BYTE_BUFFER_LIMIT_OFFSET)); + unsafe.putInt(hollowBuffer, DIRECT_BYTE_BUFFER_CAPACITY_OFFSET, unsafe.getInt(source, DIRECT_BYTE_BUFFER_CAPACITY_OFFSET)); + return hollowBuffer; + } + + public static ByteBuffer duplicateByteBuffer(ByteBuffer source, ByteBuffer hollowBuffer) + { + assert(!source.isDirect()); + 
unsafe.putInt(hollowBuffer, DIRECT_BYTE_BUFFER_POSITION_OFFSET, unsafe.getInt(source, DIRECT_BYTE_BUFFER_POSITION_OFFSET)); + unsafe.putInt(hollowBuffer, DIRECT_BYTE_BUFFER_LIMIT_OFFSET, unsafe.getInt(source, DIRECT_BYTE_BUFFER_LIMIT_OFFSET)); + unsafe.putInt(hollowBuffer, DIRECT_BYTE_BUFFER_CAPACITY_OFFSET, unsafe.getInt(source, DIRECT_BYTE_BUFFER_CAPACITY_OFFSET)); + unsafe.putInt(hollowBuffer, BYTE_BUFFER_OFFSET_OFFSET, unsafe.getInt(source, BYTE_BUFFER_OFFSET_OFFSET)); + unsafe.putObject(hollowBuffer, BYTE_BUFFER_HB_OFFSET, unsafe.getObject(source, BYTE_BUFFER_HB_OFFSET)); + return hollowBuffer; + } + public static long getLongByByte(long address) { if (BIG_ENDIAN) http://git-wip-us.apache.org/repos/asf/cassandra/blob/16499ca9/src/java/org/apache/cassandra/utils/vint/EncodedDataOutputStream.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/utils/vint/EncodedDataOutputStream.java b/src/java/org/apache/cassandra/utils/vint/EncodedDataOutputStream.java index 92612b6..fe43ff2 100644 --- a/src/java/org/apache/cassandra/utils/vint/EncodedDataOutputStream.java +++ b/src/java/org/apache/cassandra/utils/vint/EncodedDataOutputStream.java @@ -20,13 +20,13 @@ package org.apache.cassandra.utils.vint; import java.io.IOException; import java.io.OutputStream; -import org.apache.cassandra.io.util.AbstractDataOutput; +import org.apache.cassandra.io.util.UnbufferedDataOutputStreamPlus; /** * Borrows idea from * https://developers.google.com/protocol-buffers/docs/encoding#varints */ -public class EncodedDataOutputStream extends AbstractDataOutput +public class EncodedDataOutputStream extends UnbufferedDataOutputStreamPlus { private OutputStream out; http://git-wip-us.apache.org/repos/asf/cassandra/blob/16499ca9/test/unit/org/apache/cassandra/AbstractSerializationsTester.java ---------------------------------------------------------------------- diff --git 
a/test/unit/org/apache/cassandra/AbstractSerializationsTester.java b/test/unit/org/apache/cassandra/AbstractSerializationsTester.java index 15e5d34..ebfa79d 100644 --- a/test/unit/org/apache/cassandra/AbstractSerializationsTester.java +++ b/test/unit/org/apache/cassandra/AbstractSerializationsTester.java @@ -21,7 +21,8 @@ package org.apache.cassandra; import org.apache.cassandra.io.IVersionedSerializer; import org.apache.cassandra.io.util.DataOutputBuffer; -import org.apache.cassandra.io.util.DataOutputStreamAndChannel; +import org.apache.cassandra.io.util.DataOutputStreamPlus; +import org.apache.cassandra.io.util.BufferedDataOutputStreamPlus; import org.apache.cassandra.net.MessagingService; import java.io.DataInputStream; @@ -65,10 +66,11 @@ public class AbstractSerializationsTester return new DataInputStream(new FileInputStream(f)); } - protected static DataOutputStreamAndChannel getOutput(String name) throws IOException + @SuppressWarnings("resource") + protected static DataOutputStreamPlus getOutput(String name) throws IOException { File f = new File("test/data/serialization/" + CUR_VER + "/" + name); f.getParentFile().mkdirs(); - return new DataOutputStreamAndChannel(new FileOutputStream(f)); + return new BufferedDataOutputStreamPlus(new FileOutputStream(f).getChannel()); } } http://git-wip-us.apache.org/repos/asf/cassandra/blob/16499ca9/test/unit/org/apache/cassandra/db/SerializationsTest.java ---------------------------------------------------------------------- diff --git a/test/unit/org/apache/cassandra/db/SerializationsTest.java b/test/unit/org/apache/cassandra/db/SerializationsTest.java index f8e757a..a280448 100644 --- a/test/unit/org/apache/cassandra/db/SerializationsTest.java +++ b/test/unit/org/apache/cassandra/db/SerializationsTest.java @@ -32,7 +32,7 @@ import org.apache.cassandra.dht.IPartitioner; import org.apache.cassandra.dht.Range; import org.apache.cassandra.dht.Token; import org.apache.cassandra.exceptions.ConfigurationException; -import 
org.apache.cassandra.io.util.DataOutputStreamAndChannel; +import org.apache.cassandra.io.util.DataOutputStreamPlus; import org.apache.cassandra.locator.SimpleStrategy; import org.apache.cassandra.net.CallbackInfo; import org.apache.cassandra.net.MessageIn; @@ -40,7 +40,6 @@ import org.apache.cassandra.net.MessageOut; import org.apache.cassandra.net.MessagingService; import org.apache.cassandra.service.StorageService; import org.apache.cassandra.utils.ByteBufferUtil; - import org.junit.BeforeClass; import org.junit.Test; @@ -92,7 +91,7 @@ public class SerializationsTest extends AbstractSerializationsTester RangeSliceCommand regRangeCmdSup = new RangeSliceCommand(statics.KS, "Super1", statics.readTs, nonEmptyRangeSCPred, bounds, 100); MessageOut<RangeSliceCommand> regRangeCmdSupMsg = regRangeCmdSup.createMessage(); - DataOutputStreamAndChannel out = getOutput("db.RangeSliceCommand.bin"); + DataOutputStreamPlus out = getOutput("db.RangeSliceCommand.bin"); namesCmdMsg.serialize(out, getVersion()); emptyRangeCmdMsg.serialize(out, getVersion()); regRangeCmdMsg.serialize(out, getVersion()); @@ -127,7 +126,7 @@ public class SerializationsTest extends AbstractSerializationsTester SliceByNamesReadCommand standardCmd = new SliceByNamesReadCommand(statics.KS, statics.Key, statics.StandardCF, statics.readTs, namesPred); SliceByNamesReadCommand superCmd = new SliceByNamesReadCommand(statics.KS, statics.Key, statics.SuperCF, statics.readTs, namesSCPred); - DataOutputStreamAndChannel out = getOutput("db.SliceByNamesReadCommand.bin"); + DataOutputStreamPlus out = getOutput("db.SliceByNamesReadCommand.bin"); SliceByNamesReadCommand.serializer.serialize(standardCmd, out, getVersion()); SliceByNamesReadCommand.serializer.serialize(superCmd, out, getVersion()); ReadCommand.serializer.serialize(standardCmd, out, getVersion()); @@ -161,8 +160,8 @@ public class SerializationsTest extends AbstractSerializationsTester { SliceFromReadCommand standardCmd = new SliceFromReadCommand(statics.KS, 
statics.Key, statics.StandardCF, statics.readTs, nonEmptyRangePred); SliceFromReadCommand superCmd = new SliceFromReadCommand(statics.KS, statics.Key, statics.SuperCF, statics.readTs, nonEmptyRangeSCPred); - - DataOutputStreamAndChannel out = getOutput("db.SliceFromReadCommand.bin"); + + DataOutputStreamPlus out = getOutput("db.SliceFromReadCommand.bin"); SliceFromReadCommand.serializer.serialize(standardCmd, out, getVersion()); SliceFromReadCommand.serializer.serialize(superCmd, out, getVersion()); ReadCommand.serializer.serialize(standardCmd, out, getVersion()); @@ -195,7 +194,7 @@ public class SerializationsTest extends AbstractSerializationsTester private void testRowWrite() throws IOException { - DataOutputStreamAndChannel out = getOutput("db.Row.bin"); + DataOutputStreamPlus out = getOutput("db.Row.bin"); Row.serializer.serialize(statics.StandardRow, out, getVersion()); Row.serializer.serialize(statics.SuperRow, out, getVersion()); Row.serializer.serialize(statics.NullRow, out, getVersion()); @@ -232,7 +231,7 @@ public class SerializationsTest extends AbstractSerializationsTester mods.put(statics.SuperCf.metadata().cfId, statics.SuperCf); Mutation mixedRm = new Mutation(statics.KS, statics.Key, mods); - DataOutputStreamAndChannel out = getOutput("db.RowMutation.bin"); + DataOutputStreamPlus out = getOutput("db.RowMutation.bin"); Mutation.serializer.serialize(standardRowRm, out, getVersion()); Mutation.serializer.serialize(superRowRm, out, getVersion()); Mutation.serializer.serialize(standardRm, out, getVersion()); @@ -281,7 +280,7 @@ public class SerializationsTest extends AbstractSerializationsTester Truncation tr = new Truncation(statics.KS, "Doesn't Really Matter"); TruncateResponse aff = new TruncateResponse(statics.KS, "Doesn't Matter Either", true); TruncateResponse neg = new TruncateResponse(statics.KS, "Still Doesn't Matter", false); - DataOutputStreamAndChannel out = getOutput("db.Truncation.bin"); + DataOutputStreamPlus out = 
getOutput("db.Truncation.bin"); Truncation.serializer.serialize(tr, out, getVersion()); TruncateResponse.serializer.serialize(aff, out, getVersion()); TruncateResponse.serializer.serialize(neg, out, getVersion()); @@ -323,7 +322,7 @@ public class SerializationsTest extends AbstractSerializationsTester { WriteResponse aff = new WriteResponse(); WriteResponse neg = new WriteResponse(); - DataOutputStreamAndChannel out = getOutput("db.WriteResponse.bin"); + DataOutputStreamPlus out = getOutput("db.WriteResponse.bin"); WriteResponse.serializer.serialize(aff, out, getVersion()); WriteResponse.serializer.serialize(neg, out, getVersion()); out.close(); http://git-wip-us.apache.org/repos/asf/cassandra/blob/16499ca9/test/unit/org/apache/cassandra/gms/SerializationsTest.java ---------------------------------------------------------------------- diff --git a/test/unit/org/apache/cassandra/gms/SerializationsTest.java b/test/unit/org/apache/cassandra/gms/SerializationsTest.java index a773ccf..080ae53 100644 --- a/test/unit/org/apache/cassandra/gms/SerializationsTest.java +++ b/test/unit/org/apache/cassandra/gms/SerializationsTest.java @@ -20,7 +20,7 @@ package org.apache.cassandra.gms; import org.apache.cassandra.AbstractSerializationsTester; import org.apache.cassandra.dht.Token; -import org.apache.cassandra.io.util.DataOutputStreamAndChannel; +import org.apache.cassandra.io.util.DataOutputStreamPlus; import org.apache.cassandra.service.StorageService; import org.apache.cassandra.utils.FBUtilities; import org.junit.Test; @@ -38,7 +38,7 @@ public class SerializationsTest extends AbstractSerializationsTester { private void testEndpointStateWrite() throws IOException { - DataOutputStreamAndChannel out = getOutput("gms.EndpointState.bin"); + DataOutputStreamPlus out = getOutput("gms.EndpointState.bin"); HeartBeatState.serializer.serialize(Statics.HeartbeatSt, out, getVersion()); EndpointState.serializer.serialize(Statics.EndpointSt, out, getVersion()); 
VersionedValue.serializer.serialize(Statics.vv0, out, getVersion()); @@ -75,7 +75,7 @@ public class SerializationsTest extends AbstractSerializationsTester GossipDigestAck2 ack2 = new GossipDigestAck2(states); GossipDigestSyn syn = new GossipDigestSyn("Not a real cluster name", StorageService.getPartitioner().getClass().getCanonicalName(), Statics.Digests); - DataOutputStreamAndChannel out = getOutput("gms.Gossip.bin"); + DataOutputStreamPlus out = getOutput("gms.Gossip.bin"); for (GossipDigest gd : Statics.Digests) GossipDigest.serializer.serialize(gd, out, getVersion()); GossipDigestAck.serializer.serialize(ack, out, getVersion()); http://git-wip-us.apache.org/repos/asf/cassandra/blob/16499ca9/test/unit/org/apache/cassandra/io/sstable/IndexSummaryTest.java ---------------------------------------------------------------------- diff --git a/test/unit/org/apache/cassandra/io/sstable/IndexSummaryTest.java b/test/unit/org/apache/cassandra/io/sstable/IndexSummaryTest.java index a7010ae..6471558 100644 --- a/test/unit/org/apache/cassandra/io/sstable/IndexSummaryTest.java +++ b/test/unit/org/apache/cassandra/io/sstable/IndexSummaryTest.java @@ -134,6 +134,10 @@ public class IndexSummaryTest IndexSummary summary = builder.build(DatabaseDescriptor.getPartitioner()); return Pair.create(list, summary); } + catch (IOException e) + { + throw new RuntimeException(e); + } } @Test http://git-wip-us.apache.org/repos/asf/cassandra/blob/16499ca9/test/unit/org/apache/cassandra/io/sstable/metadata/MetadataSerializerTest.java ---------------------------------------------------------------------- diff --git a/test/unit/org/apache/cassandra/io/sstable/metadata/MetadataSerializerTest.java b/test/unit/org/apache/cassandra/io/sstable/metadata/MetadataSerializerTest.java index 9cc2d23..eda4f17 100644 --- a/test/unit/org/apache/cassandra/io/sstable/metadata/MetadataSerializerTest.java +++ b/test/unit/org/apache/cassandra/io/sstable/metadata/MetadataSerializerTest.java @@ -25,15 +25,16 @@ 
import java.util.Map; import java.util.Set; import com.google.common.collect.Sets; -import org.junit.Test; +import org.junit.Test; import org.apache.cassandra.db.commitlog.ReplayPosition; import org.apache.cassandra.db.composites.SimpleDenseCellNameType; import org.apache.cassandra.db.marshal.BytesType; import org.apache.cassandra.dht.RandomPartitioner; import org.apache.cassandra.io.sstable.Component; import org.apache.cassandra.io.sstable.Descriptor; -import org.apache.cassandra.io.util.DataOutputStreamAndChannel; +import org.apache.cassandra.io.util.DataOutputStreamPlus; +import org.apache.cassandra.io.util.BufferedDataOutputStreamPlus; import org.apache.cassandra.io.util.RandomAccessReader; import org.apache.cassandra.utils.EstimatedHistogram; @@ -70,7 +71,7 @@ public class MetadataSerializerTest MetadataSerializer serializer = new MetadataSerializer(); // Serialize to tmp file File statsFile = File.createTempFile(Component.STATS.name, null); - try (DataOutputStreamAndChannel out = new DataOutputStreamAndChannel(new FileOutputStream(statsFile))) + try (DataOutputStreamPlus out = new BufferedDataOutputStreamPlus(new FileOutputStream(statsFile))) { serializer.serialize(originalMetadata, out); } http://git-wip-us.apache.org/repos/asf/cassandra/blob/16499ca9/test/unit/org/apache/cassandra/io/util/BufferedDataOutputStreamTest.java ---------------------------------------------------------------------- diff --git a/test/unit/org/apache/cassandra/io/util/BufferedDataOutputStreamTest.java b/test/unit/org/apache/cassandra/io/util/BufferedDataOutputStreamTest.java new file mode 100644 index 0000000..8ac6d92 --- /dev/null +++ b/test/unit/org/apache/cassandra/io/util/BufferedDataOutputStreamTest.java @@ -0,0 +1,391 @@ +package org.apache.cassandra.io.util; + +import java.io.ByteArrayOutputStream; +import java.io.DataOutput; +import java.io.IOException; +import java.io.UTFDataFormatException; +import java.lang.reflect.Field; +import java.nio.ByteBuffer; +import 
java.nio.channels.WritableByteChannel;
+import java.util.Random;
+
+import org.junit.Test;
+
+import static org.junit.Assert.*;
+
+/**
+ * Tests for BufferedDataOutputStreamPlus.
+ * <p/>
+ * The first group of tests checks the argument validation of the constructor and of
+ * write(byte[], int, int). The fuzz test applies the same random sequence of writes to a
+ * BufferedDataOutputStreamPlus and to a WrappedDataOutputStreamPlus and requires both to
+ * produce exactly the same bytes.
+ */
+public class BufferedDataOutputStreamTest
+{
+    /**
+     * Channel handed to the buffered streams under test. It always reports itself open, ignores
+     * close() and appends every byte it is given to {@link #generated}.
+     */
+    WritableByteChannel adapter = new WritableByteChannel()
+    {
+
+        @Override
+        public boolean isOpen() {return true;}
+
+        @Override
+        public void close() throws IOException {}
+
+        @Override
+        public int write(ByteBuffer src) throws IOException
+        {
+            int retval = src.remaining();
+            while (src.hasRemaining())
+                generated.write(src.get());
+            return retval;
+        }
+
+    };
+
+    /** Stream with an 8 byte buffer, used only by the argument-validation tests below. */
+    BufferedDataOutputStreamPlus fakeStream = new BufferedDataOutputStreamPlus(adapter, 8);
+
+    /** Constructing a stream on a null channel is expected to throw NullPointerException. */
+    @SuppressWarnings("resource")
+    @Test(expected = NullPointerException.class)
+    public void testNullChannel()
+    {
+        new BufferedDataOutputStreamPlus((WritableByteChannel)null, 8);
+    }
+
+    /** A buffer size of 7 is expected to be rejected with IllegalArgumentException. */
+    @SuppressWarnings("resource")
+    @Test(expected = IllegalArgumentException.class)
+    public void testTooSmallBuffer()
+    {
+        new BufferedDataOutputStreamPlus(adapter, 7);
+    }
+
+    /** Writing from a null array is expected to throw NullPointerException. */
+    @Test(expected = NullPointerException.class)
+    public void testNullBuffer() throws Exception
+    {
+        byte type[] = null;
+        fakeStream.write(type, 0, 1);
+    }
+
+    /** A negative offset is expected to throw IndexOutOfBoundsException. */
+    @Test(expected = IndexOutOfBoundsException.class)
+    public void testNegativeOffset() throws Exception
+    {
+        byte type[] = new byte[10];
+        fakeStream.write(type, -1, 1);
+    }
+
+    /** A negative length is expected to throw IndexOutOfBoundsException. */
+    @Test(expected = IndexOutOfBoundsException.class)
+    public void testNegativeLength() throws Exception
+    {
+        byte type[] = new byte[10];
+        fakeStream.write(type, 0, -1);
+    }
+
+    /** A length larger than the array is expected to throw IndexOutOfBoundsException. */
+    @Test(expected = IndexOutOfBoundsException.class)
+    public void testTooBigLength() throws Exception
+    {
+        byte type[] = new byte[10];
+        fakeStream.write(type, 0, 11);
+    }
+
+    /** An offset plus length that runs past the end of the array is expected to throw IndexOutOfBoundsException. */
+    @Test(expected = IndexOutOfBoundsException.class)
+    public void testTooBigLengthWithOffset() throws Exception
+    {
+        byte type[] = new byte[10];
+        fakeStream.write(type, 8, 3);
+    }
+
+    /** An offset beyond the end of the array is expected to throw IndexOutOfBoundsException. */
+    @Test(expected = IndexOutOfBoundsException.class)
+    public void testTooBigOffset() throws Exception
+    {
+        byte type[] = new byte[10];
+        fakeStream.write(type, 11, 1);
+    }
+
+    /** Source of randomness for the fuzz test; its seed is printed so a run can be replayed. */
+    static final Random r;
+
+    /** Reflective handle on the private "buf" array of ByteArrayOutputStream, used to compare output without copying it. */
+    static Field baos_bytes;
+    static
+    {
+        // To replay a failing run, replace nanoTime() with the seed that run printed,
+        // e.g. 210187780999648L.
+        long seed = System.nanoTime();
+        System.out.println("Seed " + seed);
+        r = new Random(seed);
+        try
+        {
+            baos_bytes = ByteArrayOutputStream.class.getDeclaredField("buf");
+            baos_bytes.setAccessible(true);
+        }
+        catch (Throwable t)
+        {
+            throw new RuntimeException(t);
+        }
+    }
+
+    /** Bytes emitted by the stream under test, collected through {@link #adapter}. */
+    private ByteArrayOutputStream generated;
+    /** Stream under test. */
+    private BufferedDataOutputStreamPlus ndosp;
+
+    /** Bytes emitted by the reference stream. */
+    private ByteArrayOutputStream canonical;
+    /** Reference stream whose output {@link #ndosp} must reproduce. */
+    private DataOutputStreamPlus dosp;
+
+    /** Creates fresh sinks and a fresh pair of streams; called at the start of every fuzz run. */
+    void setUp()
+    {
+
+        generated = new ByteArrayOutputStream();
+        canonical = new ByteArrayOutputStream();
+        dosp = new WrappedDataOutputStreamPlus(canonical);
+        ndosp = new BufferedDataOutputStreamPlus(adapter, 4096);
+    }
+
+    /** Runs thirty independent fuzz rounds. */
+    @Test
+    public void testFuzz() throws Exception
+    {
+        for (int ii = 0; ii < 30; ii++)
+            fuzzOnce();
+    }
+
+    // Sample text for the string-writing actions. The non-ASCII samples are spelled as Unicode
+    // escapes so their value does not depend on the encoding used to read this file:
+    // U+0180 takes two bytes in UTF-8, U+34A8 three, and U+20B79 (a surrogate pair) four.
+    String simple = "foobar42";
+    String twoByte = "\u0180";
+    String threeByte = "\u34A8";
+    String fourByte = "\uD841\uDF79";
+
+    /**
+     * Performs one fuzz round: applies randomly chosen write operations to both streams until
+     * the stream under test has emitted at least 8MB, then verifies both outputs are identical.
+     */
+    private void fuzzOnce() throws Exception
+    {
+        setUp();
+        int iteration = 0;
+        while (generated.size() < 1024 * 1024 * 8)
+        {
+            int action = r.nextInt(18);
+            iteration++;
+
+            switch (action)
+            {
+                case 0:
+                {
+                    // Flush the stream under test itself: flushing the ByteArrayOutputStream behind
+                    // it does nothing, and would leave mid-stream flushes of the buffer unexercised.
+                    ndosp.flush();
+                    dosp.flush();
+                    break;
+                }
+                case 1:
+                {
+                    int val = r.nextInt();
+                    dosp.write(val);
+                    ndosp.write(val);
+                    break;
+                }
+                case 2:
+                {
+                    // up to twice the 4096 byte buffer, so writes both smaller and larger than it occur
+                    byte randomBytes[] = new byte[r.nextInt(4096 * 2 + 1)];
+                    r.nextBytes(randomBytes);
+                    dosp.write(randomBytes);
+                    ndosp.write(randomBytes);
+                    break;
+                }
+                case 3:
+                {
+                    byte randomBytes[] = new byte[r.nextInt(4096 * 2 + 1)];
+                    r.nextBytes(randomBytes);
+                    int offset = randomBytes.length == 0 ? 0 : r.nextInt(randomBytes.length);
+                    int length = randomBytes.length == 0 ? 0 : r.nextInt(randomBytes.length - offset);
+                    dosp.write(randomBytes, offset, length);
+                    ndosp.write(randomBytes, offset, length);
+                    break;
+                }
+                case 4:
+                {
+                    boolean val = r.nextInt(2) == 0;
+                    dosp.writeBoolean(val);
+                    ndosp.writeBoolean(val);
+                    break;
+                }
+                case 5:
+                {
+                    int val = r.nextInt();
+                    dosp.writeByte(val);
+                    ndosp.writeByte(val);
+                    break;
+                }
+                case 6:
+                {
+                    int val = r.nextInt();
+                    dosp.writeShort(val);
+                    ndosp.writeShort(val);
+                    break;
+                }
+                case 7:
+                {
+                    int val = r.nextInt();
+                    dosp.writeChar(val);
+                    ndosp.writeChar(val);
+                    break;
+                }
+                case 8:
+                {
+                    int val = r.nextInt();
+                    dosp.writeInt(val);
+                    ndosp.writeInt(val);
+                    break;
+                }
+                case 9:
+                {
+                    // a genuine 64 bit value: an int would leave the upper four bytes as mere sign extension
+                    long val = r.nextLong();
+                    dosp.writeLong(val);
+                    ndosp.writeLong(val);
+                    break;
+                }
+                case 10:
+                {
+                    float val = r.nextFloat();
+                    dosp.writeFloat(val);
+                    ndosp.writeFloat(val);
+                    break;
+                }
+                case 11:
+                {
+                    double val = r.nextDouble();
+                    dosp.writeDouble(val);
+                    ndosp.writeDouble(val);
+                    break;
+                }
+                case 12:
+                {
+                    dosp.writeBytes(simple);
+                    ndosp.writeBytes(simple);
+                    break;
+                }
+                case 13:
+                {
+                    dosp.writeChars(twoByte);
+                    ndosp.writeChars(twoByte);
+                    break;
+                }
+                case 14:
+                {
+                    // the fixed samples followed by up to 499 arbitrary chars
+                    StringBuilder sb = new StringBuilder();
+                    int length = r.nextInt(500);
+                    sb.append(simple + twoByte + threeByte + fourByte);
+                    for (int ii = 0; ii < length; ii++)
+                    {
+                        sb.append((char)(r.nextInt() & 0xffff));
+                    }
+                    String str = sb.toString();
+                    // the reference bytes come from the legacy encoder, not from dosp.writeUTF
+                    writeUTFLegacy(str, dosp);
+                    ndosp.writeUTF(str);
+                    break;
+                }
+                case 15:
+                {
+                    // heap buffer with a random position and limit
+                    ByteBuffer buf = ByteBuffer.allocate(r.nextInt(1024 * 8 + 1));
+                    r.nextBytes(buf.array());
+                    buf.position(buf.capacity() == 0 ? 0 : r.nextInt(buf.capacity()));
+                    buf.limit(buf.position() + (buf.capacity() - buf.position() == 0 ? 0 : r.nextInt(buf.capacity() - buf.position())));
+                    ByteBuffer dup = buf.duplicate();
+                    // NOTE(review): both streams receive duplicates, so the two assertions below
+                    // cannot observe a stream moving the position or limit of its argument - confirm
+                    // whether write(ByteBuffer) is meant to leave its argument untouched.
+                    ndosp.write(buf.duplicate());
+                    assertEquals(dup.position(), buf.position());
+                    assertEquals(dup.limit(), buf.limit());
+                    dosp.write(buf.duplicate());
+                    break;
+                }
+                case 16:
+                {
+                    // same as the previous action, with a direct buffer
+                    ByteBuffer buf = ByteBuffer.allocateDirect(r.nextInt(1024 * 8 + 1));
+                    while (buf.hasRemaining())
+                        buf.put((byte)r.nextInt());
+                    buf.position(buf.capacity() == 0 ? 0 : r.nextInt(buf.capacity()));
+                    buf.limit(buf.position() + (buf.capacity() - buf.position() == 0 ? 0 : r.nextInt(buf.capacity() - buf.position())));
+                    ByteBuffer dup = buf.duplicate();
+                    ndosp.write(buf.duplicate());
+                    assertEquals(dup.position(), buf.position());
+                    assertEquals(dup.limit(), buf.limit());
+                    dosp.write(buf.duplicate());
+                    break;
+                }
+                case 17:
+                {
+                    try (Memory buf = Memory.allocate(r.nextInt(1024 * 8 - 1) + 1))
+                    {
+                        for (int ii = 0; ii < buf.size(); ii++)
+                            buf.setByte(ii, (byte)r.nextInt());
+                        long offset = buf.size() == 0 ? 0 : r.nextInt((int)buf.size());
+                        long length = (buf.size() - offset == 0 ? 0 : r.nextInt((int)(buf.size() - offset)));
+                        ndosp.write(buf, offset, length);
+                        dosp.write(buf, offset, length);
+                    }
+                    break;
+                }
+                default:
+                    fail("Shouldn't reach here");
+            }
+        }
+
+        assertSameOutput(0, -1, iteration);
+    }
+
+    /**
+     * Reference string encoder used to produce the expected bytes for writeUTF.
+     * <p/>
+     * Writes a two byte big-endian length followed by the encoded chars: one byte for chars
+     * 1..127, two bytes for char 0 and chars 128..2047, three bytes for everything else.
+     *
+     * @param str the string to encode
+     * @param out the output that receives the length prefix and the encoded bytes
+     * @throws UTFDataFormatException if the encoded form is longer than 65535 bytes
+     * @throws IOException if writing to out fails
+     */
+    static void writeUTFLegacy(String str, DataOutput out) throws IOException
+    {
+        int utfCount = 0, length = str.length();
+        // first pass: size of the encoded form
+        for (int i = 0; i < length; i++)
+        {
+            int charValue = str.charAt(i);
+            if (charValue > 0 && charValue <= 127)
+            {
+                utfCount++;
+            }
+            else if (charValue <= 2047)
+            {
+                utfCount += 2;
+            }
+            else
+            {
+                utfCount += 3;
+            }
+        }
+        if (utfCount > 65535)
+        {
+            throw new UTFDataFormatException();
+        }
+        // second pass: encode after the two bytes reserved for the length
+        byte utfBytes[] = new byte[utfCount + 2];
+        int utfIndex = 2;
+        for (int i = 0; i < length; i++)
+        {
+            int charValue = str.charAt(i);
+            if (charValue > 0 && charValue <= 127)
+            {
+                utfBytes[utfIndex++] = (byte) charValue;
+            }
+            else if (charValue <= 2047)
+            {
+                utfBytes[utfIndex++] = (byte) (0xc0 | (0x1f & (charValue >> 6)));
+                utfBytes[utfIndex++] = (byte) (0x80 | (0x3f & charValue));
+            }
+            else
+            {
+                utfBytes[utfIndex++] = (byte) (0xe0 | (0x0f & (charValue >> 12)));
+                utfBytes[utfIndex++] = (byte) (0x80 | (0x3f & (charValue >> 6)));
+                utfBytes[utfIndex++] = (byte) (0x80 | (0x3f & charValue));
+            }
+        }
+        utfBytes[0] = (byte) (utfCount >> 8);
+        utfBytes[1] = (byte) utfCount;
+        out.write(utfBytes);
+    }
+
+    /**
+     * Flushes both streams and verifies that the stream under test has produced exactly the
+     * bytes of the reference stream.
+     *
+     * @param bytesChecked index of the first byte to compare; earlier bytes are taken as verified
+     * @param lastAction the most recent fuzz action, reported when the outputs differ
+     * @param iteration the number of fuzz actions performed so far, reported when the outputs differ
+     * @return the number of bytes now verified, which a caller may pass back as bytesChecked
+     */
+    private int assertSameOutput(int bytesChecked, int lastAction, int iteration) throws Exception
+    {
+        ndosp.flush();
+        dosp.flush();
+
+        // read the backing arrays directly; only the first size() entries are valid
+        byte generatedBytes[] = (byte[])baos_bytes.get(generated);
+        byte canonicalBytes[] = (byte[])baos_bytes.get(canonical);
+
+        int count = generated.size();
+        String context = " last action " + lastAction + " iteration " + iteration;
+        assertEquals("Output length differs," + context, canonical.size(), count);
+        for (; bytesChecked < count; bytesChecked++)
+        {
+            byte generatedByte = generatedBytes[bytesChecked];
+            byte canonicalByte = canonicalBytes[bytesChecked];
+            // the message is only built on a mismatch, this loop covers several megabytes
+            if (generatedByte != canonicalByte)
+                fail("Failed at " + bytesChecked + context + ": expected " + canonicalByte + " but was " + generatedByte);
+        }
+        return count;
+    }
+}
