This is an automated email from the ASF dual-hosted git repository. jbarrett pushed a commit to branch develop in repository https://gitbox.apache.org/repos/asf/geode.git
commit 01812dfe4b5c40d67ed0c28b42e7d0458e3fb61a Author: Jacob Barrett <[email protected]> AuthorDate: Thu Jan 21 18:55:31 2021 -0800 GEODE-8870: Removes GFE_61. --- .../client/internal/ClientSideHandshakeImpl.java | 11 +- .../cache/tier/sockets/ClientHealthMonitor.java | 2 +- .../tier/sockets/ClientUpdateMessageImpl.java | 166 --------------------- .../cache/tier/sockets/MessageDispatcher.java | 7 +- .../tier/sockets/ServerSideHandshakeImpl.java | 2 +- .../cache/tier/sockets/SocketMessageWriter.java | 67 ++++----- .../cache/tier/sockets/ServerConnectionTest.java | 22 --- .../geode/internal/serialization/KnownVersion.java | 7 - 8 files changed, 39 insertions(+), 245 deletions(-) diff --git a/geode-core/src/main/java/org/apache/geode/cache/client/internal/ClientSideHandshakeImpl.java b/geode-core/src/main/java/org/apache/geode/cache/client/internal/ClientSideHandshakeImpl.java index 8a17041..eb5fcf9 100644 --- a/geode-core/src/main/java/org/apache/geode/cache/client/internal/ClientSideHandshakeImpl.java +++ b/geode-core/src/main/java/org/apache/geode/cache/client/internal/ClientSideHandshakeImpl.java @@ -229,9 +229,7 @@ public class ClientSideHandshakeImpl extends Handshake implements ClientSideHand readMessage(dis, dos, acceptanceCode, member); // Read delta-propagation property value from server. - // [sumedh] Static variable below? Client can connect to different - // DSes with different values of this. It shoule be a member variable. - if (!communicationMode.isWAN() && currentClientVersion.isNotOlderThan(KnownVersion.GFE_61)) { + if (!communicationMode.isWAN()) { ((InternalDistributedSystem) system).setDeltaEnabledOnServer(dis.readBoolean()); } @@ -319,13 +317,6 @@ public class ClientSideHandshakeImpl extends Handshake implements ClientSideHand // Read the message (if any) readMessage(dis, dos, acceptanceCode, member); - // nothing more to be done for older clients used in tests - // there is a difference in serializer map registration for >= 6.5.1.6 - // clients but that is not used in tests - if (currentClientVersion.isOlderThan(KnownVersion.GFE_61)) { - return new ServerQueueStatus(endpointType, queueSize, member); - } - final Map<Integer, List<String>> instantiatorMap = DataSerializer.readHashMap(dis); for (final Map.Entry<Integer, List<String>> entry : instantiatorMap.entrySet()) { final Integer id = entry.getKey(); diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/ClientHealthMonitor.java b/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/ClientHealthMonitor.java index 45e6cc4..70a1897 100644 --- a/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/ClientHealthMonitor.java +++ b/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/ClientHealthMonitor.java @@ -674,7 +674,7 @@ public class ClientHealthMonitor { } public boolean hasDeltaClients() { - return getNumberOfClientsAtOrAboveVersion(KnownVersion.GFE_61) > 0; + return getNumberOfClientsAtOrAboveVersion(KnownVersion.OLDEST) > 0; } private int getMaximumTimeBetweenPings(ClientProxyMembershipID proxyID) { diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/ClientUpdateMessageImpl.java b/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/ClientUpdateMessageImpl.java index 11cc3e8..2e33fde 100755 --- a/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/ClientUpdateMessageImpl.java +++ b/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/ClientUpdateMessageImpl.java @@ -375,8 +375,6 @@ public class ClientUpdateMessageImpl implements ClientUpdateMessage, Sizeable, N message = getGFE70Message(proxy, serializedValue, conflation, clientVersion); } else if (clientVersion.isNotOlderThan(KnownVersion.GFE_65)) { message = getGFE65Message(proxy, serializedValue, conflation, clientVersion); - } else if (clientVersion.isNotOlderThan(KnownVersion.GFE_61)) { - message = getGFE61Message(proxy, serializedValue, conflation, clientVersion); } else { throw new IOException( "Unsupported client version for server-to-client message creation: " + clientVersion); @@ -385,170 +383,6 @@ public class ClientUpdateMessageImpl implements ClientUpdateMessage, Sizeable, N return message; } - private Message getGFE61Message(CacheClientProxy proxy, byte[] latestValue, boolean conflation, - KnownVersion clientVersion) throws IOException { - Message message; - ClientProxyMembershipID proxyId = proxy.getProxyID(); - - // Add CQ info. - int cqMsgParts = 0; - boolean clientHasCq = _hasCqs && (getCqs(proxyId) != null); - - if (clientHasCq) { - cqMsgParts = (getCqs(proxyId).length * 2) + 1; - } - - if (isCreate() || isUpdate()) { - // Create or update event - if (_clientInterestListInv != null && _clientInterestListInv.contains(proxyId)) { - // Notify all - do not send the value - message = new Message(6, clientVersion); - message.setMessageType(MessageType.LOCAL_INVALIDATE); - - // Add the region name - message.addStringPart(_regionName, true); - - // Add the key - // Currently serializing the key here instead of when the message - // is put in the queue so that it can be conflated it later - message.addStringOrObjPart(_keyOfInterest); - - // Add the callback argument - message.addObjPart(_callbackArgument); - - // Add interestlist status. - message.addObjPart(isClientInterested(proxyId)); - - // Add CQ status. - message.addObjPart(Boolean.FALSE); - - } else { - boolean isClientInterested = isClientInterested(proxyId); - // Notify by subscription - send the value - message = new Message(8 + cqMsgParts, clientVersion); - - // Set message type - if (isCreate()) { - message.setMessageType(MessageType.LOCAL_CREATE); - - // Add the region name - message.addStringPart(_regionName, true); - - // Add the key - // Currently serializing the key here instead of when the message - // is put in the queue so that it can be conflated it later - message.addStringOrObjPart(_keyOfInterest); - - message.addObjPart(Boolean.FALSE); // NO delta - // Add the value (which has already been serialized) - message.addRawPart(latestValue, (_valueIsObject == 0x01)); - } else { - message.setMessageType(MessageType.LOCAL_UPDATE); - - // Add the region name - message.addStringPart(_regionName, true); - - // Add the key - // Currently serializing the key here instead of when the message - // is put in the queue so that it can be conflated it later - message.addStringOrObjPart(_keyOfInterest); - - if (deltaBytes != null && !conflation && !proxy.isMarkerEnqueued() - && !proxy.getRegionsWithEmptyDataPolicy().containsKey(_regionName)) { - message.addObjPart(Boolean.TRUE); - message.addBytesPart(deltaBytes); - proxy.getStatistics().incDeltaMessagesSent(); - } else { - message.addObjPart(Boolean.FALSE); - byte[] l = latestValue; - if (l == null) { - if (!(_value instanceof byte[])) { - _value = CacheServerHelper.serialize(_value); - } - l = (byte[]) _value; - } - // Add the value (which has already been serialized) - message.addRawPart(l, (_valueIsObject == 0x01)); - } - } - - // Add the callback argument - message.addObjPart(_callbackArgument); - - // Add interest list status. - message.addObjPart(isClientInterested); - - // Add CQ status. - message.addObjPart(clientHasCq); - - if (clientHasCq) { - addCqsToMessage(proxyId, message); - } - } - } else if (isDestroy() || isInvalidate()) { - // Destroy or invalidate event - message = new Message(6 + cqMsgParts, clientVersion); - - if (isDestroy()) { - message.setMessageType(MessageType.LOCAL_DESTROY); - } else { - message.setMessageType(MessageType.LOCAL_INVALIDATE); - } - - message.addStringPart(_regionName, true); - - // Currently serializing the key here instead of when the message - // is put in the queue so that it can be conflated later - message.addStringOrObjPart(_keyOfInterest); - - message.addObjPart(_callbackArgument); - message.addObjPart(isClientInterested(proxyId)); - message.addObjPart(clientHasCq); - - if (clientHasCq) { - addCqsToMessage(proxyId, message); - } - } else if (isDestroyRegion()) { - message = new Message(4 + cqMsgParts, clientVersion); - message.setMessageType(MessageType.LOCAL_DESTROY_REGION); - message.addStringPart(_regionName, true); - message.addObjPart(_callbackArgument); - message.addObjPart(clientHasCq); - - if (clientHasCq) { - addCqsToMessage(proxyId, message); - } - } else if (isClearRegion()) { - message = new Message(4 + cqMsgParts, clientVersion); - message.setMessageType(MessageType.CLEAR_REGION); - message.addStringPart(_regionName, true); - message.addObjPart(_callbackArgument); - message.addObjPart(clientHasCq); - - if (clientHasCq) { - addCqsToMessage(proxyId, message); - } - } else if (isInvalidateRegion()) { - message = new Message(4 + cqMsgParts, clientVersion); - message.setMessageType(MessageType.INVALIDATE_REGION); - message.addStringPart(_regionName, true); - message.addObjPart(_callbackArgument); - message.addObjPart(clientHasCq); - - if (clientHasCq) { - addCqsToMessage(proxyId, message); - } - } else { - throw new InternalGemFireError("Don't know what kind of message"); - } - - message.setTransactionId(0); - // Add the EventId since 5.1 (used to prevent duplicate events - // received on the client side after a failover) - message.addObjPart(_eventIdentifier); - return message; - } - private Message getGFE65Message(CacheClientProxy proxy, byte[] p_latestValue, boolean conflation, KnownVersion clientVersion) throws IOException { byte[] latestValue = p_latestValue; diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/MessageDispatcher.java b/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/MessageDispatcher.java index 2fb63d2..c1c9cf3 100644 --- a/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/MessageDispatcher.java +++ b/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/MessageDispatcher.java @@ -44,7 +44,6 @@ import org.apache.geode.internal.cache.ha.HARegionQueueAttributes; import org.apache.geode.internal.cache.ha.HARegionQueueStats; import org.apache.geode.internal.logging.log4j.LogMarker; import org.apache.geode.internal.serialization.ByteArrayDataInput; -import org.apache.geode.internal.serialization.KnownVersion; import org.apache.geode.internal.statistics.StatisticsClock; import org.apache.geode.logging.internal.executors.LoggingThread; import org.apache.geode.logging.internal.log4j.api.LogService; @@ -137,9 +136,9 @@ public class MessageDispatcher extends LoggingThread { ((HAContainerWrapper) proxy._cacheClientNotifier.getHaContainer()) .putProxy(HARegionQueue.createRegionName(getProxy().getHARegionName()), getProxy()); boolean createDurableQueue = proxy.proxyID.isDurable(); - boolean canHandleDelta = (proxy.getClientVersion().isNotOlderThan(KnownVersion.GFE_61)) - && InternalDistributedSystem.getAnyInstance().getConfig().getDeltaPropagation() - && !(this._proxy.clientConflation == Handshake.CONFLATION_ON); + boolean canHandleDelta = + InternalDistributedSystem.getAnyInstance().getConfig().getDeltaPropagation() + && !(this._proxy.clientConflation == Handshake.CONFLATION_ON); if ((createDurableQueue || canHandleDelta) && logger.isDebugEnabled()) { logger.debug("Creating a {} subscription queue for {}", createDurableQueue ? "durable" : "non-durable", diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/ServerSideHandshakeImpl.java b/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/ServerSideHandshakeImpl.java index 89f1e23..0e5055e 100644 --- a/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/ServerSideHandshakeImpl.java +++ b/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/ServerSideHandshakeImpl.java @@ -169,7 +169,7 @@ public class ServerSideHandshakeImpl extends Handshake implements ServerSideHand dos.writeUTF(""); // Write delta-propagation property value if this is not WAN. - if (!communicationMode.isWAN() && clientVersion.isNotOlderThan(KnownVersion.GFE_61)) { + if (!communicationMode.isWAN()) { dos.writeBoolean(((InternalDistributedSystem) system).getConfig().getDeltaPropagation()); } diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/SocketMessageWriter.java b/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/SocketMessageWriter.java index 4a4a8bb..89bdc8d 100644 --- a/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/SocketMessageWriter.java +++ b/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/SocketMessageWriter.java @@ -45,45 +45,44 @@ public class SocketMessageWriter { msg = ""; } dos.writeUTF(msg); - if (clientVersion != null && clientVersion.isNotOlderThan(KnownVersion.GFE_61)) { - // get all the instantiators. - Instantiator[] instantiators = InternalInstantiator.getInstantiators(); - HashMap instantiatorMap = new HashMap(); - if (instantiators != null && instantiators.length > 0) { - for (Instantiator instantiator : instantiators) { - ArrayList instantiatorAttributes = new ArrayList(); - instantiatorAttributes.add(instantiator.getClass().toString().substring(6)); - instantiatorAttributes.add(instantiator.getInstantiatedClass().toString().substring(6)); - instantiatorMap.put(instantiator.getId(), instantiatorAttributes); - } + + // get all the instantiators. + Instantiator[] instantiators = InternalInstantiator.getInstantiators(); + HashMap instantiatorMap = new HashMap(); + if (instantiators != null && instantiators.length > 0) { + for (Instantiator instantiator : instantiators) { + ArrayList instantiatorAttributes = new ArrayList(); + instantiatorAttributes.add(instantiator.getClass().toString().substring(6)); + instantiatorAttributes.add(instantiator.getInstantiatedClass().toString().substring(6)); + instantiatorMap.put(instantiator.getId(), instantiatorAttributes); } - DataSerializer.writeHashMap(instantiatorMap, dos); + } + DataSerializer.writeHashMap(instantiatorMap, dos); - // get all the dataserializers. - DataSerializer[] dataSerializers = InternalDataSerializer.getSerializers(); - HashMap<Integer, ArrayList<String>> dsToSupportedClasses = - new HashMap<Integer, ArrayList<String>>(); - HashMap<Integer, String> dataSerializersMap = new HashMap<Integer, String>(); - if (dataSerializers != null && dataSerializers.length > 0) { - for (DataSerializer dataSerializer : dataSerializers) { - dataSerializersMap.put(dataSerializer.getId(), - dataSerializer.getClass().toString().substring(6)); - if (clientVersion.isNotOlderThan(KnownVersion.GFE_6516)) { - ArrayList<String> supportedClassNames = new ArrayList<String>(); - for (Class clazz : dataSerializer.getSupportedClasses()) { - supportedClassNames.add(clazz.getName()); - } - dsToSupportedClasses.put(dataSerializer.getId(), supportedClassNames); + // get all the dataserializers. + DataSerializer[] dataSerializers = InternalDataSerializer.getSerializers(); + HashMap<Integer, ArrayList<String>> dsToSupportedClasses = + new HashMap<Integer, ArrayList<String>>(); + HashMap<Integer, String> dataSerializersMap = new HashMap<Integer, String>(); + if (dataSerializers != null && dataSerializers.length > 0) { + for (DataSerializer dataSerializer : dataSerializers) { + dataSerializersMap.put(dataSerializer.getId(), + dataSerializer.getClass().toString().substring(6)); + if (clientVersion.isNotOlderThan(KnownVersion.GFE_6516)) { + ArrayList<String> supportedClassNames = new ArrayList<String>(); + for (Class clazz : dataSerializer.getSupportedClasses()) { + supportedClassNames.add(clazz.getName()); } + dsToSupportedClasses.put(dataSerializer.getId(), supportedClassNames); } } - DataSerializer.writeHashMap(dataSerializersMap, dos); - if (clientVersion.isNotOlderThan(KnownVersion.GFE_6516)) { - DataSerializer.writeHashMap(dsToSupportedClasses, dos); - } - if (clientVersion.isNotOlderThan(KnownVersion.GEODE_1_5_0)) { - dos.writeInt(CLIENT_PING_TASK_PERIOD); - } + } + DataSerializer.writeHashMap(dataSerializersMap, dos); + if (clientVersion.isNotOlderThan(KnownVersion.GFE_6516)) { + DataSerializer.writeHashMap(dsToSupportedClasses, dos); + } + if (clientVersion.isNotOlderThan(KnownVersion.GEODE_1_5_0)) { + dos.writeInt(CLIENT_PING_TASK_PERIOD); } dos.flush(); } diff --git a/geode-core/src/test/java/org/apache/geode/internal/cache/tier/sockets/ServerConnectionTest.java b/geode-core/src/test/java/org/apache/geode/internal/cache/tier/sockets/ServerConnectionTest.java index 792dbf0..c5cde15 100644 --- a/geode-core/src/test/java/org/apache/geode/internal/cache/tier/sockets/ServerConnectionTest.java +++ b/geode-core/src/test/java/org/apache/geode/internal/cache/tier/sockets/ServerConnectionTest.java @@ -91,28 +91,6 @@ public class ServerConnectionTest { } @Test - public void pre65SecureShouldReturnUserAuthId() { - long userAuthId = 12345L; - when(handshake.getVersion()).thenReturn(KnownVersion.GFE_61); - serverConnection.setUserAuthId(userAuthId); - - long value = serverConnection.getUniqueId(); - - assertThat(value).isEqualTo(userAuthId); - } - - @Test - public void pre65NonSecureShouldReturnUserAuthId() { - when(handshake.getVersion()).thenReturn(KnownVersion.GFE_61); - long userAuthId = 12345L; - serverConnection.setUserAuthId(userAuthId); - - long value = serverConnection.getUniqueId(); - - assertThat(value).isEqualTo(userAuthId); - } - - @Test public void post65SecureShouldUseUniqueIdFromMessage() { long uniqueIdFromMessage = 23456L; MessageIdExtractor messageIdExtractor = mock(MessageIdExtractor.class); diff --git a/geode-serialization/src/main/java/org/apache/geode/internal/serialization/KnownVersion.java b/geode-serialization/src/main/java/org/apache/geode/internal/serialization/KnownVersion.java index fe41527..d88b3b7 100644 --- a/geode-serialization/src/main/java/org/apache/geode/internal/serialization/KnownVersion.java +++ b/geode-serialization/src/main/java/org/apache/geode/internal/serialization/KnownVersion.java @@ -62,13 +62,6 @@ public class KnownVersion extends AbstractVersion { public static final KnownVersion TOKEN = new KnownVersion("", "TOKEN", (byte) -1, (byte) 0, (byte) 0, (byte) 0, TOKEN_ORDINAL); - private static final short GFE_61_ORDINAL = 5; - - @Immutable - @Deprecated - public static final KnownVersion GFE_61 = - new KnownVersion("GFE", "6.1", (byte) 6, (byte) 1, (byte) 0, (byte) 0, GFE_61_ORDINAL); - private static final short GFE_65_ORDINAL = 6; @Immutable
