This is an automated email from the ASF dual-hosted git repository. jasonhuynh pushed a commit to branch develop in repository https://gitbox.apache.org/repos/asf/geode.git
The following commit(s) were added to refs/heads/develop by this push: new 4a9e651 GEODE-2667: Add API to destroy GatewayReceiver (#1410) 4a9e651 is described below commit 4a9e6518f2a3811941a48a95d398557bd2bf2f97 Author: Jason Huynh <huyn...@gmail.com> AuthorDate: Fri Feb 9 15:41:48 2018 -0800 GEODE-2667: Add API to destroy GatewayReceiver (#1410) * Added API to destroy a cache server * Added new RemoveCacheServerProfileMessage * The hasCacheServer flag in the profile is set to false if size of cache servers is 0 * Destroying GatewayReceiver will remove the jmx bean and proxy bean --- .../apache/geode/cache/wan/GatewayReceiver.java | 5 + .../geode/distributed/internal/ResourceEvent.java | 1 + .../org/apache/geode/internal/DSFIDFactory.java | 2 + .../geode/internal/DataSerializableFixedID.java | 4 +- .../geode/internal/cache/GemFireCacheImpl.java | 44 ++++++ .../apache/geode/internal/cache/InternalCache.java | 4 + .../cache/RemoveCacheServerProfileMessage.java | 162 +++++++++++++++++++++ .../internal/cache/xmlcache/CacheCreation.java | 15 ++ .../cache/xmlcache/GatewayReceiverCreation.java | 4 + .../geode/management/JMXNotificationType.java | 9 ++ .../geode/management/internal/MBeanJMXAdapter.java | 1 - .../management/internal/ManagementConstants.java | 1 + .../internal/beans/GatewayReceiverMBeanBridge.java | 4 + .../internal/beans/ManagementAdapter.java | 25 ++++ .../internal/beans/ManagementListener.java | 4 + .../geode/internal/cache/GemFireCacheImplTest.java | 23 +++ .../codeAnalysis/sanctionedDataSerializables.txt | 5 + .../internal/cache/wan/GatewayReceiverImpl.java | 11 ++ .../cache/wan/GatewayReceiverImplJUnitTest.java | 56 +++++++ .../cache/wan/GatewayReceiverMBeanDUnitTest.java | 118 +++++++++++++++ .../cache/wan/serial/GatewayReceiverDUnitTest.java | 144 ++++++++++++++++++ 21 files changed, 640 insertions(+), 2 deletions(-) diff --git a/geode-core/src/main/java/org/apache/geode/cache/wan/GatewayReceiver.java b/geode-core/src/main/java/org/apache/geode/cache/wan/GatewayReceiver.java index 43d398e..987029e 100644 --- a/geode-core/src/main/java/org/apache/geode/cache/wan/GatewayReceiver.java +++ b/geode-core/src/main/java/org/apache/geode/cache/wan/GatewayReceiver.java @@ -91,6 +91,11 @@ public interface GatewayReceiver { void stop(); /** + * Destroy this receiver. Stop should be called before calling destroy + */ + void destroy(); + + /** * Returns whether or not this receiver is running */ boolean isRunning(); diff --git a/geode-core/src/main/java/org/apache/geode/distributed/internal/ResourceEvent.java b/geode-core/src/main/java/org/apache/geode/distributed/internal/ResourceEvent.java index b3f7add..07bb949 100644 --- a/geode-core/src/main/java/org/apache/geode/distributed/internal/ResourceEvent.java +++ b/geode-core/src/main/java/org/apache/geode/distributed/internal/ResourceEvent.java @@ -38,6 +38,7 @@ public enum ResourceEvent { SYSTEM_ALERT, CACHE_SERVER_START, CACHE_SERVER_STOP, + GATEWAYRECEIVER_DESTROY, GATEWAYRECEIVER_START, GATEWAYRECEIVER_STOP, GATEWAYRECEIVER_CREATE, diff --git a/geode-core/src/main/java/org/apache/geode/internal/DSFIDFactory.java b/geode-core/src/main/java/org/apache/geode/internal/DSFIDFactory.java index 0237b53..e3379e5 100644 --- a/geode-core/src/main/java/org/apache/geode/internal/DSFIDFactory.java +++ b/geode-core/src/main/java/org/apache/geode/internal/DSFIDFactory.java @@ -241,6 +241,7 @@ import org.apache.geode.internal.cache.PartitionRegionConfig; import org.apache.geode.internal.cache.PreferBytesCachedDeserializable; import org.apache.geode.internal.cache.RegionEventImpl; import org.apache.geode.internal.cache.ReleaseClearLockMessage; +import org.apache.geode.internal.cache.RemoveCacheServerProfileMessage; import org.apache.geode.internal.cache.RoleEventImpl; import org.apache.geode.internal.cache.SearchLoadAndWriteProcessor; import org.apache.geode.internal.cache.ServerPingMessage; @@ -576,6 +577,7 @@ public class DSFIDFactory implements DataSerializableFixedID { registerDSFID(REMOTE_PUTALL_MESSAGE, RemotePutAllMessage.class); registerDSFID(VERSION_TAG, VMVersionTag.class); registerDSFID(ADD_CACHESERVER_PROFILE_UPDATE, AddCacheServerProfileMessage.class); + registerDSFID(REMOVE_CACHESERVER_PROFILE_UPDATE, RemoveCacheServerProfileMessage.class); registerDSFID(SERVER_INTEREST_REGISTRATION_MESSAGE, ServerInterestRegistrationMessage.class); registerDSFID(FILTER_PROFILE_UPDATE, FilterProfile.OperationMessage.class); registerDSFID(PR_GET_MESSAGE, GetMessage.class); diff --git a/geode-core/src/main/java/org/apache/geode/internal/DataSerializableFixedID.java b/geode-core/src/main/java/org/apache/geode/internal/DataSerializableFixedID.java index f55e786..fff0f1d 100644 --- a/geode-core/src/main/java/org/apache/geode/internal/DataSerializableFixedID.java +++ b/geode-core/src/main/java/org/apache/geode/internal/DataSerializableFixedID.java @@ -239,7 +239,9 @@ public interface DataSerializableFixedID extends SerializationVersions { byte LATEST_LAST_ACCESS_TIME_MESSAGE = -20; - // IDs -19 .. -16 are not used + public static final byte REMOVE_CACHESERVER_PROFILE_UPDATE = -19; + + // IDs -18 .. -16 are not used /** * A header byte meaning that the next element in the stream is a <code>VMIdProfile</code>. diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/GemFireCacheImpl.java b/geode-core/src/main/java/org/apache/geode/internal/cache/GemFireCacheImpl.java index f87b4e9..f40e900 100755 --- a/geode-core/src/main/java/org/apache/geode/internal/cache/GemFireCacheImpl.java +++ b/geode-core/src/main/java/org/apache/geode/internal/cache/GemFireCacheImpl.java @@ -3812,6 +3812,12 @@ public class GemFireCacheImpl implements InternalCache, InternalClientCache, Has return cacheServer; } + public boolean removeCacheServer(CacheServer cacheServer) { + boolean removed = this.allCacheServers.remove(cacheServer); + sendRemoveCacheServerProfileMessage(); + return removed; + } + @Override public void addGatewaySender(GatewaySender sender) { if (isClient()) { @@ -3901,6 +3907,21 @@ public class GemFireCacheImpl implements InternalCache, InternalClientCache, Has } } + public void removeGatewayReceiver(GatewayReceiver receiver) { + if (isClient()) { + throw new UnsupportedOperationException("operation is not supported on a client cache"); + } + this.stopper.checkCancelInProgress(null); + synchronized (this.allGatewayReceiversLock) { + Set<GatewayReceiver> newReceivers = new HashSet<>(this.allGatewayReceivers.size() + 1); + if (!this.allGatewayReceivers.isEmpty()) { + newReceivers.addAll(this.allGatewayReceivers); + } + newReceivers.remove(receiver); + this.allGatewayReceivers = Collections.unmodifiableSet(newReceivers); + } + } + @Override public void addAsyncEventQueue(AsyncEventQueueImpl asyncQueue) { this.allAsyncEventQueues.add(asyncQueue); @@ -4568,6 +4589,29 @@ public class GemFireCacheImpl implements InternalCache, InternalClientCache, Has } } + + private void sendRemoveCacheServerProfileMessage() { + Set otherMembers = this.dm.getOtherDistributionManagerIds(); + RemoveCacheServerProfileMessage message = new RemoveCacheServerProfileMessage(); + message.operateOnLocalCache(this); + if (!otherMembers.isEmpty()) { + if (logger.isDebugEnabled()) { + logger.debug("Sending add cache server profile message to other members."); + } + ReplyProcessor21 replyProcessor = new ReplyProcessor21(this.dm, otherMembers); + message.setRecipients(otherMembers); + message.processorId = replyProcessor.getProcessorId(); + this.dm.putOutgoing(message); + + // Wait for replies. + try { + replyProcessor.waitForReplies(); + } catch (InterruptedException ignore) { + Thread.currentThread().interrupt(); + } + } + } + @Override public TXManagerImpl getTxManager() { return this.transactionManager; diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/InternalCache.java b/geode-core/src/main/java/org/apache/geode/internal/cache/InternalCache.java index d609c44..0c81809 100644 --- a/geode-core/src/main/java/org/apache/geode/internal/cache/InternalCache.java +++ b/geode-core/src/main/java/org/apache/geode/internal/cache/InternalCache.java @@ -307,8 +307,12 @@ public interface InternalCache extends Cache, Extensible<Cache>, CacheTime { void addGatewayReceiver(GatewayReceiver receiver); + void removeGatewayReceiver(GatewayReceiver receiver); + CacheServer addCacheServer(boolean isGatewayReceiver); + boolean removeCacheServer(CacheServer cacheServer); + /** * A test-hook allowing you to alter the cache setting established by * CacheFactory.setPdxReadSerialized() diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/RemoveCacheServerProfileMessage.java b/geode-core/src/main/java/org/apache/geode/internal/cache/RemoveCacheServerProfileMessage.java new file mode 100644 index 0000000..bd3b904 --- /dev/null +++ b/geode-core/src/main/java/org/apache/geode/internal/cache/RemoveCacheServerProfileMessage.java @@ -0,0 +1,162 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more contributor license + * agreements. See the NOTICE file distributed with this work for additional information regarding + * copyright ownership. The ASF licenses this file to You under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance with the License. You may obtain a + * copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the License + * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express + * or implied. See the License for the specific language governing permissions and limitations under + * the License. + */ +package org.apache.geode.internal.cache; + +import java.io.DataInput; +import java.io.DataOutput; +import java.io.IOException; +import java.util.HashSet; +import java.util.Set; + +import org.apache.logging.log4j.Logger; + +import org.apache.geode.CancelException; +import org.apache.geode.distributed.internal.ClusterDistributionManager; +import org.apache.geode.distributed.internal.MessageWithReply; +import org.apache.geode.distributed.internal.ReplyMessage; +import org.apache.geode.distributed.internal.SerialDistributionMessage; +import org.apache.geode.internal.logging.LogService; + +/** + * OperationMessage synchronously propagates a change in the profile to another member. It is a + * serial message so that there is no chance of out-of-order execution. + */ +public class RemoveCacheServerProfileMessage extends SerialDistributionMessage + implements MessageWithReply { + + private static final Logger logger = LogService.getLogger(); + + int processorId; + + @Override + protected void process(ClusterDistributionManager dm) { + int oldLevel = LocalRegion.setThreadInitLevelRequirement(LocalRegion.BEFORE_INITIAL_IMAGE); + try { + InternalCache cache = dm.getCache(); + // will be null if not initialized + if (cache != null && !cache.isClosed()) { + operateOnCache(cache); + } + } finally { + LocalRegion.setThreadInitLevelRequirement(oldLevel); + ReplyMessage reply = new ReplyMessage(); + reply.setProcessorId(this.processorId); + reply.setRecipient(getSender()); + try { + dm.putOutgoing(reply); + } catch (CancelException ignore) { + // can't send a reply, so ignore the exception + } + } + } + + private void operateOnCache(InternalCache cache) { + final boolean isDebugEnabled = logger.isDebugEnabled(); + if (cache.getCacheServers().size() == 0) { + + for (DistributedRegion r : getDistributedRegions(cache)) { + CacheDistributionAdvisor cda = (CacheDistributionAdvisor) r.getDistributionAdvisor(); + CacheDistributionAdvisor.CacheProfile cp = + (CacheDistributionAdvisor.CacheProfile) cda.getProfile(getSender()); + if (cp != null) { + if (isDebugEnabled) { + logger.debug("Setting hasCacheServer flag to false on region \"{}\" for {}", + r.getFullPath(), getSender()); + } + cp.hasCacheServer = false; + } + } + for (PartitionedRegion r : this.getPartitionedRegions(cache)) { + CacheDistributionAdvisor cda = (CacheDistributionAdvisor) r.getDistributionAdvisor(); + CacheDistributionAdvisor.CacheProfile cp = + (CacheDistributionAdvisor.CacheProfile) cda.getProfile(getSender()); + if (cp != null) { + if (isDebugEnabled) { + logger.debug("Setting hasCacheServer flag to false on region \"{}\" for {}", + r.getFullPath(), getSender()); + } + cp.hasCacheServer = false; + } + } + } + } + + /** set the hasCacheServer flags for all regions in this cache */ + public void operateOnLocalCache(InternalCache cache) { + int oldLevel = LocalRegion.setThreadInitLevelRequirement(LocalRegion.BEFORE_INITIAL_IMAGE); + if (cache.getCacheServers().size() == 0) { + try { + for (InternalRegion r : getAllRegions(cache)) { + FilterProfile fp = r.getFilterProfile(); + if (fp != null) { + fp.getLocalProfile().hasCacheServer = false; + } + } + for (PartitionedRegion r : getPartitionedRegions(cache)) { + FilterProfile fp = r.getFilterProfile(); + if (fp != null) { + fp.getLocalProfile().hasCacheServer = false; + } + } + } finally { + LocalRegion.setThreadInitLevelRequirement(oldLevel); + } + } + } + + + private Set<InternalRegion> getAllRegions(InternalCache internalCache) { + return internalCache.getAllRegions(); + } + + private Set<DistributedRegion> getDistributedRegions(InternalCache internalCache) { + Set<DistributedRegion> result = new HashSet<>(); + for (InternalRegion r : internalCache.getAllRegions()) { + if (r instanceof DistributedRegion) { + result.add((DistributedRegion) r); + } + } + return result; + } + + private Set<PartitionedRegion> getPartitionedRegions(InternalCache internalCache) { + return (Set<PartitionedRegion>) new HashSet(internalCache.getPartitionedRegions()); + } + + /** for deserialization only */ + public RemoveCacheServerProfileMessage() {} + + @Override + public int getDSFID() { + return REMOVE_CACHESERVER_PROFILE_UPDATE; + } + + @Override + public void toData(DataOutput out) throws IOException { + super.toData(out); + out.writeInt(this.processorId); + } + + @Override + public void fromData(DataInput in) throws IOException, ClassNotFoundException { + super.fromData(in); + this.processorId = in.readInt(); + } + + @Override + public String toString() { + return this.getShortClassName() + "(processorId=" + this.processorId + ")"; + } +} diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/xmlcache/CacheCreation.java b/geode-core/src/main/java/org/apache/geode/internal/cache/xmlcache/CacheCreation.java index a192e04..7727add 100755 --- a/geode-core/src/main/java/org/apache/geode/internal/cache/xmlcache/CacheCreation.java +++ b/geode-core/src/main/java/org/apache/geode/internal/cache/xmlcache/CacheCreation.java @@ -643,6 +643,12 @@ public class CacheCreation implements InternalCache { } } + void removeCacheServers(List<CacheServer> declarativeCacheServers, Cache cache, + Integer serverPort, String serverBindAdd, Boolean disableDefaultServer) { + + throw new UnsupportedOperationException(LocalizedStrings.SHOULDNT_INVOKE.toLocalizedString()); + } + /** * Returns a description of the disk store used by the pdx registry. */ @@ -1030,6 +1036,11 @@ public class CacheCreation implements InternalCache { } @Override + public boolean removeCacheServer(final CacheServer cacheServer) { + throw new UnsupportedOperationException(LocalizedStrings.SHOULDNT_INVOKE.toLocalizedString()); + } + + @Override public void setReadSerializedForCurrentThread(final boolean value) { throw new UnsupportedOperationException(LocalizedStrings.SHOULDNT_INVOKE.toLocalizedString()); } @@ -1219,6 +1230,10 @@ public class CacheCreation implements InternalCache { this.gatewayReceivers.add(receiver); } + public void removeGatewayReceiver(GatewayReceiver receiver) { + throw new UnsupportedOperationException(LocalizedStrings.SHOULDNT_INVOKE.toLocalizedString()); + } + public void addAsyncEventQueue(AsyncEventQueue asyncEventQueue) { this.asyncEventQueues.add(asyncEventQueue); } diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/xmlcache/GatewayReceiverCreation.java b/geode-core/src/main/java/org/apache/geode/internal/cache/xmlcache/GatewayReceiverCreation.java index 441290b..d60dcb8 100644 --- a/geode-core/src/main/java/org/apache/geode/internal/cache/xmlcache/GatewayReceiverCreation.java +++ b/geode-core/src/main/java/org/apache/geode/internal/cache/xmlcache/GatewayReceiverCreation.java @@ -137,6 +137,10 @@ public class GatewayReceiverCreation implements GatewayReceiver { } + public void destroy() { + + } + public boolean isRunning() { return false; } diff --git a/geode-core/src/main/java/org/apache/geode/management/JMXNotificationType.java b/geode-core/src/main/java/org/apache/geode/management/JMXNotificationType.java index 351a3a6..5651903 100644 --- a/geode-core/src/main/java/org/apache/geode/management/JMXNotificationType.java +++ b/geode-core/src/main/java/org/apache/geode/management/JMXNotificationType.java @@ -218,6 +218,15 @@ public interface JMXNotificationType { DistributionConfig.GEMFIRE_PREFIX + "distributedsystem.gateway.receiver.stopped"; /** + * Notification type which indicates that a gateway receiver is destroyed <BR> + * The value of this type string is + * <CODE>gemfire.distributedsystem.gateway.receiver.destroyed</CODE>. + */ + public static final String GATEWAY_RECEIVER_DESTROYED = + DistributionConfig.GEMFIRE_PREFIX + "distributedsystem.gateway.receiver.destroyed"; + + + /** * Notification type which indicates that locator is started <BR> * The value of this type string is <CODE>gemfire.distributedsystem.locator.started</CODE>. */ diff --git a/geode-core/src/main/java/org/apache/geode/management/internal/MBeanJMXAdapter.java b/geode-core/src/main/java/org/apache/geode/management/internal/MBeanJMXAdapter.java index f3c0fe3..dde1387 100644 --- a/geode-core/src/main/java/org/apache/geode/management/internal/MBeanJMXAdapter.java +++ b/geode-core/src/main/java/org/apache/geode/management/internal/MBeanJMXAdapter.java @@ -176,7 +176,6 @@ public class MBeanJMXAdapter implements ManagementConstants { public void unregisterMBean(ObjectName objectName) { try { - if (!isRegistered(objectName)) { return; } diff --git a/geode-core/src/main/java/org/apache/geode/management/internal/ManagementConstants.java b/geode-core/src/main/java/org/apache/geode/management/internal/ManagementConstants.java index 8f8ad7c..004ba21 100644 --- a/geode-core/src/main/java/org/apache/geode/management/internal/ManagementConstants.java +++ b/geode-core/src/main/java/org/apache/geode/management/internal/ManagementConstants.java @@ -183,6 +183,7 @@ public interface ManagementConstants { String GATEWAY_SENDER_REMOVED_PREFIX = "GatewaySender Removed in the VM "; String GATEWAY_RECEIVER_CREATED_PREFIX = "GatewayReceiver Created in the VM "; + String GATEWAY_RECEIVER_DESTROYED_PREFIX = "GatewayReceiver Destroyed in the VM "; String GATEWAY_RECEIVER_STARTED_PREFIX = "GatewayReceiver Started in the VM "; String GATEWAY_RECEIVER_STOPPED_PREFIX = "GatewayReceiver Stopped in the VM "; diff --git a/geode-core/src/main/java/org/apache/geode/management/internal/beans/GatewayReceiverMBeanBridge.java b/geode-core/src/main/java/org/apache/geode/management/internal/beans/GatewayReceiverMBeanBridge.java index 3dc9f80..a2ffe01 100644 --- a/geode-core/src/main/java/org/apache/geode/management/internal/beans/GatewayReceiverMBeanBridge.java +++ b/geode-core/src/main/java/org/apache/geode/management/internal/beans/GatewayReceiverMBeanBridge.java @@ -52,6 +52,10 @@ public class GatewayReceiverMBeanBridge extends ServerBridge { initializeReceiverStats(); } + protected void destroyServer() { + removeServer(); + } + protected void startServer() { CacheServer server = rcv.getServer(); addServer(server); diff --git a/geode-core/src/main/java/org/apache/geode/management/internal/beans/ManagementAdapter.java b/geode-core/src/main/java/org/apache/geode/management/internal/beans/ManagementAdapter.java index 171256e..8722f6f 100644 --- a/geode-core/src/main/java/org/apache/geode/management/internal/beans/ManagementAdapter.java +++ b/geode-core/src/main/java/org/apache/geode/management/internal/beans/ManagementAdapter.java @@ -479,6 +479,31 @@ public class ManagementAdapter { } /** + * Handles Gateway receiver destroy + * + * @param recv specific gateway receiver + * @throws ManagementException + */ + public void handleGatewayReceiverDestroy(GatewayReceiver recv) throws ManagementException { + if (!isServiceInitialised("handleGatewayReceiverDestroy")) { + return; + } + + GatewayReceiverMBean mbean = (GatewayReceiverMBean) service.getLocalGatewayReceiverMXBean(); + GatewayReceiverMBeanBridge bridge = mbean.getBridge(); + + bridge.destroyServer(); + ObjectName objectName = (MBeanJMXAdapter + .getGatewayReceiverMBeanName(internalCache.getDistributedSystem().getDistributedMember())); + + service.unregisterMBean(objectName); + Notification notification = new Notification(JMXNotificationType.GATEWAY_RECEIVER_DESTROYED, + memberSource, SequenceNumber.next(), System.currentTimeMillis(), + ManagementConstants.GATEWAY_RECEIVER_DESTROYED_PREFIX); + memberLevelNotifEmitter.sendNotification(notification); + } + + /** * Handles Gateway receiver creation * * @param recv specific gateway receiver diff --git a/geode-core/src/main/java/org/apache/geode/management/internal/beans/ManagementListener.java b/geode-core/src/main/java/org/apache/geode/management/internal/beans/ManagementListener.java index aca3410..5c2bcd7 100644 --- a/geode-core/src/main/java/org/apache/geode/management/internal/beans/ManagementListener.java +++ b/geode-core/src/main/java/org/apache/geode/management/internal/beans/ManagementListener.java @@ -129,6 +129,10 @@ public class ManagementListener implements ResourceEventsListener { GatewayReceiver createdRecv = (GatewayReceiver) resource; adapter.handleGatewayReceiverCreate(createdRecv); break; + case GATEWAYRECEIVER_DESTROY: + GatewayReceiver destroyedRecv = (GatewayReceiver) resource; + adapter.handleGatewayReceiverDestroy(destroyedRecv); + break; case GATEWAYRECEIVER_START: GatewayReceiver startedRecv = (GatewayReceiver) resource; adapter.handleGatewayReceiverStart(startedRecv); diff --git a/geode-core/src/test/java/org/apache/geode/internal/cache/GemFireCacheImplTest.java b/geode-core/src/test/java/org/apache/geode/internal/cache/GemFireCacheImplTest.java index cd33138..8161926 100644 --- a/geode-core/src/test/java/org/apache/geode/internal/cache/GemFireCacheImplTest.java +++ b/geode-core/src/test/java/org/apache/geode/internal/cache/GemFireCacheImplTest.java @@ -30,6 +30,8 @@ import org.junit.Test; import org.junit.experimental.categories.Category; import org.apache.geode.cache.CacheClosedException; +import org.apache.geode.cache.server.CacheServer; +import org.apache.geode.cache.wan.GatewayReceiver; import org.apache.geode.distributed.internal.InternalDistributedSystem; import org.apache.geode.internal.SystemTimer; import org.apache.geode.internal.cache.control.InternalResourceManager; @@ -216,6 +218,27 @@ public class GemFireCacheImplTest { } @Test + public void removeGatewayReceiverShouldRemoveFromReceiversList() { + GatewayReceiver receiver = mock(GatewayReceiver.class); + cache = GemFireCacheImpl.create(distributedSystem, cacheConfig); + cache.addGatewayReceiver(receiver); + assertEquals(1, cache.getGatewayReceivers().size()); + cache.removeGatewayReceiver(receiver); + assertEquals(0, cache.getGatewayReceivers().size()); + } + + + @Test + public void removeFromCacheServerShouldRemoveFromCacheServersList() { + cache = GemFireCacheImpl.create(distributedSystem, cacheConfig); + CacheServer cacheServer = cache.addCacheServer(false); + assertEquals(1, cache.getCacheServers().size()); + cache.removeCacheServer(cacheServer); + assertEquals(0, cache.getCacheServers().size()); + } + + + @Test public void testIsMisConfigured() { Properties clusterProps = new Properties(); Properties serverProps = new Properties(); diff --git a/geode-core/src/test/resources/org/apache/geode/codeAnalysis/sanctionedDataSerializables.txt b/geode-core/src/test/resources/org/apache/geode/codeAnalysis/sanctionedDataSerializables.txt index dadb049..3623071 100644 --- a/geode-core/src/test/resources/org/apache/geode/codeAnalysis/sanctionedDataSerializables.txt +++ b/geode-core/src/test/resources/org/apache/geode/codeAnalysis/sanctionedDataSerializables.txt @@ -2136,3 +2136,8 @@ org/apache/geode/redis/internal/DoubleWrapper,2 fromData,9,2a2bb80004b50002b1 toData,9,2ab400022bb80003b1 +org/apache/geode/internal/cache/RemoveCacheServerProfileMessage,2 +fromData,16,2a2bb700322a2bb900330100b50008b1 +toData,16,2a2bb700302b2ab40008b900310200b1 + + diff --git a/geode-wan/src/main/java/org/apache/geode/internal/cache/wan/GatewayReceiverImpl.java b/geode-wan/src/main/java/org/apache/geode/internal/cache/wan/GatewayReceiverImpl.java index 190f6d1..caa4453 100644 --- a/geode-wan/src/main/java/org/apache/geode/internal/cache/wan/GatewayReceiverImpl.java +++ b/geode-wan/src/main/java/org/apache/geode/internal/cache/wan/GatewayReceiverImpl.java @@ -215,6 +215,17 @@ public class GatewayReceiverImpl implements GatewayReceiver { receiver.stop(); } + public void destroy() { + if (receiver.isRunning()) { + throw new GatewayReceiverException( + "Gateway Receiver is running and needs to be stopped first"); + } + this.cache.removeGatewayReceiver(this); + this.cache.removeCacheServer(receiver); + InternalDistributedSystem system = this.cache.getInternalDistributedSystem(); + system.handleResourceEvent(ResourceEvent.GATEWAYRECEIVER_DESTROY, this); + } + public String getBindAddress() { return this.bindAdd; } diff --git a/geode-wan/src/test/java/org/apache/geode/internal/cache/wan/GatewayReceiverImplJUnitTest.java b/geode-wan/src/test/java/org/apache/geode/internal/cache/wan/GatewayReceiverImplJUnitTest.java index 472ba27..32edb9b 100644 --- a/geode-wan/src/test/java/org/apache/geode/internal/cache/wan/GatewayReceiverImplJUnitTest.java +++ b/geode-wan/src/test/java/org/apache/geode/internal/cache/wan/GatewayReceiverImplJUnitTest.java @@ -17,6 +17,8 @@ package org.apache.geode.internal.cache.wan; import static org.junit.Assert.*; import static org.mockito.ArgumentMatchers.eq; import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.times; +import static org.mockito.Mockito.verify; import static org.mockito.Mockito.when; import java.io.IOException; @@ -56,4 +58,58 @@ public class GatewayReceiverImplJUnitTest { assertEquals("hello", gateway.getHost()); } + @Test + public void destroyCalledOnRunningGatewayReceiverShouldThrowException() throws IOException { + InternalCache cache = mock(InternalCache.class); + CacheServerImpl server = mock(CacheServerImpl.class); + InternalDistributedSystem system = mock(InternalDistributedSystem.class); + when(cache.getInternalDistributedSystem()).thenReturn(system); + when(server.getExternalAddress()).thenReturn("hello"); + when(server.isRunning()).thenReturn(true); + when(cache.addCacheServer(eq(true))).thenReturn(server); + GatewayReceiverImpl gateway = + new GatewayReceiverImpl(cache, 2000, 2001, 5, 100, null, null, null, true); + gateway.start(); + try { + gateway.destroy(); + fail(); + } catch (GatewayReceiverException e) { + assertEquals("Gateway Receiver is running and needs to be stopped first", e.getMessage()); + } + } + + @Test + public void destroyCalledOnStoppedGatewayReceiverShouldRemoveRecieverFromCacheServers() + throws IOException { + InternalCache cache = mock(InternalCache.class); + CacheServerImpl server = mock(CacheServerImpl.class); + InternalDistributedSystem system = mock(InternalDistributedSystem.class); + when(cache.getInternalDistributedSystem()).thenReturn(system); + when(server.getExternalAddress()).thenReturn("hello"); + when(cache.addCacheServer(eq(true))).thenReturn(server); + GatewayReceiverImpl gateway = + new GatewayReceiverImpl(cache, 2000, 2001, 5, 100, null, null, null, true); + gateway.start(); + // sender is mocked already to say running is false + gateway.destroy(); + verify(cache, times(1)).removeCacheServer(server); + } + + @Test + public void destroyCalledOnStoppedGatewayReceiverShouldRemoveRecieverFromReceivers() + throws IOException { + InternalCache cache = mock(InternalCache.class); + CacheServerImpl server = mock(CacheServerImpl.class); + InternalDistributedSystem system = mock(InternalDistributedSystem.class); + when(cache.getInternalDistributedSystem()).thenReturn(system); + when(server.getExternalAddress()).thenReturn("hello"); + when(cache.addCacheServer(eq(true))).thenReturn(server); + GatewayReceiverImpl gateway = + new GatewayReceiverImpl(cache, 2000, 2001, 5, 100, null, null, null, true); + gateway.start(); + // sender is mocked already to say running is false + gateway.destroy(); + verify(cache, times(1)).removeGatewayReceiver(gateway); + } + } diff --git a/geode-wan/src/test/java/org/apache/geode/internal/cache/wan/GatewayReceiverMBeanDUnitTest.java b/geode-wan/src/test/java/org/apache/geode/internal/cache/wan/GatewayReceiverMBeanDUnitTest.java new file mode 100644 index 0000000..4eb567a --- /dev/null +++ b/geode-wan/src/test/java/org/apache/geode/internal/cache/wan/GatewayReceiverMBeanDUnitTest.java @@ -0,0 +1,118 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more contributor license + * agreements. See the NOTICE file distributed with this work for additional information regarding + * copyright ownership. The ASF licenses this file to You under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance with the License. You may obtain a + * copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the License + * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express + * or implied. See the License for the specific language governing permissions and limitations under + * the License. + */ +package org.apache.geode.internal.cache.wan; + + +import static org.junit.Assert.assertNotNull; +import static org.junit.Assert.assertNull; + +import java.util.Set; +import java.util.concurrent.TimeUnit; + +import javax.management.ObjectName; + +import org.awaitility.Awaitility; +import org.junit.Test; +import org.junit.experimental.categories.Category; + +import org.apache.geode.cache.wan.GatewayReceiver; +import org.apache.geode.distributed.DistributedMember; +import org.apache.geode.internal.cache.InternalCache; +import org.apache.geode.management.GatewayReceiverMXBean; +import org.apache.geode.management.ManagementTestBase; +import org.apache.geode.management.internal.MBeanJMXAdapter; +import org.apache.geode.management.internal.SystemManagementService; +import org.apache.geode.test.dunit.VM; +import org.apache.geode.test.junit.categories.DistributedTest; + +@Category(DistributedTest.class) +public class GatewayReceiverMBeanDUnitTest extends ManagementTestBase { + + @Test + public void testMBeanAndProxiesForGatewayReceiverAreCreated() throws Exception { + initManagement(true); + + + // Verify MBean is created in each managed node + for (VM vm : getManagedNodeList()) { + vm.invoke(() -> { + GatewayReceiver receiver = getCache().createGatewayReceiverFactory().create(); + }); + vm.invoke(() -> verifyMBean()); + } + + // Verify MBean proxies are created in the managing node + getManagingNode().invoke(() -> verifyMBeanProxies(getCache())); + } + + @Test + public void testMBeanAndProxiesForGatewayReceiverAreRemovedOnDestroy() throws Exception { + initManagement(true); + + // Verify MBean is created in each managed node + for (VM vm : getManagedNodeList()) { + vm.invoke(() -> { + GatewayReceiver receiver = getCache().createGatewayReceiverFactory().create(); + receiver.start(); + receiver.stop(); + receiver.destroy(); + + }); + vm.invoke(() -> verifyMBeanDoesNotExist()); + } + + // Verify MBean proxies are created in the managing node + getManagingNode().invoke(() -> verifyMBeanProxiesDoesNotExist(getCache())); + } + + private void verifyMBean() { + assertNotNull(getMBean()); + } + + private void verifyMBeanDoesNotExist() { + assertNull(getMBean()); + } + + private GatewayReceiverMXBean getMBean() { + ObjectName objectName = + MBeanJMXAdapter.getGatewayReceiverMBeanName(getSystem().getDistributedMember()); + return getManagementService().getMBeanInstance(objectName, GatewayReceiverMXBean.class); + } + + private static void verifyMBeanProxies(final InternalCache cache) { + Set<DistributedMember> members = + cache.getDistributionManager().getOtherNormalDistributionManagerIds(); + for (DistributedMember member : members) { + Awaitility.await().atMost(60, TimeUnit.SECONDS) + .until(() -> assertNotNull(getMBeanProxy(member))); + } + } + + private static void verifyMBeanProxiesDoesNotExist(final InternalCache cache) { + Set<DistributedMember> members = + cache.getDistributionManager().getOtherNormalDistributionManagerIds(); + for (DistributedMember member : members) { + assertNull(getMBeanProxy(member)); + } + } + + private static GatewayReceiverMXBean getMBeanProxy(DistributedMember member) { + SystemManagementService service = (SystemManagementService) getManagementService(); + ObjectName objectName = MBeanJMXAdapter.getGatewayReceiverMBeanName(member); + return service.getMBeanProxy(objectName, GatewayReceiverMXBean.class); + } + + +} diff --git a/geode-wan/src/test/java/org/apache/geode/internal/cache/wan/serial/GatewayReceiverDUnitTest.java b/geode-wan/src/test/java/org/apache/geode/internal/cache/wan/serial/GatewayReceiverDUnitTest.java new file mode 100644 index 0000000..006a2df --- /dev/null +++ b/geode-wan/src/test/java/org/apache/geode/internal/cache/wan/serial/GatewayReceiverDUnitTest.java @@ -0,0 +1,144 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more contributor license + * agreements. See the NOTICE file distributed with this work for additional information regarding + * copyright ownership. The ASF licenses this file to You under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance with the License. You may obtain a + * copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the License + * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express + * or implied. See the License for the specific language governing permissions and limitations under + * the License. + */ +package org.apache.geode.internal.cache.wan.serial; + +import static junit.framework.TestCase.assertEquals; +import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertTrue; + +import java.io.IOException; + +import org.junit.Test; +import org.junit.experimental.categories.Category; + +import org.apache.geode.cache.wan.GatewayReceiver; +import org.apache.geode.cache.wan.GatewayReceiverFactory; +import org.apache.geode.distributed.DistributedMember; +import org.apache.geode.distributed.internal.DistributionAdvisor; +import org.apache.geode.distributed.internal.membership.InternalDistributedMember; +import org.apache.geode.internal.AvailablePortHelper; +import org.apache.geode.internal.cache.CacheDistributionAdvisor; +import org.apache.geode.internal.cache.DistributedRegion; +import org.apache.geode.internal.cache.PartitionedRegion; +import org.apache.geode.internal.cache.wan.WANTestBase; +import org.apache.geode.test.dunit.Assert; +import org.apache.geode.test.dunit.SerializableCallable; +import org.apache.geode.test.dunit.SerializableCallableIF; +import org.apache.geode.test.dunit.SerializableRunnableIF; +import org.apache.geode.test.junit.categories.DistributedTest; + +@Category(DistributedTest.class) +public class GatewayReceiverDUnitTest extends WANTestBase { + + private static GatewayReceiver receiver; + + @Test + public void removingGatewayReceiverUsingReplicatedRegionShouldRemoveCacheServerFlagFromProfile() + throws Exception { + testPrimarySecondaryQueueDrainInOrder_RR( + () -> WANTestBase.createReplicatedRegion(getTestMethodName(), null, isOffHeap()), + () -> ((DistributedRegion) WANTestBase.cache.getRegion(getTestMethodName())) + .getDistributionAdvisor()); + } + + @Test + public void removingGatewayReceiverUsingPartitionedRegionShouldRemoveCacheServerFlagFromProfile() + throws Exception { + testPrimarySecondaryQueueDrainInOrder_RR( + () -> WANTestBase.createPartitionedRegion(getTestMethodName(), null, 1, 10, isOffHeap()), + () -> ((PartitionedRegion) WANTestBase.cache.getRegion(getTestMethodName())) + .getDistributionAdvisor()); + } + + public <T> void testPrimarySecondaryQueueDrainInOrder_RR( + SerializableRunnableIF createRegionLambda, + SerializableCallableIF<DistributionAdvisor> extractAdvisorLambda) throws Exception { + InternalDistributedMember[] memberIds = new InternalDistributedMember[8]; + + Integer lnPort = (Integer) vm0.invoke(() -> WANTestBase.createFirstLocatorWithDSId(1)); + Integer nyPort = (Integer) vm1.invoke(() -> WANTestBase.createFirstRemoteLocator(2, lnPort)); + + vm2.invoke(() -> WANTestBase.createCache(nyPort)); + vm3.invoke(() -> WANTestBase.createCache(nyPort)); + + memberIds[2] = (InternalDistributedMember) vm2 + .invoke(() -> WANTestBase.cache.getDistributedSystem().getDistributedMember()); + + memberIds[3] = (InternalDistributedMember) vm3 + .invoke(() -> WANTestBase.cache.getDistributedSystem().getDistributedMember()); + + vm2.invoke(createRegionLambda); + vm3.invoke(createRegionLambda); + + vm2.invoke(() -> WANTestBase.doPuts(getTestMethodName(), 100)); + + vm2.invoke(() -> { + GatewayReceiverDUnitTest.receiver = GatewayReceiverDUnitTest.createAndReturnReceiver(); + return; + }); + vm3.invoke(() -> { + GatewayReceiverDUnitTest.receiver = GatewayReceiverDUnitTest.createAndReturnReceiver(); + return; + }); + + vm2.invoke(() -> assertProfileCacheServerFlagEquals(memberIds[3], true, extractAdvisorLambda)); + vm3.invoke(() -> assertProfileCacheServerFlagEquals(memberIds[2], true, extractAdvisorLambda)); + + vm2.invoke(() -> { + GatewayReceiverDUnitTest.receiver.stop(); + GatewayReceiverDUnitTest.receiver.destroy(); + }); + + vm2.invoke(() -> assertProfileCacheServerFlagEquals(memberIds[3], true, extractAdvisorLambda)); + // vm3 should still see that vm2's profile still has cache server set to true + vm3.invoke(() -> assertProfileCacheServerFlagEquals(memberIds[2], false, extractAdvisorLambda)); + + vm3.invoke(() -> { + GatewayReceiverDUnitTest.receiver.stop(); + GatewayReceiverDUnitTest.receiver.destroy(); + }); + + vm2.invoke(() -> assertProfileCacheServerFlagEquals(memberIds[3], false, extractAdvisorLambda)); + vm3.invoke(() -> assertProfileCacheServerFlagEquals(memberIds[2], false, extractAdvisorLambda)); + } + + + private void assertProfileCacheServerFlagEquals(InternalDistributedMember member, + boolean expectedFlag, SerializableCallableIF<DistributionAdvisor> extractAdvisor) + throws Exception { + DistributionAdvisor advisor = extractAdvisor.call(); + CacheDistributionAdvisor.CacheProfile cp = + (CacheDistributionAdvisor.CacheProfile) advisor.getProfile(member); + assertEquals(expectedFlag, cp.hasCacheServer); + } + + public static GatewayReceiver createAndReturnReceiver() { + GatewayReceiverFactory fact = cache.createGatewayReceiverFactory(); + int port = AvailablePortHelper.getRandomAvailablePortForDUnitSite(); + fact.setStartPort(port); + fact.setEndPort(port); + fact.setManualStart(true); + GatewayReceiver receiver = fact.create(); + try { + receiver.start(); + } catch (IOException e) { + e.printStackTrace(); + Assert.fail( + "Test " + getTestMethodName() + " failed to start GatewayReceiver on port " + port, e); + } + return receiver; + } + +} -- To stop receiving notification emails like this one, please contact jasonhu...@apache.org.