This is an automated email from the ASF dual-hosted git repository. jbarrett pushed a commit to branch develop in repository https://gitbox.apache.org/repos/asf/geode.git
commit c919e3fff0c59c2e831350e98f3345a9eec3630e Author: Jacob Barrett <[email protected]> AuthorDate: Fri May 21 12:34:55 2021 -0700 GEODE-6588: Cleanup Connection --- .../org/apache/geode/internal/tcp/Connection.java | 50 ++- .../geode/internal/tcp/DirectReplySender.java | 20 +- .../org/apache/geode/internal/tcp/MsgStreamer.java | 337 ++++++++++----------- .../geode/internal/tcp/VersionedMsgStreamer.java | 13 +- 4 files changed, 193 insertions(+), 227 deletions(-) diff --git a/geode-core/src/main/java/org/apache/geode/internal/tcp/Connection.java b/geode-core/src/main/java/org/apache/geode/internal/tcp/Connection.java index 8969ab5..bbb6ce3 100644 --- a/geode-core/src/main/java/org/apache/geode/internal/tcp/Connection.java +++ b/geode-core/src/main/java/org/apache/geode/internal/tcp/Connection.java @@ -241,14 +241,14 @@ public class Connection implements Runnable { * Maps ConflatedKey instances to ConflatedKey instance. Note that even though the key and value * for an entry is the map will always be "equal" they will not always be "==". */ - private final Map conflatedKeys = new HashMap(); + private final Map<ConflationKey, ConflationKey> conflatedKeys = new HashMap<>(); /** * NOTE: LinkedBlockingQueue has a bug in which removes from the queue * cause future offer to increase the size without adding anything to the queue. * So I've changed from this backport class to a java.util.LinkedList */ - private final LinkedList outgoingQueue = new LinkedList(); + private final LinkedList<Object> outgoingQueue = new LinkedList<>(); /** * Number of bytes in the outgoingQueue. Used to control capacity. @@ -311,7 +311,7 @@ public class Connection implements Runnable { * other connections participating in the current transmission. we notify them if ackSATimeout * expires to keep all members from generating alerts when only one is slow */ - private List ackConnectionGroup; + private List<Connection> ackConnectionGroup; /** name of thread that we're currently performing an operation in (may be null) */ private String ackThreadName; @@ -349,7 +349,7 @@ public class Connection implements Runnable { /** * used to map a msgId to a MsgDestreamer which are used for destreaming chunked messages */ - private HashMap destreamerMap; + private HashMap<Short, MsgDestreamer> destreamerMap; private boolean directAck; @@ -1139,8 +1139,8 @@ public class Connection implements Runnable { } private void setRemoteAddr(InternalDistributedMember m) { - this.remoteAddr = this.owner.getDM().getCanonicalId(m); - Membership<InternalDistributedMember> mgr = this.conduit.getMembership(); + remoteAddr = owner.getDM().getCanonicalId(m); + Membership<InternalDistributedMember> mgr = conduit.getMembership(); mgr.addSurpriseMember(m); } @@ -1840,9 +1840,8 @@ public class Connection implements Runnable { idleMsgDestreamer = null; } if (destreamerMap != null) { - for (Object o : destreamerMap.values()) { - MsgDestreamer md = (MsgDestreamer) o; - md.close(); + for (MsgDestreamer msgDestreamer : destreamerMap.values()) { + msgDestreamer.close(); } destreamerMap = null; } @@ -1852,10 +1851,10 @@ public class Connection implements Runnable { private MsgDestreamer obtainMsgDestreamer(short msgId, final KnownVersion v) { synchronized (destreamerLock) { if (destreamerMap == null) { - destreamerMap = new HashMap(); + destreamerMap = new HashMap<>(); } Short key = msgId; - MsgDestreamer result = (MsgDestreamer) destreamerMap.get(key); + MsgDestreamer result = destreamerMap.get(key); if (result == null) { result = idleMsgDestreamer; if (result != null) { @@ -1937,7 +1936,7 @@ public class Connection implements Runnable { * false then "release" the connection. */ public void setInUse(boolean use, long startTime, long ackWaitThreshold, long ackSAThreshold, - List connectionGroup) { + List<Connection> connectionGroup) { // just do the following; EVEN if the connection has been closed synchronized (this) { if (use && (ackWaitThreshold > 0 || ackSAThreshold > 0)) { @@ -2016,13 +2015,12 @@ public class Connection implements Runnable { } } } - List group = ackConnectionGroup; + List<Connection> group = ackConnectionGroup; if (sentAlert && group != null) { // since transmission and ack-receipt are performed serially, we don't want to complain // about all receivers out just because one was slow. We therefore reset the time stamps // and give others more time - for (Object o : group) { - Connection con = (Connection) o; + for (Connection con : group) { if (con != Connection.this) { con.transmissionStartTime += con.ackSATimeout; } @@ -2115,12 +2113,11 @@ public class Connection implements Runnable { if (ck != null) { if (ck.allowsConflation()) { objToQueue = ck; - Object oldValue = conflatedKeys.put(ck, ck); + ConflationKey oldValue = conflatedKeys.put(ck, ck); if (oldValue != null) { - ConflationKey oldck = (ConflationKey) oldValue; - ByteBuffer oldBuffer = oldck.getBuffer(); + ByteBuffer oldBuffer = oldValue.getBuffer(); // need to always do this to allow old buffer to be gc'd - oldck.setBuffer(null); + oldValue.setBuffer(null); // remove the conflated key from current spot in queue @@ -2133,7 +2130,7 @@ public class Connection implements Runnable { // and if it has the same identity of our last thing then // remove it - if (outgoingQueue.getLast() == oldck) { + if (outgoingQueue.getLast() == oldValue) { outgoingQueue.removeLast(); } int oldBytes = oldBuffer.remaining(); @@ -2453,12 +2450,10 @@ public class Connection implements Runnable { if (!preserveOrder) { return true; } + // or the receiver does not allow queuing - if (asyncDistributionTimeout == 0) { - return true; - } // OTHERWISE return false and let caller send async - return false; + return asyncDistributionTimeout == 0; } private void writeAsync(SocketChannel channel, ByteBuffer buffer, boolean forceAsync, @@ -2692,7 +2687,7 @@ public class Connection implements Runnable { int len; // (we have to lock here to protect between reading header and message body) - try (final ByteBufferSharing _unused = ioFilter.getUnwrappedBuffer()) { + try (final ByteBufferSharing ignored = ioFilter.getUnwrappedBuffer()) { Header header = msgReader.readHeader(); if (header.getMessageType() == NORMAL_MSG_TYPE) { @@ -2892,8 +2887,7 @@ public class Connection implements Runnable { remoteVersion = Versioning.getKnownVersionOrDefault( Versioning.getVersion(VersioningIO.readOrdinal(dis)), null); - int dominoNumber = 0; - dominoNumber = dis.readInt(); + int dominoNumber = dis.readInt(); if (sharedResource) { dominoNumber = 0; } @@ -3323,7 +3317,7 @@ public class Connection implements Runnable { public String toString() { return remoteAddr + "(uid=" + uniqueId + ")" + (remoteVersion != null && remoteVersion != KnownVersion.CURRENT - ? "(v" + remoteVersion.toString() + ')' : ""); + ? "(v" + remoteVersion + ')' : ""); } /** diff --git a/geode-core/src/main/java/org/apache/geode/internal/tcp/DirectReplySender.java b/geode-core/src/main/java/org/apache/geode/internal/tcp/DirectReplySender.java index 78f53f7..e5b232e 100644 --- a/geode-core/src/main/java/org/apache/geode/internal/tcp/DirectReplySender.java +++ b/geode-core/src/main/java/org/apache/geode/internal/tcp/DirectReplySender.java @@ -14,9 +14,10 @@ */ package org.apache.geode.internal.tcp; +import static java.util.Collections.singletonList; + import java.io.IOException; import java.io.NotSerializableException; -import java.util.ArrayList; import java.util.Collections; import java.util.Set; @@ -43,30 +44,27 @@ class DirectReplySender implements ReplySender { @Immutable private static final DMStats DUMMY_STATS = new DummyDMStats(); - private final Connection conn; + private final Connection connection; private boolean sentReply = false; public DirectReplySender(Connection connection) { - this.conn = connection; + this.connection = connection; } @Override public Set<InternalDistributedMember> putOutgoing(DistributionMessage msg) { - Assert.assertTrue(!this.sentReply, "Trying to reply twice to a message"); + Assert.assertTrue(!sentReply, "Trying to reply twice to a message"); // Using an ArrayList, rather than Collections.singletonList here, because the MsgStreamer // mutates the list when it has exceptions. - // fix for bug #42199 - cancellation check - this.conn.getConduit().getDM().getCancelCriterion().checkCancelInProgress(null); + connection.getConduit().getDM().getCancelCriterion().checkCancelInProgress(null); if (logger.isTraceEnabled(LogMarker.DM_VERBOSE)) { logger.trace(LogMarker.DM_VERBOSE, "Sending a direct reply {} to {}", msg, - conn.getRemoteAddress()); + connection.getRemoteAddress()); } - ArrayList<Connection> conns = new ArrayList<Connection>(1); - conns.add(conn); - MsgStreamer ms = (MsgStreamer) MsgStreamer.create(conns, msg, false, DUMMY_STATS, - conn.getBufferPool()); + MsgStreamer ms = (MsgStreamer) MsgStreamer.create(singletonList(connection), msg, false, + DUMMY_STATS, connection.getBufferPool()); try { ms.writeMessage(); ConnectExceptions ce = ms.getConnectExceptions(); diff --git a/geode-core/src/main/java/org/apache/geode/internal/tcp/MsgStreamer.java b/geode-core/src/main/java/org/apache/geode/internal/tcp/MsgStreamer.java index 8e9afa8..e0c54b9 100644 --- a/geode-core/src/main/java/org/apache/geode/internal/tcp/MsgStreamer.java +++ b/geode-core/src/main/java/org/apache/geode/internal/tcp/MsgStreamer.java @@ -25,6 +25,7 @@ import java.util.List; import it.unimi.dsi.fastutil.objects.Object2ObjectMap; import it.unimi.dsi.fastutil.objects.Object2ObjectOpenHashMap; import it.unimi.dsi.fastutil.objects.ObjectIterator; +import org.jetbrains.annotations.NotNull; import org.apache.geode.DataSerializer; import org.apache.geode.annotations.VisibleForTesting; @@ -55,7 +56,7 @@ public class MsgStreamer extends OutputStream /** * List of connections to send this msg to. */ - private final List<?> cons; + private final List<Connection> cons; private final BufferPool bufferPool; @@ -95,10 +96,10 @@ public class MsgStreamer extends OutputStream * Called to free up resources used by this streamer after the streamer has produced its message. */ protected void release() { - MsgIdGenerator.release(this.msgId); - this.buffer.clear(); - this.overflowBuf = null; - bufferPool.releaseSenderBuffer(this.buffer); + MsgIdGenerator.release(msgId); + buffer.clear(); + overflowBuf = null; + bufferPool.releaseSenderBuffer(buffer); } /** @@ -107,7 +108,7 @@ public class MsgStreamer extends OutputStream */ @Override public ConnectExceptions getConnectExceptions() { - return this.ce; + return ce; } /** @@ -116,7 +117,7 @@ public class MsgStreamer extends OutputStream */ @Override public List<?> getSentConnections() { - return this.cons; + return cons; } /** @@ -125,16 +126,16 @@ public class MsgStreamer extends OutputStream * Note: This is no longer supposed to be called directly rather the {@link #create} method should * now be used. */ - MsgStreamer(List<?> cons, DistributionMessage msg, boolean directReply, DMStats stats, + MsgStreamer(List<Connection> cons, DistributionMessage msg, boolean directReply, DMStats stats, int sendBufferSize, BufferPool bufferPool) { this.stats = stats; this.msg = msg; this.cons = cons; int bufferSize = Math.min(sendBufferSize, Connection.MAX_MSG_SIZE); - this.buffer = bufferPool.acquireDirectSenderBuffer(bufferSize); - this.buffer.clear(); - this.buffer.position(Connection.MSG_HEADER_BYTES); - this.msgId = MsgIdGenerator.NO_MSG_ID; + buffer = bufferPool.acquireDirectSenderBuffer(bufferSize); + buffer.clear(); + buffer.position(Connection.MSG_HEADER_BYTES); + msgId = MsgIdGenerator.NO_MSG_ID; this.directReply = directReply; this.bufferPool = bufferPool; startSerialization(); @@ -145,31 +146,22 @@ public class MsgStreamer extends OutputStream * connections to remote nodes. This method can either return a single MsgStreamer object or a * List of MsgStreamer objects. */ - public static BaseMsgStreamer create(List<?> cons, final DistributionMessage msg, + public static BaseMsgStreamer create(List<Connection> cons, final DistributionMessage msg, final boolean directReply, final DMStats stats, BufferPool bufferPool) { - final Connection firstCon = (Connection) cons.get(0); + final Connection firstCon = cons.get(0); // split into different versions if required KnownVersion version; final int numCons = cons.size(); if (numCons > 1) { - Connection con; - Object2ObjectOpenHashMap versionToConnMap = null; + Object2ObjectOpenHashMap<KnownVersion, List<Connection>> versionToConnMap = null; int numVersioned = 0; - for (Object c : cons) { - con = (Connection) c; + for (Connection con : cons) { version = con.getRemoteVersion(); - if (version != null - && KnownVersion.CURRENT_ORDINAL > version.ordinal()) { + if (version != null && KnownVersion.CURRENT_ORDINAL > version.ordinal()) { if (versionToConnMap == null) { - versionToConnMap = new Object2ObjectOpenHashMap(); + versionToConnMap = new Object2ObjectOpenHashMap<>(); } - @SuppressWarnings("unchecked") - ArrayList<Object> vcons = (ArrayList<Object>) versionToConnMap.get(version); - if (vcons == null) { - vcons = new ArrayList<Object>(numCons); - versionToConnMap.put(version, vcons); - } - vcons.add(con); + versionToConnMap.computeIfAbsent(version, k -> new ArrayList<>(numCons)).add(con); numVersioned++; } } @@ -179,15 +171,13 @@ public class MsgStreamer extends OutputStream } else { // if there is a versioned stream created, then split remaining // connections to unversioned stream - final ArrayList<MsgStreamer> streamers = - new ArrayList<MsgStreamer>(versionToConnMap.size() + 1); + final ArrayList<MsgStreamer> streamers = new ArrayList<>(versionToConnMap.size() + 1); final int sendBufferSize = firstCon.getSendBufferSize(); if (numCons > numVersioned) { // allocating list of numCons size so that as the result of // getSentConnections it may not need to be reallocted later - final ArrayList<Object> currentVersionConnections = new ArrayList<Object>(numCons); - for (Object c : cons) { - con = (Connection) c; + final List<Connection> currentVersionConnections = new ArrayList<>(numCons); + for (Connection con : cons) { version = con.getRemoteVersion(); if (version == null || version.ordinal() >= KnownVersion.CURRENT_ORDINAL) { currentVersionConnections.add(con); @@ -197,13 +187,13 @@ public class MsgStreamer extends OutputStream new MsgStreamer(currentVersionConnections, msg, directReply, stats, sendBufferSize, bufferPool)); } - for (ObjectIterator<Object2ObjectMap.Entry> itr = + for (ObjectIterator<Object2ObjectMap.Entry<KnownVersion, List<Connection>>> itr = versionToConnMap.object2ObjectEntrySet().fastIterator(); itr.hasNext();) { - Object2ObjectMap.Entry entry = itr.next(); - Object ver = entry.getKey(); - Object l = entry.getValue(); - streamers.add(new VersionedMsgStreamer((List<?>) l, msg, directReply, stats, - bufferPool, sendBufferSize, (KnownVersion) ver)); + Object2ObjectMap.Entry<KnownVersion, List<Connection>> entry = itr.next(); + KnownVersion ver = entry.getKey(); + List<Connection> l = entry.getValue(); + streamers.add(new VersionedMsgStreamer(l, msg, directReply, stats, + bufferPool, sendBufferSize, ver)); } return new MsgStreamerList(streamers); } @@ -224,8 +214,7 @@ public class MsgStreamer extends OutputStream */ @Override public void reserveConnections(long startTime, long ackTimeout, long ackSDTimeout) { - for (Iterator it = cons.iterator(); it.hasNext();) { - Connection con = (Connection) it.next(); + for (final Connection con : cons) { con.setInUse(true, startTime, ackTimeout, ackSDTimeout, cons); if (ackTimeout > 0) { con.scheduleAckTimeouts(); @@ -234,7 +223,7 @@ public class MsgStreamer extends OutputStream } private void startSerialization() { - this.serStartTime = stats.startMsgSerialization(); + serStartTime = stats.startMsgSerialization(); } /** @@ -245,15 +234,15 @@ public class MsgStreamer extends OutputStream // if (logger.isTraceEnabled()) logger.trace(this.msg); try { - this.startedSerializingMsg = true; - InternalDataSerializer.writeDSFID(this.msg, this); - this.doneWritingMsg = true; - if (this.flushedBytes == 0) { + startedSerializingMsg = true; + InternalDataSerializer.writeDSFID(msg, this); + doneWritingMsg = true; + if (flushedBytes == 0) { // message fit in one chunk - this.normalMsg = true; + normalMsg = true; } realFlush(true); - return this.flushedBytes; + return flushedBytes; } finally { release(); } @@ -265,18 +254,18 @@ public class MsgStreamer extends OutputStream // if (logger.isTraceEnabled()) logger.trace(" byte={}", b); ensureCapacity(1); - if (this.overflowBuf != null) { - this.overflowBuf.write(b); + if (overflowBuf != null) { + overflowBuf.write(b); return; } - this.buffer.put((byte) (b & 0xff)); + buffer.put((byte) (b & 0xff)); } private void ensureCapacity(int amount) { - if (this.overflowBuf != null) { + if (overflowBuf != null) { return; } - int remainingSpace = this.buffer.capacity() - this.buffer.position(); + int remainingSpace = buffer.capacity() - buffer.position(); if (amount > remainingSpace) { realFlush(false); } @@ -292,65 +281,65 @@ public class MsgStreamer extends OutputStream private HeapDataOutputStream overflowBuf = null; private boolean isOverflowMode() { - return this.overflowMode > 0; + return overflowMode > 0; } private void enableOverflowMode() { - this.overflowMode++; + overflowMode++; } private void disableOverflowMode() { - this.overflowMode--; + overflowMode--; if (!isOverflowMode()) { - this.overflowBuf = null; + overflowBuf = null; } } public void realFlush(boolean lastFlushForMessage) { if (isOverflowMode()) { - if (this.overflowBuf == null) { - this.overflowBuf = new HeapDataOutputStream( - this.buffer.capacity() - Connection.MSG_HEADER_BYTES, KnownVersion.CURRENT); + if (overflowBuf == null) { + overflowBuf = new HeapDataOutputStream( + buffer.capacity() - Connection.MSG_HEADER_BYTES, KnownVersion.CURRENT); } return; } - this.buffer.flip(); + buffer.flip(); setMessageHeader(); - final int serializedBytes = this.buffer.limit(); - this.flushedBytes += serializedBytes; + final int serializedBytes = buffer.limit(); + flushedBytes += serializedBytes; DistributionMessage conflationMsg = null; - if (this.normalMsg) { + if (normalMsg) { // we can't conflate chunked messages; this fixes bug 36633 - conflationMsg = this.msg; + conflationMsg = msg; } - this.stats.endMsgSerialization(this.serStartTime); - for (Iterator it = this.cons.iterator(); it.hasNext();) { - Connection con = (Connection) it.next(); + stats.endMsgSerialization(serStartTime); + for (Iterator<Connection> it = cons.iterator(); it.hasNext();) { + Connection con = it.next(); try { - con.sendPreserialized(this.buffer, - lastFlushForMessage && this.msg.containsRegionContentChange(), conflationMsg); + con.sendPreserialized(buffer, + lastFlushForMessage && msg.containsRegionContentChange(), conflationMsg); } catch (IOException ex) { it.remove(); - if (this.ce == null) { - this.ce = new ConnectExceptions(); + if (ce == null) { + ce = new ConnectExceptions(); } - this.ce.addFailure(con.getRemoteAddress(), ex); + ce.addFailure(con.getRemoteAddress(), ex); con.closeForReconnect( String.format("closing due to %s", "IOException")); } catch (ConnectionException ex) { it.remove(); - if (this.ce == null) { - this.ce = new ConnectExceptions(); + if (ce == null) { + ce = new ConnectExceptions(); } - this.ce.addFailure(con.getRemoteAddress(), ex); + ce.addFailure(con.getRemoteAddress(), ex); con.closeForReconnect( String.format("closing due to %s", "ConnectionException")); } - this.buffer.rewind(); + buffer.rewind(); } startSerialization(); - this.buffer.clear(); - this.buffer.position(Connection.MSG_HEADER_BYTES); + buffer.clear(); + buffer.position(Connection.MSG_HEADER_BYTES); } @VisibleForTesting @@ -361,12 +350,11 @@ public class MsgStreamer extends OutputStream @Override public void close() throws IOException { try { - if (this.startedSerializingMsg && !this.doneWritingMsg) { + if (startedSerializingMsg && !doneWritingMsg) { // if we wrote any bytes on the cnxs then we need to close them // since they have been corrupted by a partial serialization. - if (this.flushedBytes > 0) { - for (Iterator it = this.cons.iterator(); it.hasNext();) { - Connection con = (Connection) it.next(); + if (flushedBytes > 0) { + for (Connection con : cons) { con.closeForReconnect("Message serialization could not complete"); } } @@ -378,20 +366,17 @@ public class MsgStreamer extends OutputStream /** override OutputStream's write() */ @Override - public void write(byte[] source, int offset, int len) { - // if (logger.isTraceEnabled()) { - // logger.trace(" bytes={} offset={} len={}", source, offset, len); - // } - if (this.overflowBuf != null) { - this.overflowBuf.write(source, offset, len); + public void write(byte @NotNull [] source, int offset, int len) { + if (overflowBuf != null) { + overflowBuf.write(source, offset, len); return; } while (len > 0) { - int remainingSpace = this.buffer.capacity() - this.buffer.position(); + int remainingSpace = buffer.capacity() - buffer.position(); if (remainingSpace == 0) { realFlush(false); - if (this.overflowBuf != null) { - this.overflowBuf.write(source, offset, len); + if (overflowBuf != null) { + overflowBuf.write(source, offset, len); return; } } else { @@ -399,7 +384,7 @@ public class MsgStreamer extends OutputStream if (len < chunkSize) { chunkSize = len; } - this.buffer.put(source, offset, chunkSize); + buffer.put(source, offset, chunkSize); offset += chunkSize; len -= chunkSize; } @@ -408,20 +393,17 @@ public class MsgStreamer extends OutputStream @Override public void write(ByteBuffer bb) { - // if (logger.isTraceEnabled()) { - // logger.trace(" bytes={} offset={} len={}", source, offset, len); - // } - if (this.overflowBuf != null) { - this.overflowBuf.write(bb); + if (overflowBuf != null) { + overflowBuf.write(bb); return; } int len = bb.remaining(); while (len > 0) { - int remainingSpace = this.buffer.capacity() - this.buffer.position(); + int remainingSpace = buffer.capacity() - buffer.position(); if (remainingSpace == 0) { realFlush(false); - if (this.overflowBuf != null) { - this.overflowBuf.write(bb); + if (overflowBuf != null) { + overflowBuf.write(bb); return; } } else { @@ -431,7 +413,7 @@ public class MsgStreamer extends OutputStream } int oldLimit = bb.limit(); bb.limit(bb.position() + chunkSize); - this.buffer.put(bb); + buffer.put(bb); bb.limit(oldLimit); len -= chunkSize; } @@ -442,12 +424,12 @@ public class MsgStreamer extends OutputStream * write the header after the message has been written to the stream */ private void setMessageHeader() { - Assert.assertTrue(this.overflowBuf == null); + Assert.assertTrue(overflowBuf == null); Assert.assertTrue(!isOverflowMode()); // int processorType = this.msg.getProcessorType(); int msgType; - if (this.doneWritingMsg) { - if (this.normalMsg) { + if (doneWritingMsg) { + if (normalMsg) { msgType = Connection.NORMAL_MSG_TYPE; } else { msgType = Connection.END_CHUNKED_MSG_TYPE; @@ -458,17 +440,17 @@ public class MsgStreamer extends OutputStream } else { msgType = Connection.CHUNKED_MSG_TYPE; } - if (!this.normalMsg) { - if (this.msgId == MsgIdGenerator.NO_MSG_ID) { - this.msgId = MsgIdGenerator.obtain(); + if (!normalMsg) { + if (msgId == MsgIdGenerator.NO_MSG_ID) { + msgId = MsgIdGenerator.obtain(); } } - this.buffer.putInt(Connection.MSG_HEADER_SIZE_OFFSET, - Connection.calcHdrSize(this.buffer.limit() - Connection.MSG_HEADER_BYTES)); - this.buffer.put(Connection.MSG_HEADER_TYPE_OFFSET, (byte) (msgType & 0xff)); - this.buffer.putShort(Connection.MSG_HEADER_ID_OFFSET, this.msgId); - this.buffer.position(0); + buffer.putInt(Connection.MSG_HEADER_SIZE_OFFSET, + Connection.calcHdrSize(buffer.limit() - Connection.MSG_HEADER_BYTES)); + buffer.put(Connection.MSG_HEADER_TYPE_OFFSET, (byte) (msgType & 0xff)); + buffer.putShort(Connection.MSG_HEADER_ID_OFFSET, msgId); + buffer.position(0); } // DataOutput methods @@ -523,11 +505,11 @@ public class MsgStreamer extends OutputStream // if (logger.isTraceEnabled()) logger.trace(" short={}", v); ensureCapacity(2); - if (this.overflowBuf != null) { - this.overflowBuf.writeShort(v); + if (overflowBuf != null) { + overflowBuf.writeShort(v); return; } - this.buffer.putShort((short) (v & 0xffff)); + buffer.putShort((short) (v & 0xffff)); } /** @@ -553,11 +535,11 @@ public class MsgStreamer extends OutputStream // if (logger.isTraceEnabled()) logger.trace(" char={}", v); ensureCapacity(2); - if (this.overflowBuf != null) { - this.overflowBuf.writeChar(v); + if (overflowBuf != null) { + overflowBuf.writeChar(v); return; } - this.buffer.putChar((char) v); + buffer.putChar((char) v); } /** @@ -584,11 +566,11 @@ public class MsgStreamer extends OutputStream // if (logger.isTraceEnabled()) logger.trace(" int={}", v); ensureCapacity(4); - if (this.overflowBuf != null) { - this.overflowBuf.writeInt(v); + if (overflowBuf != null) { + overflowBuf.writeInt(v); return; } - this.buffer.putInt(v); + buffer.putInt(v); } /** @@ -619,11 +601,11 @@ public class MsgStreamer extends OutputStream // if (logger.isTraceEnabled()) logger.trace(" long={}", v); ensureCapacity(8); - if (this.overflowBuf != null) { - this.overflowBuf.writeLong(v); + if (overflowBuf != null) { + overflowBuf.writeLong(v); return; } - this.buffer.putLong(v); + buffer.putLong(v); } /** @@ -641,11 +623,11 @@ public class MsgStreamer extends OutputStream // if (logger.isTraceEnabled()) logger.trace(" float={}", v); ensureCapacity(4); - if (this.overflowBuf != null) { - this.overflowBuf.writeFloat(v); + if (overflowBuf != null) { + overflowBuf.writeFloat(v); return; } - this.buffer.putFloat(v); + buffer.putFloat(v); } /** @@ -663,11 +645,11 @@ public class MsgStreamer extends OutputStream // if (logger.isTraceEnabled()) logger.trace(" double={}", v); ensureCapacity(8); - if (this.overflowBuf != null) { - this.overflowBuf.writeDouble(v); + if (overflowBuf != null) { + overflowBuf.writeDouble(v); return; } - this.buffer.putDouble(v); + buffer.putDouble(v); } /** @@ -684,11 +666,11 @@ public class MsgStreamer extends OutputStream * @param str the string of bytes to be written. */ @Override - public void writeBytes(String str) { + public void writeBytes(@NotNull String str) { // if (logger.isTraceEnabled()) logger.trace(" bytes={}", str); - if (this.overflowBuf != null) { - this.overflowBuf.writeBytes(str); + if (overflowBuf != null) { + overflowBuf.writeBytes(str); return; } int strlen = str.length(); @@ -710,21 +692,21 @@ public class MsgStreamer extends OutputStream * @param s the string value to be written. */ @Override - public void writeChars(String s) { + public void writeChars(@NotNull String s) { // if (logger.isTraceEnabled()) logger.trace(" chars={}", s); - if (this.overflowBuf != null) { - this.overflowBuf.writeChars(s); + if (overflowBuf != null) { + overflowBuf.writeChars(s); return; } int len = s.length(); int offset = 0; while (len > 0) { - int remainingCharSpace = (this.buffer.capacity() - this.buffer.position()) / 2; + int remainingCharSpace = (buffer.capacity() - buffer.position()) / 2; if (remainingCharSpace == 0) { realFlush(false); - if (this.overflowBuf != null) { - this.overflowBuf.writeChars(s.substring(offset)); + if (overflowBuf != null) { + overflowBuf.writeChars(s.substring(offset)); return; } } else { @@ -733,7 +715,7 @@ public class MsgStreamer extends OutputStream chunkSize = len; } for (int i = 0; i < chunkSize; i++) { - this.buffer.putChar(s.charAt(offset + i)); + buffer.putChar(s.charAt(offset + i)); } offset += chunkSize; len -= chunkSize; @@ -800,11 +782,11 @@ public class MsgStreamer extends OutputStream * @exception IOException if an I/O error occurs. */ @Override - public void writeUTF(String str) throws IOException { + public void writeUTF(@NotNull String str) throws IOException { // if (logger.isTraceEnabled()) logger.trace(" utf={}", str); - if (this.overflowBuf != null) { - this.overflowBuf.writeUTF(str); + if (overflowBuf != null) { + overflowBuf.writeUTF(str); return; } if (ASCII_STRINGS) { @@ -822,11 +804,11 @@ public class MsgStreamer extends OutputStream writeShort(len); int offset = 0; while (len > 0) { - int remainingSpace = this.buffer.capacity() - this.buffer.position(); + int remainingSpace = buffer.capacity() - buffer.position(); if (remainingSpace == 0) { realFlush(false); - if (this.overflowBuf != null) { - this.overflowBuf.write(str.substring(offset).getBytes()); + if (overflowBuf != null) { + overflowBuf.write(str.substring(offset).getBytes()); return; } } else { @@ -835,7 +817,7 @@ public class MsgStreamer extends OutputStream chunkSize = len; } for (int i = 0; i < chunkSize; i++) { - this.buffer.put((byte) str.charAt(offset + i)); + buffer.put((byte) str.charAt(offset + i)); } offset += chunkSize; len -= chunkSize; @@ -849,7 +831,7 @@ public class MsgStreamer extends OutputStream throw new UTFDataFormatException(); } { - int remainingSpace = this.buffer.capacity() - this.buffer.position(); + int remainingSpace = buffer.capacity() - buffer.position(); if (remainingSpace >= ((strlen * 3) + 2)) { // we have plenty of room to do this with one pass directly into the buffer writeQuickFullUTF(str, strlen); @@ -878,10 +860,10 @@ public class MsgStreamer extends OutputStream } else if (c > 0x07FF) { writeByte((byte) (0xE0 | ((c >> 12) & 0x0F))); writeByte((byte) (0x80 | ((c >> 6) & 0x3F))); - writeByte((byte) (0x80 | ((c >> 0) & 0x3F))); + writeByte((byte) (0x80 | ((c) & 0x3F))); } else { writeByte((byte) (0xC0 | ((c >> 6) & 0x1F))); - writeByte((byte) (0x80 | ((c >> 0) & 0x3F))); + writeByte((byte) (0x80 | ((c) & 0x3F))); } } } @@ -890,29 +872,29 @@ public class MsgStreamer extends OutputStream * Used when we know the max size will fit in the current buffer. */ private void writeQuickFullUTF(String str, int strlen) throws IOException { - int utfSizeIdx = this.buffer.position(); + int utfSizeIdx = buffer.position(); // skip bytes reserved for length - this.buffer.position(utfSizeIdx + 2); + buffer.position(utfSizeIdx + 2); for (int i = 0; i < strlen; i++) { int c = str.charAt(i); if ((c >= 0x0001) && (c <= 0x007F)) { - this.buffer.put((byte) c); + buffer.put((byte) c); } else if (c > 0x07FF) { - this.buffer.put((byte) (0xE0 | ((c >> 12) & 0x0F))); - this.buffer.put((byte) (0x80 | ((c >> 6) & 0x3F))); - this.buffer.put((byte) (0x80 | ((c >> 0) & 0x3F))); + buffer.put((byte) (0xE0 | ((c >> 12) & 0x0F))); + buffer.put((byte) (0x80 | ((c >> 6) & 0x3F))); + buffer.put((byte) (0x80 | ((c) & 0x3F))); } else { - this.buffer.put((byte) (0xC0 | ((c >> 6) & 0x1F))); - this.buffer.put((byte) (0x80 | ((c >> 0) & 0x3F))); + buffer.put((byte) (0xC0 | ((c >> 6) & 0x1F))); + buffer.put((byte) (0x80 | ((c) & 0x3F))); } } - int utflen = this.buffer.position() - (utfSizeIdx + 2); + int utflen = buffer.position() - (utfSizeIdx + 2); if (utflen > 65535) { // act as if we wrote nothing to this buffer - this.buffer.position(utfSizeIdx); + buffer.position(utfSizeIdx); throw new UTFDataFormatException(); } - this.buffer.putShort(utfSizeIdx, (short) utflen); + buffer.putShort(utfSizeIdx, (short) utflen); } /** @@ -932,19 +914,19 @@ public class MsgStreamer extends OutputStream other.rewind(); return; } - if (this.overflowBuf != null) { - this.overflowBuf.writeAsSerializedByteArray(v); + if (overflowBuf != null) { + overflowBuf.writeAsSerializedByteArray(v); return; } if (isOverflowMode()) { // we must have recursed which is now allowed to fix bug 38194 - int remainingSpace = this.buffer.capacity() - this.buffer.position(); + int remainingSpace = buffer.capacity() - buffer.position(); if (remainingSpace < 5) { // we don't even have room to write the length field so just create // the overflowBuf - this.overflowBuf = new HeapDataOutputStream( - this.buffer.capacity() - Connection.MSG_HEADER_BYTES, KnownVersion.CURRENT); - this.overflowBuf.writeAsSerializedByteArray(v); + overflowBuf = new HeapDataOutputStream( + buffer.capacity() - Connection.MSG_HEADER_BYTES, KnownVersion.CURRENT); + overflowBuf.writeAsSerializedByteArray(v); return; } } else { @@ -953,26 +935,23 @@ public class MsgStreamer extends OutputStream * object. I pulled 1024 as the average out of thin air. */ } - int lengthPos = this.buffer.position(); - this.buffer.position(lengthPos + 5); + int lengthPos = buffer.position(); + buffer.position(lengthPos + 5); enableOverflowMode(); boolean finished = false; try { try { DataSerializer.writeObject(v, this); } catch (IOException e) { - RuntimeException e2 = new IllegalArgumentException( - "An Exception was thrown while serializing."); - e2.initCause(e); - throw e2; + throw new IllegalArgumentException("An Exception was thrown while serializing.", e); } - int baLength = this.buffer.position() - (lengthPos + 5); - HeapDataOutputStream overBuf = this.overflowBuf; + int baLength = buffer.position() - (lengthPos + 5); + HeapDataOutputStream overBuf = overflowBuf; if (overBuf != null) { baLength += overBuf.size(); } - this.buffer.put(lengthPos, StaticSerialization.INT_ARRAY_LEN); - this.buffer.putInt(lengthPos + 1, baLength); + buffer.put(lengthPos, StaticSerialization.INT_ARRAY_LEN); + buffer.putInt(lengthPos + 1, baLength); disableOverflowMode(); finished = true; if (overBuf != null && !isOverflowMode()) { @@ -981,7 +960,7 @@ public class MsgStreamer extends OutputStream } finally { if (!finished) { // reset buffer and act as if we did nothing - this.buffer.position(lengthPos); + buffer.position(lengthPos); disableOverflowMode(); } } diff --git a/geode-core/src/main/java/org/apache/geode/internal/tcp/VersionedMsgStreamer.java b/geode-core/src/main/java/org/apache/geode/internal/tcp/VersionedMsgStreamer.java index 552c028..191d19d 100644 --- a/geode-core/src/main/java/org/apache/geode/internal/tcp/VersionedMsgStreamer.java +++ b/geode-core/src/main/java/org/apache/geode/internal/tcp/VersionedMsgStreamer.java @@ -32,25 +32,20 @@ class VersionedMsgStreamer extends MsgStreamer implements VersionedDataStream { private final KnownVersion version; - VersionedMsgStreamer(List<?> cons, DistributionMessage msg, boolean directReply, DMStats stats, + VersionedMsgStreamer(List<Connection> cons, DistributionMessage msg, boolean directReply, + DMStats stats, BufferPool bufferPool, int sendBufferSize, KnownVersion version) { super(cons, msg, directReply, stats, sendBufferSize, bufferPool); this.version = version; } - /** - * {@inheritDoc} - */ @Override public KnownVersion getVersion() { - return this.version; + return version; } - /** - * {@inheritDoc} - */ @Override public String toString() { - return super.toString() + " (" + this.version + ')'; + return super.toString() + " (" + version + ')'; } }
