This is an automated email from the ASF dual-hosted git repository.
bschuchardt pushed a commit to branch develop
in repository https://gitbox.apache.org/repos/asf/geode.git
The following commit(s) were added to refs/heads/develop by this push:
new 4803096 GEODE-6928 peer-to-peer SSL stream corruption with
conserve-sockets=false
4803096 is described below
commit 48030961f82f91360d8c88f19516758ba0d6affe
Author: Bruce Schuchardt <[email protected]>
AuthorDate: Mon Jul 1 13:00:21 2019 -0700
GEODE-6928 peer-to-peer SSL stream corruption with conserve-sockets=false
Changed NioSslEngine so that it leaves the decrypted SSL buffer untouched
after reading a direct-ack response. This allows the readFully method to
see which bytes have already been consumed and to compact the buffer
correctly for subsequent reads, if necessary.
The cluster communication test is modified to verify, via the
reconnect-attempts statistic, that no connections are aborted and
re-created (retried) during operation distribution. Without this fix
the check would fail.
---
.../org/apache/geode/ClusterCommunicationsDUnitTest.java | 14 +++++++++++---
.../org/apache/geode/distributed/internal/DMStats.java | 3 +++
.../geode/distributed/internal/DistributionStats.java | 5 +++++
.../distributed/internal/LonerDistributionManager.java | 5 +++++
.../geode/internal/cache/AbstractUpdateOperation.java | 2 +-
.../main/java/org/apache/geode/internal/net/NioFilter.java | 7 +++++++
.../java/org/apache/geode/internal/net/NioSslEngine.java | 7 +++++++
.../main/java/org/apache/geode/internal/tcp/MsgReader.java | 8 +-------
8 files changed, 40 insertions(+), 11 deletions(-)
diff --git
a/geode-core/src/distributedTest/java/org/apache/geode/ClusterCommunicationsDUnitTest.java
b/geode-core/src/distributedTest/java/org/apache/geode/ClusterCommunicationsDUnitTest.java
index 4d7bb23..eca86ed 100644
---
a/geode-core/src/distributedTest/java/org/apache/geode/ClusterCommunicationsDUnitTest.java
+++
b/geode-core/src/distributedTest/java/org/apache/geode/ClusterCommunicationsDUnitTest.java
@@ -66,6 +66,7 @@ import org.apache.geode.cache.RegionShortcut;
import org.apache.geode.distributed.DistributedMember;
import org.apache.geode.distributed.Locator;
import org.apache.geode.distributed.internal.ClusterDistributionManager;
+import org.apache.geode.distributed.internal.DMStats;
import org.apache.geode.distributed.internal.DirectReplyProcessor;
import org.apache.geode.distributed.internal.DistributionMessage;
import org.apache.geode.distributed.internal.InternalDistributedSystem;
@@ -143,7 +144,9 @@ public class ClusterCommunicationsDUnitTest implements
Serializable {
for (int i = 1; i <= NUM_SERVERS; i++) {
verifyCreatedEntry(getVM(i));
}
- performUpdate(getVM(1));
+ for (int iteration = 1; iteration < 6; iteration++) {
+ performUpdate(getVM(1));
+ }
for (int i = 1; i <= NUM_SERVERS; i++) {
verifyUpdatedEntry(getVM(i));
}
@@ -239,8 +242,13 @@ public class ClusterCommunicationsDUnitTest implements
Serializable {
}
private void performUpdate(VM memberVM) {
- memberVM.invoke("perform update", () -> cache
- .getRegion(regionName).put("testKey", "updatedTestValue"));
+ memberVM.invoke("perform update", () -> {
+ DMStats stats = ((InternalDistributedSystem)
cache.getDistributedSystem())
+ .getDistributionManager().getStats();
+ int reconnectAttempts = stats.getReconnectAttempts();
+ cache.getRegion(regionName).put("testKey", "updatedTestValue");
+ assertThat(stats.getReconnectAttempts()).isEqualTo(reconnectAttempts);
+ });
}
private void performCreateWithLargeValue(VM memberVM) {
diff --git
a/geode-core/src/main/java/org/apache/geode/distributed/internal/DMStats.java
b/geode-core/src/main/java/org/apache/geode/distributed/internal/DMStats.java
index 410d74f..f91bf55 100644
---
a/geode-core/src/main/java/org/apache/geode/distributed/internal/DMStats.java
+++
b/geode-core/src/main/java/org/apache/geode/distributed/internal/DMStats.java
@@ -297,6 +297,9 @@ public interface DMStats {
*/
void incReconnectAttempts();
+
+ int getReconnectAttempts();
+
/**
* @since GemFire 4.1
*/
diff --git
a/geode-core/src/main/java/org/apache/geode/distributed/internal/DistributionStats.java
b/geode-core/src/main/java/org/apache/geode/distributed/internal/DistributionStats.java
index f644209..9d7836c 100644
---
a/geode-core/src/main/java/org/apache/geode/distributed/internal/DistributionStats.java
+++
b/geode-core/src/main/java/org/apache/geode/distributed/internal/DistributionStats.java
@@ -1657,6 +1657,11 @@ public class DistributionStats implements DMStats {
}
@Override
+ public int getReconnectAttempts() {
+ return stats.getInt(reconnectAttemptsId);
+ }
+
+ @Override
public void incLostLease() {
stats.incInt(lostConnectionLeaseId, 1);
}
diff --git
a/geode-core/src/main/java/org/apache/geode/distributed/internal/LonerDistributionManager.java
b/geode-core/src/main/java/org/apache/geode/distributed/internal/LonerDistributionManager.java
index a676a7e..248ff96 100644
---
a/geode-core/src/main/java/org/apache/geode/distributed/internal/LonerDistributionManager.java
+++
b/geode-core/src/main/java/org/apache/geode/distributed/internal/LonerDistributionManager.java
@@ -553,6 +553,11 @@ public class LonerDistributionManager implements
DistributionManager {
public void incReconnectAttempts() {}
@Override
+ public int getReconnectAttempts() {
+ return 0;
+ }
+
+ @Override
public void incLostLease() {}
@Override
diff --git
a/geode-core/src/main/java/org/apache/geode/internal/cache/AbstractUpdateOperation.java
b/geode-core/src/main/java/org/apache/geode/internal/cache/AbstractUpdateOperation.java
index e467673..96b03b0 100644
---
a/geode-core/src/main/java/org/apache/geode/internal/cache/AbstractUpdateOperation.java
+++
b/geode-core/src/main/java/org/apache/geode/internal/cache/AbstractUpdateOperation.java
@@ -135,7 +135,7 @@ public abstract class AbstractUpdateOperation extends
DistributedCacheOperation
boolean doUpdate = true; // start with assumption we have key and need
value
if (shouldDoRemoteCreate(rgn, ev)) {
if (logger.isDebugEnabled()) {
- logger.debug("doPutOrCreate: attempting to create entry");
+ logger.debug("doPutOrCreate: attempting to update or create entry");
}
final long startPut = CachePerfStats.getStatTime();
final boolean isBucket = rgn.isUsedForPartitionedRegionBucket();
diff --git
a/geode-core/src/main/java/org/apache/geode/internal/net/NioFilter.java
b/geode-core/src/main/java/org/apache/geode/internal/net/NioFilter.java
index 8e41ef1..01556dc 100644
--- a/geode-core/src/main/java/org/apache/geode/internal/net/NioFilter.java
+++ b/geode-core/src/main/java/org/apache/geode/internal/net/NioFilter.java
@@ -57,6 +57,13 @@ public interface NioFilter {
throws IOException;
/**
+ * When done reading a direct ack message invoke this method
+ */
+ default void doneReadingDirectAck(ByteBuffer unwrappedBuffer) {
+ doneReading(unwrappedBuffer);
+ }
+
+ /**
* You must invoke this when done reading from the unwrapped buffer
*/
default void doneReading(ByteBuffer unwrappedBuffer) {
diff --git
a/geode-core/src/main/java/org/apache/geode/internal/net/NioSslEngine.java
b/geode-core/src/main/java/org/apache/geode/internal/net/NioSslEngine.java
index 9bf969d..09472d7 100644
--- a/geode-core/src/main/java/org/apache/geode/internal/net/NioSslEngine.java
+++ b/geode-core/src/main/java/org/apache/geode/internal/net/NioSslEngine.java
@@ -369,6 +369,13 @@ public class NioSslEngine implements NioFilter {
return peerAppData;
}
+ @Override
+ public void doneReadingDirectAck(ByteBuffer unwrappedBuffer) {
+ // nothing needs to be done - the next direct-ack message will be
+ // read into the same buffer and compaction will be done during
+ // read-operations
+ }
+
@Override
public void close(SocketChannel socketChannel) {
diff --git
a/geode-core/src/main/java/org/apache/geode/internal/tcp/MsgReader.java
b/geode-core/src/main/java/org/apache/geode/internal/tcp/MsgReader.java
index 0a33428..789c34b 100644
--- a/geode-core/src/main/java/org/apache/geode/internal/tcp/MsgReader.java
+++ b/geode-core/src/main/java/org/apache/geode/internal/tcp/MsgReader.java
@@ -58,9 +58,6 @@ public class MsgReader {
Assert.assertTrue(unwrappedBuffer.remaining() >=
Connection.MSG_HEADER_BYTES);
- int position = unwrappedBuffer.position();
- int limit = unwrappedBuffer.limit();
-
try {
int nioMessageLength = unwrappedBuffer.getInt();
/* nioMessageVersion = */
@@ -94,12 +91,9 @@ public class MsgReader {
Assert.assertTrue(nioInputBuffer.remaining() >= header.messageLength);
this.getStats().incMessagesBeingReceived(true, header.messageLength);
long startSer = this.getStats().startMsgDeserialization();
- int position = nioInputBuffer.position();
- int limit = nioInputBuffer.limit();
try {
byteBufferInputStream.setBuffer(nioInputBuffer);
ReplyProcessor21.initMessageRPId();
- // dumpState("readMessage ready to deserialize", null, nioInputBuffer,
position, limit);
return (DistributionMessage)
InternalDataSerializer.readDSFID(byteBufferInputStream);
} catch (RuntimeException e) {
throw e;
@@ -108,7 +102,7 @@ public class MsgReader {
} finally {
this.getStats().endMsgDeserialization(startSer);
this.getStats().decMessagesBeingReceived(header.messageLength);
- ioFilter.doneReading(nioInputBuffer);
+ ioFilter.doneReadingDirectAck(nioInputBuffer);
}
}