This is an automated email from the ASF dual-hosted git repository. jbarrett pushed a commit to branch develop in repository https://gitbox.apache.org/repos/asf/geode.git
commit 63e223d49ba50b7427a1fcbebbfacb08fe4e9ce1 Author: Jacob Barrett <[email protected]> AuthorDate: Thu Jan 21 19:54:46 2021 -0800 GEODE-8870: Removes GFE_65. --- .../cache/tier/sockets/BaseCommandQuery.java | 6 +- .../cache/tier/sockets/ChunkedMessage.java | 4 +- .../tier/sockets/ClientUpdateMessageImpl.java | 163 --------------------- .../cache/tier/sockets/ServerConnection.java | 6 +- .../tier/sockets/ServerSideHandshakeImpl.java | 2 +- .../internal/cache/tier/sockets/command/Put70.java | 26 ---- .../cache/tier/sockets/ServerConnectionTest.java | 14 +- .../geode/internal/serialization/KnownVersion.java | 7 - 8 files changed, 9 insertions(+), 219 deletions(-) diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/BaseCommandQuery.java b/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/BaseCommandQuery.java index 7a5cb6e..b2933fd 100644 --- a/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/BaseCommandQuery.java +++ b/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/BaseCommandQuery.java @@ -309,11 +309,7 @@ public abstract class BaseCommandQuery extends BaseCommand { } private boolean sendCqResultsWithKey(ServerConnection servConn) { - KnownVersion clientVersion = servConn.getClientVersion(); - if (clientVersion.isNotOlderThan(KnownVersion.GFE_65)) { - return true; - } - return false; + return true; } protected void sendCqResponse(int msgType, String msgStr, int txId, Throwable e, diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/ChunkedMessage.java b/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/ChunkedMessage.java index 0be09bf..793891d 100644 --- a/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/ChunkedMessage.java +++ b/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/ChunkedMessage.java @@ -151,9 +151,7 @@ public class ChunkedMessage extends Message { public void setLastChunkAndNumParts(boolean lastChunk, int numParts) { setLastChunk(lastChunk); - if (this.serverConnection != null - && this.serverConnection.getClientVersion().isNotOlderThan(KnownVersion.GFE_65)) { - // we us e three bits for number of parts in last chunk byte + if (this.serverConnection != null) { // we us e three bits for number of parts in last chunk byte byte localLastChunk = (byte) (numParts << 5); this.lastChunk |= localLastChunk; diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/ClientUpdateMessageImpl.java b/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/ClientUpdateMessageImpl.java index 2e33fde..6fc67a0 100755 --- a/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/ClientUpdateMessageImpl.java +++ b/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/ClientUpdateMessageImpl.java @@ -373,8 +373,6 @@ public class ClientUpdateMessageImpl implements ClientUpdateMessage, Sizeable, N } if (clientVersion.isNotOlderThan(KnownVersion.GFE_70)) { message = getGFE70Message(proxy, serializedValue, conflation, clientVersion); - } else if (clientVersion.isNotOlderThan(KnownVersion.GFE_65)) { - message = getGFE65Message(proxy, serializedValue, conflation, clientVersion); } else { throw new IOException( "Unsupported client version for server-to-client message creation: " + clientVersion); @@ -383,167 +381,6 @@ public class ClientUpdateMessageImpl implements ClientUpdateMessage, Sizeable, N return message; } - private Message getGFE65Message(CacheClientProxy proxy, byte[] p_latestValue, - boolean conflation, KnownVersion clientVersion) throws IOException { - byte[] latestValue = p_latestValue; - Message message; - ClientProxyMembershipID proxyId = proxy.getProxyID(); - - // Add CQ info. - int cqMsgParts = 0; - boolean clientHasCq = _hasCqs && (getCqs(proxyId) != null); - - if (clientHasCq) { - cqMsgParts = (getCqs(proxyId).length * 2) + 1; - } - - if (isCreate() || isUpdate()) { - // Create or update event - if (_clientInterestListInv != null && _clientInterestListInv.contains(proxyId)) { - // Client is registered for invalidates. - if (cqMsgParts > 0) { - cqMsgParts++; // To store base operation type for CQ. - } - - message = new Message(6 + cqMsgParts, clientVersion); - message.setMessageType(MessageType.LOCAL_INVALIDATE); - - // Add the region name - message.addStringPart(_regionName, true); - - // Add the key - // Currently serializing the key here instead of when the message - // is put in the queue so that it can be conflated it later - message.addStringOrObjPart(_keyOfInterest); - } else { - // Notify by subscription - send the value - message = new Message(8 + cqMsgParts, clientVersion); - - // Set message type - if (isCreate()) { - message.setMessageType(MessageType.LOCAL_CREATE); - - // Add the region name - message.addStringPart(_regionName, true); - - // Add the key - // Currently serializing the key here instead of when the message - // is put in the queue so that it can be conflated it later - message.addStringOrObjPart(_keyOfInterest); - - message.addObjPart(Boolean.FALSE); // NO delta - // Add the value (which has already been serialized) - message.addRawPart(latestValue, (_valueIsObject == 0x01)); - } else { - message.setMessageType(MessageType.LOCAL_UPDATE); - - // Add the region name - message.addStringPart(_regionName, true); - - // Add the key - // Currently serializing the key here instead of when the message - // is put in the queue so that it can be conflated it later - message.addStringOrObjPart(_keyOfInterest); - - if (deltaBytes != null && !conflation && !proxy.isMarkerEnqueued() - && !proxy.getRegionsWithEmptyDataPolicy().containsKey(_regionName)) { - message.addObjPart(Boolean.TRUE); - message.addBytesPart(deltaBytes); - proxy.getStatistics().incDeltaMessagesSent(); - } else { - message.addObjPart(Boolean.FALSE); - if (latestValue == null) { - if (!(_value instanceof byte[])) { - _value = CacheServerHelper.serialize(_value); - } - latestValue = (byte[]) _value; - } - // Add the value (which has already been serialized) - message.addRawPart(latestValue, (_valueIsObject == 0x01)); - } - } - } - - message.addObjPart(_callbackArgument); - message.addObjPart(isClientInterested(proxyId)); - message.addObjPart(clientHasCq); - - if (clientHasCq) { - if (message.getMessageType() == MessageType.LOCAL_INVALIDATE) { - // in case of invalidate, set the region operation type. - message.addIntPart(isCreate() ? MessageType.LOCAL_CREATE : MessageType.LOCAL_UPDATE); - } - addCqsToMessage(proxyId, message); - } - } else if (isDestroy() || isInvalidate()) { - if (isDestroy()) { - message = new Message(6 + cqMsgParts, clientVersion); - message.setMessageType(MessageType.LOCAL_DESTROY); - } else { - if (clientHasCq) { - cqMsgParts++;/* To store the region operation for CQ */ - } - message = new Message(6 + cqMsgParts, clientVersion); - message.setMessageType(MessageType.LOCAL_INVALIDATE); - } - message.addStringPart(_regionName, true); - message.addStringOrObjPart(_keyOfInterest); - message.addObjPart(_callbackArgument); - message.addObjPart(isClientInterested(proxyId)); - message.addObjPart(clientHasCq); - - if (clientHasCq) { - if (isInvalidate()) { - // This is to take care when invalidate message is getting sent - // to the Client. See the code for create/update operation. - message.addIntPart(MessageType.LOCAL_INVALIDATE); - } - addCqsToMessage(proxyId, message); - } - } else if (isDestroyRegion()) { - message = new Message(4 + cqMsgParts, clientVersion); - message.setMessageType(MessageType.LOCAL_DESTROY_REGION); - message.addStringPart(_regionName, true); - message.addObjPart(_callbackArgument); - message.addObjPart(clientHasCq); - - if (clientHasCq) { - addCqsToMessage(proxyId, message); - } - } else if (isClearRegion()) { - message = new Message(4 + cqMsgParts, clientVersion); - message.setMessageType(MessageType.CLEAR_REGION); - message.addStringPart(_regionName, true); - message.addObjPart(_callbackArgument); - message.addObjPart(clientHasCq); - - if (clientHasCq) { - addCqsToMessage(proxyId, message); - } - } else if (isInvalidateRegion()) { - message = new Message(4 + cqMsgParts, clientVersion); - message.setMessageType(MessageType.INVALIDATE_REGION); - message.addStringPart(_regionName, true); - message.addObjPart(_callbackArgument); - - // Add CQ status. - message.addObjPart(clientHasCq); - - if (clientHasCq) { - addCqsToMessage(proxyId, message); - } - } else { - throw new InternalGemFireError("Don't know what kind of message"); - } - - message.setTransactionId(0); - // Add the EventId since 5.1 (used to prevent duplicate events - // received on the client side after a failover) - message.addObjPart(_eventIdentifier); - return message; - } - - private Message getGFE70Message(CacheClientProxy proxy, byte[] p_latestValue, boolean conflation, KnownVersion clientVersion) throws IOException { byte[] latestValue = p_latestValue; diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/ServerConnection.java b/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/ServerConnection.java index 9dfc9f2..e682805 100644 --- a/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/ServerConnection.java +++ b/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/ServerConnection.java @@ -362,8 +362,7 @@ public abstract class ServerConnection implements Runnable { setHandshake(readHandshake); setProxyId(readHandshake.getMembershipId()); - if (readHandshake.getVersion().isOlderThan(KnownVersion.GFE_65) - || getCommunicationMode().isWAN()) { + if (getCommunicationMode().isWAN()) { try { setAuthAttributes(); @@ -1127,7 +1126,6 @@ public abstract class ServerConnection implements Runnable { public Part updateAndGetSecurityPart() { // need to take care all message types here if (AcceptorImpl.isAuthenticationRequired() - && handshake.getVersion().isNotOlderThan(KnownVersion.GFE_65) && !communicationMode.isWAN() && !requestMessage.getAndResetIsMetaRegion() && !isInternalMessage(requestMessage, allowInternalMessagesWithoutCredentials)) { setSecurityPart(); @@ -1666,7 +1664,7 @@ public abstract class ServerConnection implements Runnable { public long getUniqueId() { long uniqueId; - if (handshake.getVersion().isOlderThan(KnownVersion.GFE_65) || communicationMode.isWAN()) { + if (communicationMode.isWAN()) { uniqueId = userAuthId; } else if (requestMessage.isSecureMode()) { uniqueId = messageIdExtractor.getUniqueIdFromMessage(requestMessage, diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/ServerSideHandshakeImpl.java b/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/ServerSideHandshakeImpl.java index 0e5055e..d691082 100644 --- a/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/ServerSideHandshakeImpl.java +++ b/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/ServerSideHandshakeImpl.java @@ -97,7 +97,7 @@ public class ServerSideHandshakeImpl extends Handshake implements ServerSideHand setOverrides(new byte[] {dataInputStream.readByte()}); // Note: credentials should always be the last piece in handshake for // Diffie-Hellman key exchange to work - if (this.clientVersion.isOlderThan(KnownVersion.GFE_65) || communicationMode.isWAN()) { + if (communicationMode.isWAN()) { credentials = readCredentials(dataInputStream, dataOutputStream, sys, this.securityService); } else { diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/command/Put70.java b/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/command/Put70.java index 4b96f75..7d8def2 100644 --- a/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/command/Put70.java +++ b/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/command/Put70.java @@ -14,8 +14,6 @@ */ package org.apache.geode.internal.cache.tier.sockets.command; -import java.io.DataOutput; -import java.io.DataOutputStream; import java.io.IOException; import java.nio.ByteBuffer; @@ -28,15 +26,12 @@ import org.apache.geode.cache.ResourceException; import org.apache.geode.cache.client.internal.PutOp; import org.apache.geode.cache.operations.PutOperationContext; import org.apache.geode.distributed.internal.DistributionStats; -import org.apache.geode.internal.HeapDataOutputStream; -import org.apache.geode.internal.InternalDataSerializer; import org.apache.geode.internal.cache.CachedDeserializable; import org.apache.geode.internal.cache.EventID; import org.apache.geode.internal.cache.EventIDHolder; import org.apache.geode.internal.cache.LocalRegion; import org.apache.geode.internal.cache.PartitionedRegion; import org.apache.geode.internal.cache.TXManagerImpl; -import org.apache.geode.internal.cache.Token; import org.apache.geode.internal.cache.tier.Command; import org.apache.geode.internal.cache.tier.MessageType; import org.apache.geode.internal.cache.tier.sockets.BaseCommand; @@ -47,7 +42,6 @@ import org.apache.geode.internal.cache.tier.sockets.ServerConnection; import org.apache.geode.internal.cache.versions.VersionTag; import org.apache.geode.internal.security.AuthorizeRequest; import org.apache.geode.internal.security.SecurityService; -import org.apache.geode.internal.serialization.KnownVersion; import org.apache.geode.internal.util.Breadcrumbs; import org.apache.geode.security.GemFireSecurityException; import org.apache.geode.security.ResourcePermission; @@ -297,20 +291,10 @@ public class Put70 extends BaseCommand { } sendOldValue = true; oldValueIsObject = true; - KnownVersion clientVersion = serverConnection.getClientVersion(); if (oldValue instanceof CachedDeserializable) { oldValue = ((CachedDeserializable) oldValue).getSerializedValue(); } else if (oldValue instanceof byte[]) { oldValueIsObject = false; - } else if ((oldValue instanceof Token) - && clientVersion.isNotNewerThan(KnownVersion.GFE_651)) { - // older clients don't know that Token is now a DSFID class, so we - // put the token in a serialized form they can consume - try (HeapDataOutputStream str = new HeapDataOutputStream(KnownVersion.CURRENT)) { - DataOutput dstr = new DataOutputStream(str); - InternalDataSerializer.writeSerializableObject(oldValue, dstr); - oldValue = str.toByteArray(); - } } result = true; // } catch (Exception e) { @@ -333,20 +317,10 @@ public class Put70 extends BaseCommand { serverConnection.getProxyID(), true, clientEvent); sendOldValue = !clientEvent.isConcurrencyConflict(); oldValueIsObject = true; - KnownVersion clientVersion = serverConnection.getClientVersion(); if (oldValue instanceof CachedDeserializable) { oldValue = ((CachedDeserializable) oldValue).getSerializedValue(); } else if (oldValue instanceof byte[]) { oldValueIsObject = false; - } else if ((oldValue instanceof Token) - && clientVersion.isNotNewerThan(KnownVersion.GFE_651)) { - // older clients don't know that Token is now a DSFID class, so we - // put the token in a serialized form they can consume - try (HeapDataOutputStream str = new HeapDataOutputStream(KnownVersion.CURRENT)) { - DataOutput dstr = new DataOutputStream(str); - InternalDataSerializer.writeSerializableObject(oldValue, dstr); - oldValue = str.toByteArray(); - } } if (isDebugEnabled) { logger.debug("returning {} from replace(K,V)", oldValue); diff --git a/geode-core/src/test/java/org/apache/geode/internal/cache/tier/sockets/ServerConnectionTest.java b/geode-core/src/test/java/org/apache/geode/internal/cache/tier/sockets/ServerConnectionTest.java index c5cde15..d2b0671 100644 --- a/geode-core/src/test/java/org/apache/geode/internal/cache/tier/sockets/ServerConnectionTest.java +++ b/geode-core/src/test/java/org/apache/geode/internal/cache/tier/sockets/ServerConnectionTest.java @@ -17,7 +17,7 @@ package org.apache.geode.internal.cache.tier.sockets; import static org.assertj.core.api.Assertions.assertThat; import static org.assertj.core.api.Assertions.assertThatCode; -import static org.assertj.core.api.Assertions.catchThrowable; +import static org.assertj.core.api.Assertions.assertThatThrownBy; import static org.mockito.ArgumentMatchers.any; import static org.mockito.ArgumentMatchers.anyLong; import static org.mockito.Mockito.doThrow; @@ -42,7 +42,6 @@ import org.apache.geode.internal.cache.tier.CommunicationMode; import org.apache.geode.internal.cache.tier.Encryptor; import org.apache.geode.internal.cache.tier.ServerSideHandshake; import org.apache.geode.internal.security.SecurityService; -import org.apache.geode.internal.serialization.KnownVersion; import org.apache.geode.security.AuthenticationRequiredException; import org.apache.geode.test.junit.categories.ClientServerTest; @@ -91,11 +90,10 @@ public class ServerConnectionTest { } @Test - public void post65SecureShouldUseUniqueIdFromMessage() { + public void shouldUseUniqueIdFromMessage() { long uniqueIdFromMessage = 23456L; MessageIdExtractor messageIdExtractor = mock(MessageIdExtractor.class); when(handshake.getEncryptor()).thenReturn(mock(Encryptor.class)); - when(handshake.getVersion()).thenReturn(KnownVersion.GFE_81); when(messageIdExtractor.getUniqueIdFromMessage(any(Message.class), any(Encryptor.class), anyLong())).thenReturn(uniqueIdFromMessage); when(requestMessage.isSecureMode()).thenReturn(true); @@ -108,12 +106,8 @@ public class ServerConnectionTest { } @Test - public void post65NonSecureShouldThrow() { - when(handshake.getVersion()).thenReturn(KnownVersion.GFE_81); - - Throwable thrown = catchThrowable(() -> serverConnection.getUniqueId()); - - assertThat(thrown) + public void nonSecureShouldThrow() { + assertThatThrownBy(() -> serverConnection.getUniqueId()) .isExactlyInstanceOf(AuthenticationRequiredException.class) .hasMessage("No security credentials are provided"); } diff --git a/geode-serialization/src/main/java/org/apache/geode/internal/serialization/KnownVersion.java b/geode-serialization/src/main/java/org/apache/geode/internal/serialization/KnownVersion.java index d88b3b7..5114f00 100644 --- a/geode-serialization/src/main/java/org/apache/geode/internal/serialization/KnownVersion.java +++ b/geode-serialization/src/main/java/org/apache/geode/internal/serialization/KnownVersion.java @@ -62,13 +62,6 @@ public class KnownVersion extends AbstractVersion { public static final KnownVersion TOKEN = new KnownVersion("", "TOKEN", (byte) -1, (byte) 0, (byte) 0, (byte) 0, TOKEN_ORDINAL); - private static final short GFE_65_ORDINAL = 6; - - @Immutable - @Deprecated - public static final KnownVersion GFE_65 = - new KnownVersion("GFE", "6.5", (byte) 6, (byte) 5, (byte) 0, (byte) 0, GFE_65_ORDINAL); - private static final short GFE_651_ORDINAL = 7; @Immutable
