This is an automated email from the ASF dual-hosted git repository.

jbarrett pushed a commit to branch develop
in repository https://gitbox.apache.org/repos/asf/geode.git

commit c919e3fff0c59c2e831350e98f3345a9eec3630e
Author: Jacob Barrett <[email protected]>
AuthorDate: Fri May 21 12:34:55 2021 -0700

    GEODE-6588: Cleanup Connection
---
 .../org/apache/geode/internal/tcp/Connection.java  |  50 ++-
 .../geode/internal/tcp/DirectReplySender.java      |  20 +-
 .../org/apache/geode/internal/tcp/MsgStreamer.java | 337 ++++++++++-----------
 .../geode/internal/tcp/VersionedMsgStreamer.java   |  13 +-
 4 files changed, 193 insertions(+), 227 deletions(-)

diff --git 
a/geode-core/src/main/java/org/apache/geode/internal/tcp/Connection.java 
b/geode-core/src/main/java/org/apache/geode/internal/tcp/Connection.java
index 8969ab5..bbb6ce3 100644
--- a/geode-core/src/main/java/org/apache/geode/internal/tcp/Connection.java
+++ b/geode-core/src/main/java/org/apache/geode/internal/tcp/Connection.java
@@ -241,14 +241,14 @@ public class Connection implements Runnable {
    * Maps ConflatedKey instances to ConflatedKey instance. Note that even 
though the key and value
    * for an entry is the map will always be "equal" they will not always be 
"==".
    */
-  private final Map conflatedKeys = new HashMap();
+  private final Map<ConflationKey, ConflationKey> conflatedKeys = new 
HashMap<>();
 
   /**
    * NOTE: LinkedBlockingQueue has a bug in which removes from the queue
    * cause future offer to increase the size without adding anything to the 
queue.
    * So I've changed from this backport class to a java.util.LinkedList
    */
-  private final LinkedList outgoingQueue = new LinkedList();
+  private final LinkedList<Object> outgoingQueue = new LinkedList<>();
 
   /**
    * Number of bytes in the outgoingQueue. Used to control capacity.
@@ -311,7 +311,7 @@ public class Connection implements Runnable {
    * other connections participating in the current transmission. we notify 
them if ackSATimeout
    * expires to keep all members from generating alerts when only one is slow
    */
-  private List ackConnectionGroup;
+  private List<Connection> ackConnectionGroup;
 
   /** name of thread that we're currently performing an operation in (may be 
null) */
   private String ackThreadName;
@@ -349,7 +349,7 @@ public class Connection implements Runnable {
   /**
    * used to map a msgId to a MsgDestreamer which are used for destreaming 
chunked messages
    */
-  private HashMap destreamerMap;
+  private HashMap<Short, MsgDestreamer> destreamerMap;
 
   private boolean directAck;
 
@@ -1139,8 +1139,8 @@ public class Connection implements Runnable {
   }
 
   private void setRemoteAddr(InternalDistributedMember m) {
-    this.remoteAddr = this.owner.getDM().getCanonicalId(m);
-    Membership<InternalDistributedMember> mgr = this.conduit.getMembership();
+    remoteAddr = owner.getDM().getCanonicalId(m);
+    Membership<InternalDistributedMember> mgr = conduit.getMembership();
     mgr.addSurpriseMember(m);
   }
 
@@ -1840,9 +1840,8 @@ public class Connection implements Runnable {
         idleMsgDestreamer = null;
       }
       if (destreamerMap != null) {
-        for (Object o : destreamerMap.values()) {
-          MsgDestreamer md = (MsgDestreamer) o;
-          md.close();
+        for (MsgDestreamer msgDestreamer : destreamerMap.values()) {
+          msgDestreamer.close();
         }
         destreamerMap = null;
       }
@@ -1852,10 +1851,10 @@ public class Connection implements Runnable {
   private MsgDestreamer obtainMsgDestreamer(short msgId, final KnownVersion v) 
{
     synchronized (destreamerLock) {
       if (destreamerMap == null) {
-        destreamerMap = new HashMap();
+        destreamerMap = new HashMap<>();
       }
       Short key = msgId;
-      MsgDestreamer result = (MsgDestreamer) destreamerMap.get(key);
+      MsgDestreamer result = destreamerMap.get(key);
       if (result == null) {
         result = idleMsgDestreamer;
         if (result != null) {
@@ -1937,7 +1936,7 @@ public class Connection implements Runnable {
    * false then "release" the connection.
    */
   public void setInUse(boolean use, long startTime, long ackWaitThreshold, 
long ackSAThreshold,
-      List connectionGroup) {
+      List<Connection> connectionGroup) {
     // just do the following; EVEN if the connection has been closed
     synchronized (this) {
       if (use && (ackWaitThreshold > 0 || ackSAThreshold > 0)) {
@@ -2016,13 +2015,12 @@ public class Connection implements Runnable {
               }
             }
           }
-          List group = ackConnectionGroup;
+          List<Connection> group = ackConnectionGroup;
           if (sentAlert && group != null) {
             // since transmission and ack-receipt are performed serially, we 
don't want to complain
             // about all receivers out just because one was slow. We therefore 
reset the time stamps
             // and give others more time
-            for (Object o : group) {
-              Connection con = (Connection) o;
+            for (Connection con : group) {
               if (con != Connection.this) {
                 con.transmissionStartTime += con.ackSATimeout;
               }
@@ -2115,12 +2113,11 @@ public class Connection implements Runnable {
         if (ck != null) {
           if (ck.allowsConflation()) {
             objToQueue = ck;
-            Object oldValue = conflatedKeys.put(ck, ck);
+            ConflationKey oldValue = conflatedKeys.put(ck, ck);
             if (oldValue != null) {
-              ConflationKey oldck = (ConflationKey) oldValue;
-              ByteBuffer oldBuffer = oldck.getBuffer();
+              ByteBuffer oldBuffer = oldValue.getBuffer();
               // need to always do this to allow old buffer to be gc'd
-              oldck.setBuffer(null);
+              oldValue.setBuffer(null);
 
               // remove the conflated key from current spot in queue
 
@@ -2133,7 +2130,7 @@ public class Connection implements Runnable {
               // and if it has the same identity of our last thing then
               // remove it
 
-              if (outgoingQueue.getLast() == oldck) {
+              if (outgoingQueue.getLast() == oldValue) {
                 outgoingQueue.removeLast();
               }
               int oldBytes = oldBuffer.remaining();
@@ -2453,12 +2450,10 @@ public class Connection implements Runnable {
     if (!preserveOrder) {
       return true;
     }
+
     // or the receiver does not allow queuing
-    if (asyncDistributionTimeout == 0) {
-      return true;
-    }
     // OTHERWISE return false and let caller send async
-    return false;
+    return asyncDistributionTimeout == 0;
   }
 
   private void writeAsync(SocketChannel channel, ByteBuffer buffer, boolean 
forceAsync,
@@ -2692,7 +2687,7 @@ public class Connection implements Runnable {
       int len;
 
       // (we have to lock here to protect between reading header and message 
body)
-      try (final ByteBufferSharing _unused = ioFilter.getUnwrappedBuffer()) {
+      try (final ByteBufferSharing ignored = ioFilter.getUnwrappedBuffer()) {
         Header header = msgReader.readHeader();
 
         if (header.getMessageType() == NORMAL_MSG_TYPE) {
@@ -2892,8 +2887,7 @@ public class Connection implements Runnable {
       remoteVersion = Versioning.getKnownVersionOrDefault(
           Versioning.getVersion(VersioningIO.readOrdinal(dis)),
           null);
-      int dominoNumber = 0;
-      dominoNumber = dis.readInt();
+      int dominoNumber = dis.readInt();
       if (sharedResource) {
         dominoNumber = 0;
       }
@@ -3323,7 +3317,7 @@ public class Connection implements Runnable {
   public String toString() {
     return remoteAddr + "(uid=" + uniqueId + ")"
         + (remoteVersion != null && remoteVersion != KnownVersion.CURRENT
-            ? "(v" + remoteVersion.toString() + ')' : "");
+            ? "(v" + remoteVersion + ')' : "");
   }
 
   /**
diff --git 
a/geode-core/src/main/java/org/apache/geode/internal/tcp/DirectReplySender.java 
b/geode-core/src/main/java/org/apache/geode/internal/tcp/DirectReplySender.java
index 78f53f7..e5b232e 100644
--- 
a/geode-core/src/main/java/org/apache/geode/internal/tcp/DirectReplySender.java
+++ 
b/geode-core/src/main/java/org/apache/geode/internal/tcp/DirectReplySender.java
@@ -14,9 +14,11 @@
  */
 package org.apache.geode.internal.tcp;
 
+import static java.util.Collections.singletonList;
+
 import java.io.IOException;
 import java.io.NotSerializableException;
 import java.util.ArrayList;
 import java.util.Collections;
 import java.util.Set;
 
@@ -43,30 +45,27 @@ class DirectReplySender implements ReplySender {
   @Immutable
   private static final DMStats DUMMY_STATS = new DummyDMStats();
 
-  private final Connection conn;
+  private final Connection connection;
   private boolean sentReply = false;
 
   public DirectReplySender(Connection connection) {
-    this.conn = connection;
+    this.connection = connection;
   }
 
   @Override
   public Set<InternalDistributedMember> putOutgoing(DistributionMessage msg) {
-    Assert.assertTrue(!this.sentReply, "Trying to reply twice to a message");
+    Assert.assertTrue(!sentReply, "Trying to reply twice to a message");
     // Using an ArrayList, rather than Collections.singletonList here, because 
the MsgStreamer
     // mutates the list when it has exceptions.
 
-    // fix for bug #42199 - cancellation check
-    
this.conn.getConduit().getDM().getCancelCriterion().checkCancelInProgress(null);
+    
connection.getConduit().getDM().getCancelCriterion().checkCancelInProgress(null);
 
     if (logger.isTraceEnabled(LogMarker.DM_VERBOSE)) {
       logger.trace(LogMarker.DM_VERBOSE, "Sending a direct reply {} to {}", 
msg,
-          conn.getRemoteAddress());
+          connection.getRemoteAddress());
     }
-    ArrayList<Connection> conns = new ArrayList<Connection>(1);
-    conns.add(conn);
-    MsgStreamer ms = (MsgStreamer) MsgStreamer.create(conns, msg, false, 
DUMMY_STATS,
-        conn.getBufferPool());
+    MsgStreamer ms = (MsgStreamer) MsgStreamer.create(new ArrayList<>(singletonList(connection)),
+        msg, false, DUMMY_STATS, connection.getBufferPool());
     try {
       ms.writeMessage();
       ConnectExceptions ce = ms.getConnectExceptions();
diff --git 
a/geode-core/src/main/java/org/apache/geode/internal/tcp/MsgStreamer.java 
b/geode-core/src/main/java/org/apache/geode/internal/tcp/MsgStreamer.java
index 8e9afa8..e0c54b9 100644
--- a/geode-core/src/main/java/org/apache/geode/internal/tcp/MsgStreamer.java
+++ b/geode-core/src/main/java/org/apache/geode/internal/tcp/MsgStreamer.java
@@ -25,6 +25,7 @@ import java.util.List;
 import it.unimi.dsi.fastutil.objects.Object2ObjectMap;
 import it.unimi.dsi.fastutil.objects.Object2ObjectOpenHashMap;
 import it.unimi.dsi.fastutil.objects.ObjectIterator;
+import org.jetbrains.annotations.NotNull;
 
 import org.apache.geode.DataSerializer;
 import org.apache.geode.annotations.VisibleForTesting;
@@ -55,7 +56,7 @@ public class MsgStreamer extends OutputStream
   /**
    * List of connections to send this msg to.
    */
-  private final List<?> cons;
+  private final List<Connection> cons;
 
   private final BufferPool bufferPool;
 
@@ -95,10 +96,10 @@ public class MsgStreamer extends OutputStream
    * Called to free up resources used by this streamer after the streamer has 
produced its message.
    */
   protected void release() {
-    MsgIdGenerator.release(this.msgId);
-    this.buffer.clear();
-    this.overflowBuf = null;
-    bufferPool.releaseSenderBuffer(this.buffer);
+    MsgIdGenerator.release(msgId);
+    buffer.clear();
+    overflowBuf = null;
+    bufferPool.releaseSenderBuffer(buffer);
   }
 
   /**
@@ -107,7 +108,7 @@ public class MsgStreamer extends OutputStream
    */
   @Override
   public ConnectExceptions getConnectExceptions() {
-    return this.ce;
+    return ce;
   }
 
   /**
@@ -116,7 +117,7 @@ public class MsgStreamer extends OutputStream
    */
   @Override
   public List<?> getSentConnections() {
-    return this.cons;
+    return cons;
   }
 
   /**
@@ -125,16 +126,16 @@ public class MsgStreamer extends OutputStream
    * Note: This is no longer supposed to be called directly rather the {@link 
#create} method should
    * now be used.
    */
-  MsgStreamer(List<?> cons, DistributionMessage msg, boolean directReply, 
DMStats stats,
+  MsgStreamer(List<Connection> cons, DistributionMessage msg, boolean 
directReply, DMStats stats,
       int sendBufferSize, BufferPool bufferPool) {
     this.stats = stats;
     this.msg = msg;
     this.cons = cons;
     int bufferSize = Math.min(sendBufferSize, Connection.MAX_MSG_SIZE);
-    this.buffer = bufferPool.acquireDirectSenderBuffer(bufferSize);
-    this.buffer.clear();
-    this.buffer.position(Connection.MSG_HEADER_BYTES);
-    this.msgId = MsgIdGenerator.NO_MSG_ID;
+    buffer = bufferPool.acquireDirectSenderBuffer(bufferSize);
+    buffer.clear();
+    buffer.position(Connection.MSG_HEADER_BYTES);
+    msgId = MsgIdGenerator.NO_MSG_ID;
     this.directReply = directReply;
     this.bufferPool = bufferPool;
     startSerialization();
@@ -145,31 +146,22 @@ public class MsgStreamer extends OutputStream
    * connections to remote nodes. This method can either return a single 
MsgStreamer object or a
    * List of MsgStreamer objects.
    */
-  public static BaseMsgStreamer create(List<?> cons, final DistributionMessage 
msg,
+  public static BaseMsgStreamer create(List<Connection> cons, final 
DistributionMessage msg,
       final boolean directReply, final DMStats stats, BufferPool bufferPool) {
-    final Connection firstCon = (Connection) cons.get(0);
+    final Connection firstCon = cons.get(0);
     // split into different versions if required
     KnownVersion version;
     final int numCons = cons.size();
     if (numCons > 1) {
-      Connection con;
-      Object2ObjectOpenHashMap versionToConnMap = null;
+      Object2ObjectOpenHashMap<KnownVersion, List<Connection>> 
versionToConnMap = null;
       int numVersioned = 0;
-      for (Object c : cons) {
-        con = (Connection) c;
+      for (Connection con : cons) {
         version = con.getRemoteVersion();
-        if (version != null
-            && KnownVersion.CURRENT_ORDINAL > version.ordinal()) {
+        if (version != null && KnownVersion.CURRENT_ORDINAL > 
version.ordinal()) {
           if (versionToConnMap == null) {
-            versionToConnMap = new Object2ObjectOpenHashMap();
+            versionToConnMap = new Object2ObjectOpenHashMap<>();
           }
-          @SuppressWarnings("unchecked")
-          ArrayList<Object> vcons = (ArrayList<Object>) 
versionToConnMap.get(version);
-          if (vcons == null) {
-            vcons = new ArrayList<Object>(numCons);
-            versionToConnMap.put(version, vcons);
-          }
-          vcons.add(con);
+          versionToConnMap.computeIfAbsent(version, k -> new 
ArrayList<>(numCons)).add(con);
           numVersioned++;
         }
       }
@@ -179,15 +171,13 @@ public class MsgStreamer extends OutputStream
       } else {
         // if there is a versioned stream created, then split remaining
         // connections to unversioned stream
-        final ArrayList<MsgStreamer> streamers =
-            new ArrayList<MsgStreamer>(versionToConnMap.size() + 1);
+        final ArrayList<MsgStreamer> streamers = new 
ArrayList<>(versionToConnMap.size() + 1);
         final int sendBufferSize = firstCon.getSendBufferSize();
         if (numCons > numVersioned) {
           // allocating list of numCons size so that as the result of
           // getSentConnections it may not need to be reallocted later
-          final ArrayList<Object> currentVersionConnections = new 
ArrayList<Object>(numCons);
-          for (Object c : cons) {
-            con = (Connection) c;
+          final List<Connection> currentVersionConnections = new 
ArrayList<>(numCons);
+          for (Connection con : cons) {
             version = con.getRemoteVersion();
             if (version == null || version.ordinal() >= 
KnownVersion.CURRENT_ORDINAL) {
               currentVersionConnections.add(con);
@@ -197,13 +187,13 @@ public class MsgStreamer extends OutputStream
               new MsgStreamer(currentVersionConnections, msg, directReply, 
stats, sendBufferSize,
                   bufferPool));
         }
-        for (ObjectIterator<Object2ObjectMap.Entry> itr =
+        for (ObjectIterator<Object2ObjectMap.Entry<KnownVersion, 
List<Connection>>> itr =
             versionToConnMap.object2ObjectEntrySet().fastIterator(); 
itr.hasNext();) {
-          Object2ObjectMap.Entry entry = itr.next();
-          Object ver = entry.getKey();
-          Object l = entry.getValue();
-          streamers.add(new VersionedMsgStreamer((List<?>) l, msg, 
directReply, stats,
-              bufferPool, sendBufferSize, (KnownVersion) ver));
+          Object2ObjectMap.Entry<KnownVersion, List<Connection>> entry = 
itr.next();
+          KnownVersion ver = entry.getKey();
+          List<Connection> l = entry.getValue();
+          streamers.add(new VersionedMsgStreamer(l, msg, directReply, stats,
+              bufferPool, sendBufferSize, ver));
         }
         return new MsgStreamerList(streamers);
       }
@@ -224,8 +214,7 @@ public class MsgStreamer extends OutputStream
    */
   @Override
   public void reserveConnections(long startTime, long ackTimeout, long 
ackSDTimeout) {
-    for (Iterator it = cons.iterator(); it.hasNext();) {
-      Connection con = (Connection) it.next();
+    for (final Connection con : cons) {
       con.setInUse(true, startTime, ackTimeout, ackSDTimeout, cons);
       if (ackTimeout > 0) {
         con.scheduleAckTimeouts();
@@ -234,7 +223,7 @@ public class MsgStreamer extends OutputStream
   }
 
   private void startSerialization() {
-    this.serStartTime = stats.startMsgSerialization();
+    serStartTime = stats.startMsgSerialization();
   }
 
   /**
@@ -245,15 +234,15 @@ public class MsgStreamer extends OutputStream
     // if (logger.isTraceEnabled()) logger.trace(this.msg);
 
     try {
-      this.startedSerializingMsg = true;
-      InternalDataSerializer.writeDSFID(this.msg, this);
-      this.doneWritingMsg = true;
-      if (this.flushedBytes == 0) {
+      startedSerializingMsg = true;
+      InternalDataSerializer.writeDSFID(msg, this);
+      doneWritingMsg = true;
+      if (flushedBytes == 0) {
         // message fit in one chunk
-        this.normalMsg = true;
+        normalMsg = true;
       }
       realFlush(true);
-      return this.flushedBytes;
+      return flushedBytes;
     } finally {
       release();
     }
@@ -265,18 +254,18 @@ public class MsgStreamer extends OutputStream
     // if (logger.isTraceEnabled()) logger.trace(" byte={}", b);
 
     ensureCapacity(1);
-    if (this.overflowBuf != null) {
-      this.overflowBuf.write(b);
+    if (overflowBuf != null) {
+      overflowBuf.write(b);
       return;
     }
-    this.buffer.put((byte) (b & 0xff));
+    buffer.put((byte) (b & 0xff));
   }
 
   private void ensureCapacity(int amount) {
-    if (this.overflowBuf != null) {
+    if (overflowBuf != null) {
       return;
     }
-    int remainingSpace = this.buffer.capacity() - this.buffer.position();
+    int remainingSpace = buffer.capacity() - buffer.position();
     if (amount > remainingSpace) {
       realFlush(false);
     }
@@ -292,65 +281,65 @@ public class MsgStreamer extends OutputStream
   private HeapDataOutputStream overflowBuf = null;
 
   private boolean isOverflowMode() {
-    return this.overflowMode > 0;
+    return overflowMode > 0;
   }
 
   private void enableOverflowMode() {
-    this.overflowMode++;
+    overflowMode++;
   }
 
   private void disableOverflowMode() {
-    this.overflowMode--;
+    overflowMode--;
     if (!isOverflowMode()) {
-      this.overflowBuf = null;
+      overflowBuf = null;
     }
   }
 
   public void realFlush(boolean lastFlushForMessage) {
     if (isOverflowMode()) {
-      if (this.overflowBuf == null) {
-        this.overflowBuf = new HeapDataOutputStream(
-            this.buffer.capacity() - Connection.MSG_HEADER_BYTES, 
KnownVersion.CURRENT);
+      if (overflowBuf == null) {
+        overflowBuf = new HeapDataOutputStream(
+            buffer.capacity() - Connection.MSG_HEADER_BYTES, 
KnownVersion.CURRENT);
       }
       return;
     }
-    this.buffer.flip();
+    buffer.flip();
     setMessageHeader();
-    final int serializedBytes = this.buffer.limit();
-    this.flushedBytes += serializedBytes;
+    final int serializedBytes = buffer.limit();
+    flushedBytes += serializedBytes;
     DistributionMessage conflationMsg = null;
-    if (this.normalMsg) {
+    if (normalMsg) {
       // we can't conflate chunked messages; this fixes bug 36633
-      conflationMsg = this.msg;
+      conflationMsg = msg;
     }
-    this.stats.endMsgSerialization(this.serStartTime);
-    for (Iterator it = this.cons.iterator(); it.hasNext();) {
-      Connection con = (Connection) it.next();
+    stats.endMsgSerialization(serStartTime);
+    for (Iterator<Connection> it = cons.iterator(); it.hasNext();) {
+      Connection con = it.next();
       try {
-        con.sendPreserialized(this.buffer,
-            lastFlushForMessage && this.msg.containsRegionContentChange(), 
conflationMsg);
+        con.sendPreserialized(buffer,
+            lastFlushForMessage && msg.containsRegionContentChange(), 
conflationMsg);
       } catch (IOException ex) {
         it.remove();
-        if (this.ce == null) {
-          this.ce = new ConnectExceptions();
+        if (ce == null) {
+          ce = new ConnectExceptions();
         }
-        this.ce.addFailure(con.getRemoteAddress(), ex);
+        ce.addFailure(con.getRemoteAddress(), ex);
         con.closeForReconnect(
             String.format("closing due to %s", "IOException"));
       } catch (ConnectionException ex) {
         it.remove();
-        if (this.ce == null) {
-          this.ce = new ConnectExceptions();
+        if (ce == null) {
+          ce = new ConnectExceptions();
         }
-        this.ce.addFailure(con.getRemoteAddress(), ex);
+        ce.addFailure(con.getRemoteAddress(), ex);
         con.closeForReconnect(
             String.format("closing due to %s", "ConnectionException"));
       }
-      this.buffer.rewind();
+      buffer.rewind();
     }
     startSerialization();
-    this.buffer.clear();
-    this.buffer.position(Connection.MSG_HEADER_BYTES);
+    buffer.clear();
+    buffer.position(Connection.MSG_HEADER_BYTES);
   }
 
   @VisibleForTesting
@@ -361,12 +350,11 @@ public class MsgStreamer extends OutputStream
   @Override
   public void close() throws IOException {
     try {
-      if (this.startedSerializingMsg && !this.doneWritingMsg) {
+      if (startedSerializingMsg && !doneWritingMsg) {
         // if we wrote any bytes on the cnxs then we need to close them
         // since they have been corrupted by a partial serialization.
-        if (this.flushedBytes > 0) {
-          for (Iterator it = this.cons.iterator(); it.hasNext();) {
-            Connection con = (Connection) it.next();
+        if (flushedBytes > 0) {
+          for (Connection con : cons) {
             con.closeForReconnect("Message serialization could not complete");
           }
         }
@@ -378,20 +366,17 @@ public class MsgStreamer extends OutputStream
 
   /** override OutputStream's write() */
   @Override
-  public void write(byte[] source, int offset, int len) {
-    // if (logger.isTraceEnabled()) {
-    // logger.trace(" bytes={} offset={} len={}", source, offset, len);
-    // }
-    if (this.overflowBuf != null) {
-      this.overflowBuf.write(source, offset, len);
+  public void write(byte @NotNull [] source, int offset, int len) {
+    if (overflowBuf != null) {
+      overflowBuf.write(source, offset, len);
       return;
     }
     while (len > 0) {
-      int remainingSpace = this.buffer.capacity() - this.buffer.position();
+      int remainingSpace = buffer.capacity() - buffer.position();
       if (remainingSpace == 0) {
         realFlush(false);
-        if (this.overflowBuf != null) {
-          this.overflowBuf.write(source, offset, len);
+        if (overflowBuf != null) {
+          overflowBuf.write(source, offset, len);
           return;
         }
       } else {
@@ -399,7 +384,7 @@ public class MsgStreamer extends OutputStream
         if (len < chunkSize) {
           chunkSize = len;
         }
-        this.buffer.put(source, offset, chunkSize);
+        buffer.put(source, offset, chunkSize);
         offset += chunkSize;
         len -= chunkSize;
       }
@@ -408,20 +393,17 @@ public class MsgStreamer extends OutputStream
 
   @Override
   public void write(ByteBuffer bb) {
-    // if (logger.isTraceEnabled()) {
-    // logger.trace(" bytes={} offset={} len={}", source, offset, len);
-    // }
-    if (this.overflowBuf != null) {
-      this.overflowBuf.write(bb);
+    if (overflowBuf != null) {
+      overflowBuf.write(bb);
       return;
     }
     int len = bb.remaining();
     while (len > 0) {
-      int remainingSpace = this.buffer.capacity() - this.buffer.position();
+      int remainingSpace = buffer.capacity() - buffer.position();
       if (remainingSpace == 0) {
         realFlush(false);
-        if (this.overflowBuf != null) {
-          this.overflowBuf.write(bb);
+        if (overflowBuf != null) {
+          overflowBuf.write(bb);
           return;
         }
       } else {
@@ -431,7 +413,7 @@ public class MsgStreamer extends OutputStream
         }
         int oldLimit = bb.limit();
         bb.limit(bb.position() + chunkSize);
-        this.buffer.put(bb);
+        buffer.put(bb);
         bb.limit(oldLimit);
         len -= chunkSize;
       }
@@ -442,12 +424,12 @@ public class MsgStreamer extends OutputStream
    * write the header after the message has been written to the stream
    */
   private void setMessageHeader() {
-    Assert.assertTrue(this.overflowBuf == null);
+    Assert.assertTrue(overflowBuf == null);
     Assert.assertTrue(!isOverflowMode());
     // int processorType = this.msg.getProcessorType();
     int msgType;
-    if (this.doneWritingMsg) {
-      if (this.normalMsg) {
+    if (doneWritingMsg) {
+      if (normalMsg) {
         msgType = Connection.NORMAL_MSG_TYPE;
       } else {
         msgType = Connection.END_CHUNKED_MSG_TYPE;
@@ -458,17 +440,17 @@ public class MsgStreamer extends OutputStream
     } else {
       msgType = Connection.CHUNKED_MSG_TYPE;
     }
-    if (!this.normalMsg) {
-      if (this.msgId == MsgIdGenerator.NO_MSG_ID) {
-        this.msgId = MsgIdGenerator.obtain();
+    if (!normalMsg) {
+      if (msgId == MsgIdGenerator.NO_MSG_ID) {
+        msgId = MsgIdGenerator.obtain();
       }
     }
 
-    this.buffer.putInt(Connection.MSG_HEADER_SIZE_OFFSET,
-        Connection.calcHdrSize(this.buffer.limit() - 
Connection.MSG_HEADER_BYTES));
-    this.buffer.put(Connection.MSG_HEADER_TYPE_OFFSET, (byte) (msgType & 
0xff));
-    this.buffer.putShort(Connection.MSG_HEADER_ID_OFFSET, this.msgId);
-    this.buffer.position(0);
+    buffer.putInt(Connection.MSG_HEADER_SIZE_OFFSET,
+        Connection.calcHdrSize(buffer.limit() - Connection.MSG_HEADER_BYTES));
+    buffer.put(Connection.MSG_HEADER_TYPE_OFFSET, (byte) (msgType & 0xff));
+    buffer.putShort(Connection.MSG_HEADER_ID_OFFSET, msgId);
+    buffer.position(0);
   }
 
   // DataOutput methods
@@ -523,11 +505,11 @@ public class MsgStreamer extends OutputStream
     // if (logger.isTraceEnabled()) logger.trace(" short={}", v);
 
     ensureCapacity(2);
-    if (this.overflowBuf != null) {
-      this.overflowBuf.writeShort(v);
+    if (overflowBuf != null) {
+      overflowBuf.writeShort(v);
       return;
     }
-    this.buffer.putShort((short) (v & 0xffff));
+    buffer.putShort((short) (v & 0xffff));
   }
 
   /**
@@ -553,11 +535,11 @@ public class MsgStreamer extends OutputStream
     // if (logger.isTraceEnabled()) logger.trace(" char={}", v);
 
     ensureCapacity(2);
-    if (this.overflowBuf != null) {
-      this.overflowBuf.writeChar(v);
+    if (overflowBuf != null) {
+      overflowBuf.writeChar(v);
       return;
     }
-    this.buffer.putChar((char) v);
+    buffer.putChar((char) v);
   }
 
   /**
@@ -584,11 +566,11 @@ public class MsgStreamer extends OutputStream
     // if (logger.isTraceEnabled()) logger.trace(" int={}", v);
 
     ensureCapacity(4);
-    if (this.overflowBuf != null) {
-      this.overflowBuf.writeInt(v);
+    if (overflowBuf != null) {
+      overflowBuf.writeInt(v);
       return;
     }
-    this.buffer.putInt(v);
+    buffer.putInt(v);
   }
 
   /**
@@ -619,11 +601,11 @@ public class MsgStreamer extends OutputStream
     // if (logger.isTraceEnabled()) logger.trace(" long={}", v);
 
     ensureCapacity(8);
-    if (this.overflowBuf != null) {
-      this.overflowBuf.writeLong(v);
+    if (overflowBuf != null) {
+      overflowBuf.writeLong(v);
       return;
     }
-    this.buffer.putLong(v);
+    buffer.putLong(v);
   }
 
   /**
@@ -641,11 +623,11 @@ public class MsgStreamer extends OutputStream
     // if (logger.isTraceEnabled()) logger.trace(" float={}", v);
 
     ensureCapacity(4);
-    if (this.overflowBuf != null) {
-      this.overflowBuf.writeFloat(v);
+    if (overflowBuf != null) {
+      overflowBuf.writeFloat(v);
       return;
     }
-    this.buffer.putFloat(v);
+    buffer.putFloat(v);
   }
 
   /**
@@ -663,11 +645,11 @@ public class MsgStreamer extends OutputStream
     // if (logger.isTraceEnabled()) logger.trace(" double={}", v);
 
     ensureCapacity(8);
-    if (this.overflowBuf != null) {
-      this.overflowBuf.writeDouble(v);
+    if (overflowBuf != null) {
+      overflowBuf.writeDouble(v);
       return;
     }
-    this.buffer.putDouble(v);
+    buffer.putDouble(v);
   }
 
   /**
@@ -684,11 +666,11 @@ public class MsgStreamer extends OutputStream
    * @param str the string of bytes to be written.
    */
   @Override
-  public void writeBytes(String str) {
+  public void writeBytes(@NotNull String str) {
     // if (logger.isTraceEnabled()) logger.trace(" bytes={}", str);
 
-    if (this.overflowBuf != null) {
-      this.overflowBuf.writeBytes(str);
+    if (overflowBuf != null) {
+      overflowBuf.writeBytes(str);
       return;
     }
     int strlen = str.length();
@@ -710,21 +692,21 @@ public class MsgStreamer extends OutputStream
    * @param s the string value to be written.
    */
   @Override
-  public void writeChars(String s) {
+  public void writeChars(@NotNull String s) {
     // if (logger.isTraceEnabled()) logger.trace(" chars={}", s);
 
-    if (this.overflowBuf != null) {
-      this.overflowBuf.writeChars(s);
+    if (overflowBuf != null) {
+      overflowBuf.writeChars(s);
       return;
     }
     int len = s.length();
     int offset = 0;
     while (len > 0) {
-      int remainingCharSpace = (this.buffer.capacity() - 
this.buffer.position()) / 2;
+      int remainingCharSpace = (buffer.capacity() - buffer.position()) / 2;
       if (remainingCharSpace == 0) {
         realFlush(false);
-        if (this.overflowBuf != null) {
-          this.overflowBuf.writeChars(s.substring(offset));
+        if (overflowBuf != null) {
+          overflowBuf.writeChars(s.substring(offset));
           return;
         }
       } else {
@@ -733,7 +715,7 @@ public class MsgStreamer extends OutputStream
           chunkSize = len;
         }
         for (int i = 0; i < chunkSize; i++) {
-          this.buffer.putChar(s.charAt(offset + i));
+          buffer.putChar(s.charAt(offset + i));
         }
         offset += chunkSize;
         len -= chunkSize;
@@ -800,11 +782,11 @@ public class MsgStreamer extends OutputStream
    * @exception IOException if an I/O error occurs.
    */
   @Override
-  public void writeUTF(String str) throws IOException {
+  public void writeUTF(@NotNull String str) throws IOException {
     // if (logger.isTraceEnabled()) logger.trace(" utf={}", str);
 
-    if (this.overflowBuf != null) {
-      this.overflowBuf.writeUTF(str);
+    if (overflowBuf != null) {
+      overflowBuf.writeUTF(str);
       return;
     }
     if (ASCII_STRINGS) {
@@ -822,11 +804,11 @@ public class MsgStreamer extends OutputStream
     writeShort(len);
     int offset = 0;
     while (len > 0) {
-      int remainingSpace = this.buffer.capacity() - this.buffer.position();
+      int remainingSpace = buffer.capacity() - buffer.position();
       if (remainingSpace == 0) {
         realFlush(false);
-        if (this.overflowBuf != null) {
-          this.overflowBuf.write(str.substring(offset).getBytes());
+        if (overflowBuf != null) {
+          overflowBuf.write(str.substring(offset).getBytes());
           return;
         }
       } else {
@@ -835,7 +817,7 @@ public class MsgStreamer extends OutputStream
           chunkSize = len;
         }
         for (int i = 0; i < chunkSize; i++) {
-          this.buffer.put((byte) str.charAt(offset + i));
+          buffer.put((byte) str.charAt(offset + i));
         }
         offset += chunkSize;
         len -= chunkSize;
@@ -849,7 +831,7 @@ public class MsgStreamer extends OutputStream
       throw new UTFDataFormatException();
     }
     {
-      int remainingSpace = this.buffer.capacity() - this.buffer.position();
+      int remainingSpace = buffer.capacity() - buffer.position();
       if (remainingSpace >= ((strlen * 3) + 2)) {
         // we have plenty of room to do this with one pass directly into the buffer
         writeQuickFullUTF(str, strlen);
@@ -878,10 +860,10 @@ public class MsgStreamer extends OutputStream
       } else if (c > 0x07FF) {
         writeByte((byte) (0xE0 | ((c >> 12) & 0x0F)));
         writeByte((byte) (0x80 | ((c >> 6) & 0x3F)));
-        writeByte((byte) (0x80 | ((c >> 0) & 0x3F)));
+        writeByte((byte) (0x80 | ((c) & 0x3F)));
       } else {
         writeByte((byte) (0xC0 | ((c >> 6) & 0x1F)));
-        writeByte((byte) (0x80 | ((c >> 0) & 0x3F)));
+        writeByte((byte) (0x80 | ((c) & 0x3F)));
       }
     }
   }
@@ -890,29 +872,29 @@ public class MsgStreamer extends OutputStream
    * Used when we know the max size will fit in the current buffer.
    */
   private void writeQuickFullUTF(String str, int strlen) throws IOException {
-    int utfSizeIdx = this.buffer.position();
+    int utfSizeIdx = buffer.position();
     // skip bytes reserved for length
-    this.buffer.position(utfSizeIdx + 2);
+    buffer.position(utfSizeIdx + 2);
     for (int i = 0; i < strlen; i++) {
       int c = str.charAt(i);
       if ((c >= 0x0001) && (c <= 0x007F)) {
-        this.buffer.put((byte) c);
+        buffer.put((byte) c);
       } else if (c > 0x07FF) {
-        this.buffer.put((byte) (0xE0 | ((c >> 12) & 0x0F)));
-        this.buffer.put((byte) (0x80 | ((c >> 6) & 0x3F)));
-        this.buffer.put((byte) (0x80 | ((c >> 0) & 0x3F)));
+        buffer.put((byte) (0xE0 | ((c >> 12) & 0x0F)));
+        buffer.put((byte) (0x80 | ((c >> 6) & 0x3F)));
+        buffer.put((byte) (0x80 | ((c) & 0x3F)));
       } else {
-        this.buffer.put((byte) (0xC0 | ((c >> 6) & 0x1F)));
-        this.buffer.put((byte) (0x80 | ((c >> 0) & 0x3F)));
+        buffer.put((byte) (0xC0 | ((c >> 6) & 0x1F)));
+        buffer.put((byte) (0x80 | ((c) & 0x3F)));
       }
     }
-    int utflen = this.buffer.position() - (utfSizeIdx + 2);
+    int utflen = buffer.position() - (utfSizeIdx + 2);
     if (utflen > 65535) {
       // act as if we wrote nothing to this buffer
-      this.buffer.position(utfSizeIdx);
+      buffer.position(utfSizeIdx);
       throw new UTFDataFormatException();
     }
-    this.buffer.putShort(utfSizeIdx, (short) utflen);
+    buffer.putShort(utfSizeIdx, (short) utflen);
   }
 
   /**
@@ -932,19 +914,19 @@ public class MsgStreamer extends OutputStream
       other.rewind();
       return;
     }
-    if (this.overflowBuf != null) {
-      this.overflowBuf.writeAsSerializedByteArray(v);
+    if (overflowBuf != null) {
+      overflowBuf.writeAsSerializedByteArray(v);
       return;
     }
     if (isOverflowMode()) {
       // we must have recursed which is now allowed to fix bug 38194
-      int remainingSpace = this.buffer.capacity() - this.buffer.position();
+      int remainingSpace = buffer.capacity() - buffer.position();
       if (remainingSpace < 5) {
         // we don't even have room to write the length field so just create
         // the overflowBuf
-        this.overflowBuf = new HeapDataOutputStream(
-            this.buffer.capacity() - Connection.MSG_HEADER_BYTES, KnownVersion.CURRENT);
-        this.overflowBuf.writeAsSerializedByteArray(v);
+        overflowBuf = new HeapDataOutputStream(
+            buffer.capacity() - Connection.MSG_HEADER_BYTES, KnownVersion.CURRENT);
+        overflowBuf.writeAsSerializedByteArray(v);
         return;
       }
     } else {
@@ -953,26 +935,23 @@ public class MsgStreamer extends OutputStream
                                  * object. I pulled 1024 as the average out of 
thin air.
                                  */
     }
-    int lengthPos = this.buffer.position();
-    this.buffer.position(lengthPos + 5);
+    int lengthPos = buffer.position();
+    buffer.position(lengthPos + 5);
     enableOverflowMode();
     boolean finished = false;
     try {
       try {
         DataSerializer.writeObject(v, this);
       } catch (IOException e) {
-        RuntimeException e2 = new IllegalArgumentException(
-            "An Exception was thrown while serializing.");
-        e2.initCause(e);
-        throw e2;
+        throw new IllegalArgumentException("An Exception was thrown while serializing.", e);
       }
-      int baLength = this.buffer.position() - (lengthPos + 5);
-      HeapDataOutputStream overBuf = this.overflowBuf;
+      int baLength = buffer.position() - (lengthPos + 5);
+      HeapDataOutputStream overBuf = overflowBuf;
       if (overBuf != null) {
         baLength += overBuf.size();
       }
-      this.buffer.put(lengthPos, StaticSerialization.INT_ARRAY_LEN);
-      this.buffer.putInt(lengthPos + 1, baLength);
+      buffer.put(lengthPos, StaticSerialization.INT_ARRAY_LEN);
+      buffer.putInt(lengthPos + 1, baLength);
       disableOverflowMode();
       finished = true;
       if (overBuf != null && !isOverflowMode()) {
@@ -981,7 +960,7 @@ public class MsgStreamer extends OutputStream
     } finally {
       if (!finished) {
         // reset buffer and act as if we did nothing
-        this.buffer.position(lengthPos);
+        buffer.position(lengthPos);
         disableOverflowMode();
       }
     }
diff --git a/geode-core/src/main/java/org/apache/geode/internal/tcp/VersionedMsgStreamer.java b/geode-core/src/main/java/org/apache/geode/internal/tcp/VersionedMsgStreamer.java
index 552c028..191d19d 100644
--- a/geode-core/src/main/java/org/apache/geode/internal/tcp/VersionedMsgStreamer.java
+++ b/geode-core/src/main/java/org/apache/geode/internal/tcp/VersionedMsgStreamer.java
@@ -32,25 +32,20 @@ class VersionedMsgStreamer extends MsgStreamer implements VersionedDataStream {
 
   private final KnownVersion version;
 
-  VersionedMsgStreamer(List<?> cons, DistributionMessage msg, boolean directReply, DMStats stats,
+  VersionedMsgStreamer(List<Connection> cons, DistributionMessage msg, boolean directReply,
+      DMStats stats,
       BufferPool bufferPool, int sendBufferSize, KnownVersion version) {
     super(cons, msg, directReply, stats, sendBufferSize, bufferPool);
     this.version = version;
   }
 
-  /**
-   * {@inheritDoc}
-   */
   @Override
   public KnownVersion getVersion() {
-    return this.version;
+    return version;
   }
 
-  /**
-   * {@inheritDoc}
-   */
   @Override
   public String toString() {
-    return super.toString() + " (" + this.version + ')';
+    return super.toString() + " (" + version + ')';
   }
 }

Reply via email to