Repository: incubator-geode Updated Branches: refs/heads/feature/GEODE-77 c152e20b0 -> f3034be68
http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/f3034be6/gemfire-core/src/main/java/com/gemstone/gemfire/distributed/internal/membership/gms/mgr/GMSMembershipManager.java ---------------------------------------------------------------------- diff --git a/gemfire-core/src/main/java/com/gemstone/gemfire/distributed/internal/membership/gms/mgr/GMSMembershipManager.java b/gemfire-core/src/main/java/com/gemstone/gemfire/distributed/internal/membership/gms/mgr/GMSMembershipManager.java index 204cc0b..d7283cc 100755 --- a/gemfire-core/src/main/java/com/gemstone/gemfire/distributed/internal/membership/gms/mgr/GMSMembershipManager.java +++ b/gemfire-core/src/main/java/com/gemstone/gemfire/distributed/internal/membership/gms/mgr/GMSMembershipManager.java @@ -1255,7 +1255,7 @@ public class GMSMembershipManager implements MembershipManager, Manager return; } } - processMessage(msg); + dispatchMessage(msg); } public void warnShun(DistributedMember m) { @@ -1276,6 +1276,11 @@ public class GMSMembershipManager implements MembershipManager, Manager logger.warn(LocalizedMessage.create(LocalizedStrings.GroupMembershipService_MEMBERSHIP_DISREGARDING_SHUNNED_MEMBER_0, m)); } + @Override + public void processMessage(DistributionMessage msg) { + handleOrDeferMessage(msg); + } + /** * Logic for processing a distribution message. * <p> @@ -1283,7 +1288,7 @@ public class GMSMembershipManager implements MembershipManager, Manager * We handle this here, and generate an uplevel event if necessary * @param msg the message */ - public void processMessage(DistributionMessage msg) { + public void dispatchMessage(DistributionMessage msg) { boolean isNew = false; InternalDistributedMember m = msg.getSender(); boolean shunned = false; @@ -2090,6 +2095,16 @@ public class GMSMembershipManager implements MembershipManager, Manager services.getCancelCriterion().generateCancelledException(null)); } + if (playingDead) { // wellness test hook + while (playingDead && !shutdownInProgress) { + try { + Thread.sleep(1000); + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + } + } + } + if (isJoining()) { // If we get here, we are starting up, so just report a failure. if (allDestinations) @@ -2892,6 +2907,8 @@ public class GMSMembershipManager implements MembershipManager, Manager return; // probably a race condition } + setShutdown(); + final Exception shutdownCause = new ForcedDisconnectException(reason); // cache the exception so it can be appended to ShutdownExceptions http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/f3034be6/gemfire-core/src/main/java/com/gemstone/gemfire/internal/DSFIDFactory.java ---------------------------------------------------------------------- diff --git a/gemfire-core/src/main/java/com/gemstone/gemfire/internal/DSFIDFactory.java b/gemfire-core/src/main/java/com/gemstone/gemfire/internal/DSFIDFactory.java index 4625a35..71b4589 100644 --- a/gemfire-core/src/main/java/com/gemstone/gemfire/internal/DSFIDFactory.java +++ b/gemfire-core/src/main/java/com/gemstone/gemfire/internal/DSFIDFactory.java @@ -99,8 +99,8 @@ import com.gemstone.gemfire.distributed.internal.membership.gms.messages.Install import com.gemstone.gemfire.distributed.internal.membership.gms.messages.JoinRequestMessage; import com.gemstone.gemfire.distributed.internal.membership.gms.messages.JoinResponseMessage; import com.gemstone.gemfire.distributed.internal.membership.gms.messages.LeaveRequestMessage; -import com.gemstone.gemfire.distributed.internal.membership.gms.messages.CheckRequestMessage; -import com.gemstone.gemfire.distributed.internal.membership.gms.messages.CheckResponseMessage; +import com.gemstone.gemfire.distributed.internal.membership.gms.messages.HeartbeatRequestMessage; +import com.gemstone.gemfire.distributed.internal.membership.gms.messages.HeartbeatMessage; import com.gemstone.gemfire.distributed.internal.membership.gms.messages.NetworkPartitionMessage; import com.gemstone.gemfire.distributed.internal.membership.gms.messages.RemoveMemberMessage; import com.gemstone.gemfire.distributed.internal.membership.gms.messages.SuspectMembersMessage; @@ -476,8 +476,8 @@ public final class DSFIDFactory implements DataSerializableFixedID { private static void registerDSFIDTypes() { registerDSFID(NETWORK_PARTITION_MESSAGE, NetworkPartitionMessage.class); registerDSFID(REMOVE_MEMBER_REQUEST, RemoveMemberMessage.class); - registerDSFID(CHECK_REQUEST, CheckRequestMessage.class); - registerDSFID(CHECK_RESPONSE, CheckResponseMessage.class); + registerDSFID(HEARTBEAT_REQUEST, HeartbeatRequestMessage.class); + registerDSFID(HEARTBEAT_RESPONSE, HeartbeatMessage.class); registerDSFID(SUSPECT_MEMBERS_MESSAGE, SuspectMembersMessage.class); registerDSFID(LEAVE_REQUEST_MESSAGE, LeaveRequestMessage.class); registerDSFID(VIEW_ACK_MESSAGE, ViewAckMessage.class); http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/f3034be6/gemfire-core/src/main/java/com/gemstone/gemfire/internal/DataSerializableFixedID.java ---------------------------------------------------------------------- diff --git a/gemfire-core/src/main/java/com/gemstone/gemfire/internal/DataSerializableFixedID.java b/gemfire-core/src/main/java/com/gemstone/gemfire/internal/DataSerializableFixedID.java index 1da7038..34d051c 100644 --- a/gemfire-core/src/main/java/com/gemstone/gemfire/internal/DataSerializableFixedID.java +++ b/gemfire-core/src/main/java/com/gemstone/gemfire/internal/DataSerializableFixedID.java @@ -77,8 +77,8 @@ public interface DataSerializableFixedID extends SerializationVersions { public static final short NETWORK_PARTITION_MESSAGE = -157; public static final short SUSPECT_MEMBERS_MESSAGE = -156; - public static final short CHECK_RESPONSE = -155; - public static final short CHECK_REQUEST = -154; + public static final short HEARTBEAT_RESPONSE = -155; + public static final short HEARTBEAT_REQUEST = -154; public static final short REMOVE_MEMBER_REQUEST = -153; public static final short LEAVE_REQUEST_MESSAGE = -152; http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/f3034be6/gemfire-core/src/main/java/com/gemstone/gemfire/internal/Version.java ---------------------------------------------------------------------- diff --git a/gemfire-core/src/main/java/com/gemstone/gemfire/internal/Version.java b/gemfire-core/src/main/java/com/gemstone/gemfire/internal/Version.java index e58dce8..7a2dca1 100644 --- a/gemfire-core/src/main/java/com/gemstone/gemfire/internal/Version.java +++ b/gemfire-core/src/main/java/com/gemstone/gemfire/internal/Version.java @@ -54,7 +54,7 @@ public final class Version implements Comparable<Version> { /** byte used as ordinal to represent this <code>Version</code> */ private final short ordinal; - public static final int HIGHEST_VERSION = 43; + public static final int HIGHEST_VERSION = 45; private static final Version[] VALUES = new Version[HIGHEST_VERSION+1]; @@ -177,9 +177,9 @@ public final class Version implements Comparable<Version> { public static final Version GFE_82 = new Version("GFE", "8.2", (byte)8, (byte)2, (byte)0, (byte)0, GFE_82_ORDINAL); - // 41-42 available for 8.2.x variants + // 41-44 available for 8.2.x variants - private static final byte GFE_90_ORDINAL = 43; + private static final byte GFE_90_ORDINAL = 45; public static final Version GFE_90 = new Version("GFE", "9.0", (byte)9, (byte)0, (byte)0, (byte)0, GFE_90_ORDINAL); http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/f3034be6/gemfire-core/src/main/java/com/gemstone/gemfire/internal/cache/CreateRegionProcessor.java ---------------------------------------------------------------------- diff --git a/gemfire-core/src/main/java/com/gemstone/gemfire/internal/cache/CreateRegionProcessor.java b/gemfire-core/src/main/java/com/gemstone/gemfire/internal/cache/CreateRegionProcessor.java index e90f27b..4069c15 100644 --- a/gemfire-core/src/main/java/com/gemstone/gemfire/internal/cache/CreateRegionProcessor.java +++ b/gemfire-core/src/main/java/com/gemstone/gemfire/internal/cache/CreateRegionProcessor.java @@ -93,7 +93,7 @@ public class CreateRegionProcessor implements ProfileExchangeProcessor { CreateRegionReplyProcessor replyProc = new CreateRegionReplyProcessor(recps); - boolean useMcast = false; // never use multicast for region meta-level ops (can cause hangs) + boolean useMcast = false; // multicast is disabled for this message for now CreateRegionMessage msg = getCreateRegionMessage(recps, replyProc, useMcast); // since PR buckets can be created during cache entry operations, enable http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/f3034be6/gemfire-core/src/main/java/com/gemstone/gemfire/internal/cache/tier/sockets/HandShake.java ---------------------------------------------------------------------- diff --git a/gemfire-core/src/main/java/com/gemstone/gemfire/internal/cache/tier/sockets/HandShake.java b/gemfire-core/src/main/java/com/gemstone/gemfire/internal/cache/tier/sockets/HandShake.java old mode 100644 new mode 100755 index 0b00e31..3e5d177 --- a/gemfire-core/src/main/java/com/gemstone/gemfire/internal/cache/tier/sockets/HandShake.java +++ b/gemfire-core/src/main/java/com/gemstone/gemfire/internal/cache/tier/sockets/HandShake.java @@ -1434,6 +1434,10 @@ public class HandShake implements ClientHandShake byte[] memberBytes = DataSerializer.readByteArray(p_dis); ByteArrayInputStream bais = new ByteArrayInputStream(memberBytes); DataInputStream dis = new DataInputStream(bais); + Version v = InternalDataSerializer.getVersionForDataStreamOrNull(p_dis); + if (v != null) { + dis = new VersionedDataInputStream(dis, v); + } try { return (DistributedMember)DataSerializer.readObject(dis); } http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/f3034be6/gemfire-core/src/main/java/com/gemstone/gemfire/internal/tcp/Connection.java ---------------------------------------------------------------------- diff --git a/gemfire-core/src/main/java/com/gemstone/gemfire/internal/tcp/Connection.java b/gemfire-core/src/main/java/com/gemstone/gemfire/internal/tcp/Connection.java index 9f079fa..20e9f0f 100644 --- a/gemfire-core/src/main/java/com/gemstone/gemfire/internal/tcp/Connection.java +++ b/gemfire-core/src/main/java/com/gemstone/gemfire/internal/tcp/Connection.java @@ -1884,18 +1884,14 @@ public class Connection implements Runnable { } catch (ClosedChannelException e) { this.readerShuttingDown = true; - if (this.owner.getConduit().getCancelCriterion().cancelInProgress() != null) { - initiateSuspicionIfShared(); - } + initiateSuspicionIfShared(); try { requestClose(LocalizedStrings.Connection_CLOSEDCHANNELEXCEPTION_IN_CHANNEL_READ_0.toLocalizedString(e)); } catch (Exception ex) {} return; } catch (IOException e) { - if (this.owner.getConduit().getCancelCriterion().cancelInProgress() != null) { - initiateSuspicionIfShared(); - } + initiateSuspicionIfShared(); if (! isSocketClosed() && !"Socket closed".equalsIgnoreCase(e.getMessage()) // needed for Solaris jdk 1.4.2_08 ) { @@ -1919,9 +1915,7 @@ public class Connection implements Runnable { if (!stopped && ! isSocketClosed() ) { logger.fatal(LocalizedMessage.create(LocalizedStrings.Connection_0_EXCEPTION_IN_CHANNEL_READ, p2pReaderName()), e); } - if (this.owner.getConduit().getCancelCriterion().cancelInProgress() != null) { - initiateSuspicionIfShared(); - } + initiateSuspicionIfShared(); this.readerShuttingDown = true; try { requestClose(LocalizedStrings.Connection_0_EXCEPTION_IN_CHANNEL_READ.toLocalizedString(e)); @@ -2442,9 +2436,7 @@ public class Connection implements Runnable { this.stopped = true; } catch (IOException io) { - if (this.owner.getConduit().getCancelCriterion().cancelInProgress() != null) { - initiateSuspicionIfShared(); - } + initiateSuspicionIfShared(); boolean closed = isSocketClosed() || "Socket closed".equalsIgnoreCase(io.getMessage()); // needed for Solaris jdk 1.4.2_08 if (!closed) { http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/f3034be6/gemfire-core/src/main/java/com/gemstone/gemfire/internal/util/PluckStacks.java ---------------------------------------------------------------------- diff --git a/gemfire-core/src/main/java/com/gemstone/gemfire/internal/util/PluckStacks.java b/gemfire-core/src/main/java/com/gemstone/gemfire/internal/util/PluckStacks.java index d599bc9..6d73c19 100644 --- a/gemfire-core/src/main/java/com/gemstone/gemfire/internal/util/PluckStacks.java +++ b/gemfire-core/src/main/java/com/gemstone/gemfire/internal/util/PluckStacks.java @@ -189,26 +189,35 @@ public class PluckStacks { if (threadName.startsWith("Function Execution Processor")) { return isIdleExecutor(thread); } - if (threadName.startsWith("GemFire Membership Timer")) { + if (threadName.startsWith("Geode Failure Detection Server thread")) { + return stackSize < 11 && thread.get(1).contains("Thread.State: WAITING"); + } + if (threadName.startsWith("Geode Membership Timer")) { // System.out.println("gf timer stack size = " + stackSize + "; frame = " + thread.get(1)); return stackSize < 9 && (thread.get(1).contains("Thread.State: WAITING") || thread.get(1).contains("Thread.State: TIMED_WAITING")); } - if (threadName.startsWith("GemFire Membership View Creator")) { + if (threadName.startsWith("Geode Membership View Creator")) { // System.out.println("gf view creator stack size = " + stackSize + "; frame = " + thread.get(1)); return stackSize < 8 && thread.get(1).contains("Thread.State: WAITING"); - } - if (threadName.startsWith("GemFire Suspect Message Collector")) { -// System.out.println("gf suspect collector stack size = " + stackSize + "; frame = " + thread.get(1)); - return stackSize <= 7 && thread.get(1).contains("Thread.State: WAITING"); } + if (threadName.startsWith("Geode Heartbeat Sender")) { + return stackSize <= 8 && thread.get(1).contains("Thread.State: WAITING"); + } + // thread currently disabled +// if (threadName.startsWith("Geode Suspect Message Collector")) { +// return stackSize <= 7 && thread.get(1).contains("Thread.State: WAITING"); +// } + if (threadName.startsWith("multicast receiver")) { + return (stackSize > 2 && thread.get(2).contains("PlainDatagramSocketImpl.receive")); + } if (threadName.startsWith("P2P Listener")) { // System.out.println("p2p listener stack size = " + stackSize + "; frame = " + thread.get(2)); return (stackSize == 8 && thread.get(2).contains("SocketChannelImpl.accept")); } if (threadName.startsWith("P2P message reader")) { - return (stackSize <= 12 && + return (stackSize <= 14 && (thread.getFirstFrame().contains("FileDispatcherImpl.read") || thread.getFirstFrame().contains("FileDispatcher.read") || thread.getFirstFrame().contains("SocketDispatcher.read"))); @@ -280,15 +289,6 @@ public class PluckStacks { if (threadName.startsWith("Event Processor for GatewaySender")) { return !thread.isRunnable() && thread.get(3).contains("ConcurrentParallelGatewaySenderQueue.peek"); } - if (threadName.startsWith("FD_SOCK ClientConnectionHandler")) { - return true; - } - if (threadName.startsWith("FD_SOCK Ping thread")) { - return (stackSize <= 9 && thread.getFirstFrame().contains("socketRead")); - } - if (threadName.startsWith("FD_SOCK listener thread")) { - return (stackSize <= 9 && thread.getFirstFrame().contains("socketAccept")); - } if (threadName.startsWith("GC Daemon")) { return !thread.isRunnable() && stackSize <= 6; } @@ -368,26 +368,11 @@ public class PluckStacks { if (threadName.startsWith("TimeScheduler.Thread")) { return !thread.isRunnable() && (stackSize <= 8 && thread.getFirstFrame().contains("Object.wait")); } - if (threadName.startsWith("UDP Loopback Message Handler")) { - return !thread.isRunnable() && (stackSize <= 9 && thread.getFirstFrame().contains("Object.wait")); - } - if (threadName.startsWith("UDP ucast receiver")) { - return (stackSize == 11 && thread.getFirstFrame().contains("SocketImpl.receive")); - } - if (threadName.startsWith("VERIFY_SUSPECT")) { - return !thread.isRunnable() && (stackSize <=9 && thread.getFirstFrame().contains("Object.wait")); - } - if (threadName.startsWith("View Message Processor")) { - return isIdleExecutor(thread); - } if (threadName.startsWith("vfabric-license-heartbeat")) { if (thread.isRunnable()) return false; if (stackSize == 6 && thread.getFirstFrame().contains("Thread.sleep")) return true; return (stackSize <= 7 && thread.getFirstFrame().contains("Object.wait")); } - if (threadName.startsWith("ViewHandler")) { - return !thread.isRunnable() && (stackSize <= 8); - } if (threadName.equals("WAN Locator Discovery Thread")) { return (!thread.isRunnable() && thread.get(3).contains("exchangeRemoteLocators")); } http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/f3034be6/gemfire-core/src/main/resources/com/gemstone/gemfire/distributed/internal/membership/gms/messenger/jgroups-config.xml ---------------------------------------------------------------------- diff --git a/gemfire-core/src/main/resources/com/gemstone/gemfire/distributed/internal/membership/gms/messenger/jgroups-config.xml b/gemfire-core/src/main/resources/com/gemstone/gemfire/distributed/internal/membership/gms/messenger/jgroups-config.xml index 8393d31..29d2945 100755 --- a/gemfire-core/src/main/resources/com/gemstone/gemfire/distributed/internal/membership/gms/messenger/jgroups-config.xml +++ b/gemfire-core/src/main/resources/com/gemstone/gemfire/distributed/internal/membership/gms/messenger/jgroups-config.xml @@ -22,7 +22,7 @@ timer.keep_alive_time="3000" timer.queue_max_size="500" - thread_pool.enabled="true" + thread_pool.enabled="false" thread_pool.min_threads="1" thread_pool.max_threads="4" thread_pool.keep_alive_time="5000" @@ -30,7 +30,7 @@ thread_pool.queue_max_size="10000" thread_pool.rejection_policy="discard" - oob_thread_pool.enabled="true" + oob_thread_pool.enabled="false" oob_thread_pool.min_threads="1" oob_thread_pool.max_threads="4" oob_thread_pool.keep_alive_time="5000" http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/f3034be6/gemfire-core/src/main/resources/com/gemstone/gemfire/distributed/internal/membership/gms/messenger/jgroups-mcast.xml ---------------------------------------------------------------------- diff --git a/gemfire-core/src/main/resources/com/gemstone/gemfire/distributed/internal/membership/gms/messenger/jgroups-mcast.xml b/gemfire-core/src/main/resources/com/gemstone/gemfire/distributed/internal/membership/gms/messenger/jgroups-mcast.xml index c16fad3..e8440bc 100755 --- a/gemfire-core/src/main/resources/com/gemstone/gemfire/distributed/internal/membership/gms/messenger/jgroups-mcast.xml +++ b/gemfire-core/src/main/resources/com/gemstone/gemfire/distributed/internal/membership/gms/messenger/jgroups-mcast.xml @@ -31,7 +31,7 @@ timer.keep_alive_time="3000" timer.queue_max_size="500" - thread_pool.enabled="true" + thread_pool.enabled="false" thread_pool.min_threads="1" thread_pool.max_threads="4" thread_pool.keep_alive_time="5000" @@ -39,7 +39,7 @@ thread_pool.queue_max_size="10000" thread_pool.rejection_policy="discard" - oob_thread_pool.enabled="true" + oob_thread_pool.enabled="false" oob_thread_pool.min_threads="1" oob_thread_pool.max_threads="4" oob_thread_pool.keep_alive_time="5000" http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/f3034be6/gemfire-core/src/test/java/com/gemstone/gemfire/cache30/ReconnectDUnitTest.java ---------------------------------------------------------------------- diff --git a/gemfire-core/src/test/java/com/gemstone/gemfire/cache30/ReconnectDUnitTest.java b/gemfire-core/src/test/java/com/gemstone/gemfire/cache30/ReconnectDUnitTest.java index 26e8586..6bb1543 100644 --- a/gemfire-core/src/test/java/com/gemstone/gemfire/cache30/ReconnectDUnitTest.java +++ b/gemfire-core/src/test/java/com/gemstone/gemfire/cache30/ReconnectDUnitTest.java @@ -80,8 +80,7 @@ public class ReconnectDUnitTest extends CacheTestCase } locatorPort = locPort; Properties props = getDistributedSystemProperties(); - props.put("log-file", "autoReconnectLocatorVM"+VM.getCurrentVMNum()+"_"+getPID()+".log"); - locator = Locator.startLocatorAndDS(locatorPort, null, props); + locator = Locator.startLocatorAndDS(locatorPort, new File(""), props); addExpectedException("com.gemstone.gemfire.ForcedDisconnectException||Possible loss of quorum"); // MembershipManagerHelper.getMembershipManager(InternalDistributedSystem.getConnectedInstance()).setDebugJGroups(true); } catch (IOException e) { @@ -118,7 +117,7 @@ public class ReconnectDUnitTest extends CacheTestCase { try { super.tearDown2(); - Host.getHost(0).getVM(3).invoke(new SerializableRunnable("stop locator") { + Host.getHost(0).getVM(locatorVMNumber).invoke(new SerializableRunnable("stop locator") { public void run() { if (locator != null) { getLogWriter().info("stopping locator " + locator); @@ -155,106 +154,6 @@ public class ReconnectDUnitTest extends CacheTestCase return factory.create(); } - /** - * (comment from Bruce: this test doesn't seem to really do anything) - * </p> - * Test reconnect with the max-time-out of 200 and max-number-of-tries - * 1. The test first creates an xml file and then use it to create - * cache and regions. The test then fires reconnect in one of the - * vms. The reconnect uses xml file to create and intialize cache. - * @throws Exception - * */ - - public void testReconnect() throws TimeoutException, CacheException, - IOException - { - final int locPort = this.locatorPort; - - final String xmlFileLoc = (new File(".")).getAbsolutePath(); - - Host host = Host.getHost(0); - VM vm0 = host.getVM(0); - - VM vm1 = host.getVM(1); - //VM vm2 = host.getVM(2); - - SerializableRunnable create1 = new CacheSerializableRunnable( - "Create Cache and Regions from cache.xml") { - public void run2() throws CacheException - { - // DebuggerSupport.waitForJavaDebugger(getLogWriter(), " about to create region"); - locatorPort = locPort; - Properties props = getDistributedSystemProperties(); - props.put("cache-xml-file", xmlFileLoc+"/MyDisconnect-cache.xml"); - props.put("max-wait-time-reconnect", "200"); - props.put("max-num-reconnect-tries", "1"); - getLogWriter().info("test is creating distributed system"); - getSystem(props); - getLogWriter().info("test is creating cache"); - Cache cache = getCache(); - Region myRegion = cache.getRegion("root/myRegion"); - myRegion.put("MyKey1", "MyValue1"); - // myRegion.put("Mykey2", "MyValue2"); - - } - }; - - SerializableRunnable create2 = new CacheSerializableRunnable( - "Create Cache and Regions from cache.xml") { - public void run2() throws CacheException - { - // DebuggerSupport.waitForJavaDebugger(getLogWriter(), " about to create region"); - locatorPort = locPort; - Properties props = getDistributedSystemProperties(); - props.put("cache-xml-file", xmlFileLoc+"/MyDisconnect-cache.xml"); - props.put("max-wait-time-reconnect", "200"); - props.put("max-num-reconnect-tries", "1"); - getSystem(props); - Cache cache = getCache(); - Region myRegion = cache.getRegion("root/myRegion"); - //myRegion.put("MyKey1", "MyValue1"); - myRegion.put("Mykey2", "MyValue2"); - assertNotNull(myRegion.get("MyKey1")); - //getLogWriter().fine("MyKey1 value is : "+myRegion.get("MyKey1")); - - } - }; - - vm0.invoke(create1); - vm1.invoke(create2); - - SerializableRunnable reconnect = new CacheSerializableRunnable( - "Create Region") { - public void run2() throws CacheException - { - // DebuggerSupport.waitForJavaDebugger(getLogWriter(), " about to create region"); - // closeCache(); - // getSystem().disconnect(); - locatorPort = locPort; - Properties props = getDistributedSystemProperties(); - props.put("cache-xml-file", xmlFileLoc+"/MyDisconnect-cache.xml"); - props.put("max-wait-time-reconnect", "200"); - props.put("max-num-reconnect-tries", "1"); - getSystem(props); - Cache cache = getCache(); - //getLogWriter().fine("Cache type : "+cache.getClass().getName()); - Region reg = cache.getRegion("root/myRegion"); - //getLogWriter().fine("The reg type : "+reg); - assertNotNull(reg.get("MyKey1")); - getLogWriter().fine("MyKey1 Value after disconnect : " - + reg.get("MyKey1")); - - //closeCache(); - //disconnectFromDS(); - - } - }; - - vm1.invoke(reconnect); - - } - - // quorum check fails, then succeeds public void testReconnectWithQuorum() throws Exception { addExpectedException("killing member's ds"); @@ -326,7 +225,7 @@ public class ReconnectDUnitTest extends CacheTestCase } } - public void disabledtestReconnectOnForcedDisconnect() throws Exception { + public void testReconnectOnForcedDisconnect() throws Exception { doTestReconnectOnForcedDisconnect(false); } @@ -365,7 +264,7 @@ public class ReconnectDUnitTest extends CacheTestCase props.put("cache-xml-file", xmlFileLoc+"/MyDisconnect-cache.xml"); props.put("max-wait-time-reconnect", "1000"); props.put("max-num-reconnect-tries", "2"); - props.put("log-file", "autoReconnectVM"+VM.getCurrentVMNum()+"_"+getPID()+".log"); +// props.put("log-file", "autoReconnectVM"+VM.getCurrentVMNum()+"_"+getPID()+".log"); Cache cache = new CacheFactory(props).create(); Region myRegion = cache.getRegion("root/myRegion"); ReconnectDUnitTest.savedSystem = cache.getDistributedSystem(); @@ -387,7 +286,7 @@ public class ReconnectDUnitTest extends CacheTestCase props.put("max-num-reconnect-tries", "2"); props.put("start-locator", "localhost["+secondLocPort+"]"); props.put("locators", props.get("locators")+",localhost["+secondLocPort+"]"); - props.put("log-file", "autoReconnectVM"+VM.getCurrentVMNum()+"_"+getPID()+".log"); +// props.put("log-file", "autoReconnectVM"+VM.getCurrentVMNum()+"_"+getPID()+".log"); getSystem(props); // MembershipManagerHelper.getMembershipManager(system).setDebugJGroups(true); final Cache cache = getCache(); http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/f3034be6/gemfire-core/src/test/java/com/gemstone/gemfire/distributed/internal/membership/gms/membership/GMSHealthMonitorJUnitTest.java ---------------------------------------------------------------------- diff --git a/gemfire-core/src/test/java/com/gemstone/gemfire/distributed/internal/membership/gms/membership/GMSHealthMonitorJUnitTest.java b/gemfire-core/src/test/java/com/gemstone/gemfire/distributed/internal/membership/gms/membership/GMSHealthMonitorJUnitTest.java deleted file mode 100644 index cf95817..0000000 --- a/gemfire-core/src/test/java/com/gemstone/gemfire/distributed/internal/membership/gms/membership/GMSHealthMonitorJUnitTest.java +++ /dev/null @@ -1,463 +0,0 @@ -package com.gemstone.gemfire.distributed.internal.membership.gms.membership; - -import org.junit.After; -import org.junit.Assert; -import org.junit.Before; -import org.junit.Test; -import org.junit.experimental.categories.Category; -import org.mockito.invocation.InvocationOnMock; -import org.mockito.stubbing.Answer; - -import static org.mockito.Matchers.any; -import static org.mockito.Mockito.mock; -import static org.mockito.Mockito.when; - -import java.io.IOException; -import java.net.UnknownHostException; -import java.util.ArrayList; -import java.util.HashSet; -import java.util.List; - -import com.gemstone.gemfire.distributed.internal.DistributionConfig; -import com.gemstone.gemfire.distributed.internal.DistributionManager; -import com.gemstone.gemfire.distributed.internal.membership.InternalDistributedMember; -import com.gemstone.gemfire.distributed.internal.membership.NetView; -import com.gemstone.gemfire.distributed.internal.membership.gms.ServiceConfig; -import com.gemstone.gemfire.distributed.internal.membership.gms.Services; -import com.gemstone.gemfire.distributed.internal.membership.gms.Services.Stopper; -import com.gemstone.gemfire.distributed.internal.membership.gms.fd.GMSHealthMonitor; -import com.gemstone.gemfire.distributed.internal.membership.gms.interfaces.Messenger; -import com.gemstone.gemfire.distributed.internal.membership.gms.messages.CheckRequestMessage; -import com.gemstone.gemfire.distributed.internal.membership.gms.messages.CheckResponseMessage; -import com.gemstone.gemfire.distributed.internal.membership.gms.messages.RemoveMemberMessage; -import com.gemstone.gemfire.distributed.internal.membership.gms.messages.SuspectMembersMessage; -import com.gemstone.gemfire.distributed.internal.membership.gms.messages.SuspectRequest; -import com.gemstone.gemfire.internal.SocketCreator; -import com.gemstone.gemfire.test.junit.categories.UnitTest; - -@Category(UnitTest.class) -public class GMSHealthMonitorJUnitTest { - - private Services services; - private ServiceConfig mockConfig; - private DistributionConfig mockDistConfig; - private List<InternalDistributedMember> mockMembers; - private Messenger messenger; - private GMSJoinLeave joinLeave; - private GMSHealthMonitor gmsHealthMonitor; - final long memberTimeout = 1000l; - private int[] portRange= new int[]{0, 65535}; - - @Before - public void initMocks() throws UnknownHostException { - System.setProperty("gemfire.bind-address", "localhost"); - mockDistConfig = mock(DistributionConfig.class); - mockConfig = mock(ServiceConfig.class); - messenger = mock(Messenger.class); - joinLeave = mock(GMSJoinLeave.class); - services = mock(Services.class); - Stopper stopper = mock(Stopper.class); - - when(mockConfig.getDistributionConfig()).thenReturn(mockDistConfig); - when(mockConfig.getMemberTimeout()).thenReturn(memberTimeout); - when(mockConfig.getMembershipPortRange()).thenReturn(portRange); - when(services.getConfig()).thenReturn(mockConfig); - when(services.getMessenger()).thenReturn(messenger); - when(services.getJoinLeave()).thenReturn(joinLeave); - when(services.getCancelCriterion()).thenReturn(stopper); - when(stopper.isCancelInProgress()).thenReturn(false); - - - if (mockMembers == null) { - mockMembers = new ArrayList<InternalDistributedMember>(); - for (int i = 0; i < 7; i++) { - InternalDistributedMember mbr = new InternalDistributedMember("localhost", 8888 + i); - - if (i == 0 || i == 1) { - mbr.setVmKind(DistributionManager.LOCATOR_DM_TYPE); - mbr.getNetMember().setPreferredForCoordinator(true); - } - mockMembers.add(mbr); - } - } - when(joinLeave.getMemberID()).thenReturn(mockMembers.get(3)); - gmsHealthMonitor = new GMSHealthMonitor(); - gmsHealthMonitor.init(services); - gmsHealthMonitor.start(); - } - - @After - public void tearDown() { - gmsHealthMonitor.stop(); - } - - @Test - public void testHMServiceStarted() throws IOException { - - MethodExecuted messageSent = new MethodExecuted(); - InternalDistributedMember mbr = new InternalDistributedMember(SocketCreator.getLocalHost(), 12345); - when(messenger.getMemberID()).thenReturn(mbr); - when(messenger.send(any(CheckResponseMessage.class))).thenAnswer(messageSent); - gmsHealthMonitor.started(); - - gmsHealthMonitor.processMessage(new CheckRequestMessage(mbr, 1)); - Assert.assertTrue("Check Response should have been sent", messageSent.isMethodExecuted()); - } - - /** - * checks whether we get local member id or not to set next neighbor - */ - @Test - public void testHMNextNeighbor() throws IOException { - - NetView v = new NetView(mockMembers.get(0), 2, mockMembers, new HashSet<InternalDistributedMember>(), new HashSet<InternalDistributedMember>()); - - MethodExecuted messageSent = new MethodExecuted(); - when(services.getMessenger().getMemberID()).thenAnswer(messageSent); - gmsHealthMonitor.started(); - - gmsHealthMonitor.installView(v); - - Assert.assertTrue("It should have got memberID from services.getMessenger().getMemberID()", messageSent.isMethodExecuted()); - } - - /** - * checks who is next neighbor - */ - @Test - public void testHMNextNeighborVerify() throws IOException { - - NetView v = new NetView(mockMembers.get(0), 2, mockMembers, new HashSet<InternalDistributedMember>(), new HashSet<InternalDistributedMember>()); - - when(messenger.getMemberID()).thenReturn(mockMembers.get(3)); - gmsHealthMonitor.started(); - - gmsHealthMonitor.installView(v); - - Assert.assertEquals(mockMembers.get(4), gmsHealthMonitor.getNextNeighbor()); - - } - - @Test - public void testHMNextNeighborAfterTimeout() throws Exception { - - NetView v = new NetView(mockMembers.get(0), 2, mockMembers, new HashSet<InternalDistributedMember>(), new HashSet<InternalDistributedMember>()); - -// System.out.printf("memberID is %s view is %s\n", mockMembers.get(3), v); - - // 3rd is current member - when(messenger.getMemberID()).thenReturn(mockMembers.get(3)); - gmsHealthMonitor.started(); - - gmsHealthMonitor.installView(v); - - // allow the monitor to give up on the initial "next neighbor" and - // move on to the one after it - long giveup = System.currentTimeMillis() + memberTimeout + 5; - InternalDistributedMember expected = mockMembers.get(5); - InternalDistributedMember neighbor = gmsHealthMonitor.getNextNeighbor(); - while (System.currentTimeMillis() < giveup && neighbor != expected) { - Thread.sleep(5); - neighbor = gmsHealthMonitor.getNextNeighbor(); - } - - // neighbor should change to 5th - Assert.assertEquals("expected " + mockMembers.get(5) + " but found " + neighbor - + ". view="+v, mockMembers.get(5), neighbor); - } - - /** - * it checks neighbor before membertiemout, it should be same - */ - - @Test - public void testHMNextNeighborBeforeTimeout() throws IOException { - - NetView v = new NetView(mockMembers.get(0), 2, mockMembers, new HashSet<InternalDistributedMember>(), new HashSet<InternalDistributedMember>()); - - // 3rd is current member - when(messenger.getMemberID()).thenReturn(mockMembers.get(3)); - gmsHealthMonitor.started(); - - gmsHealthMonitor.installView(v); - - try { - // member-timeout is 1000 ms, so next neighbor should be same - Thread.sleep(memberTimeout - 200); - } catch (InterruptedException e) { - } - // neighbor should be same - Assert.assertEquals(mockMembers.get(4), gmsHealthMonitor.getNextNeighbor()); - } - - /*** - * checks whether member-check thread sends suspectMembers message - */ - @Test - public void testSuspectMembersCalledThroughMemberCheckThread() { - NetView v = new NetView(mockMembers.get(0), 2, mockMembers, new HashSet<InternalDistributedMember>(), new HashSet<InternalDistributedMember>()); - - MethodExecuted messageSent = new MethodExecuted(); - // 3rd is current member - when(messenger.getMemberID()).thenReturn(mockMembers.get(3)); - gmsHealthMonitor.started(); - - gmsHealthMonitor.installView(v); - - when(messenger.send(any(SuspectMembersMessage.class))).thenAnswer(messageSent); - - try { - // member-timeout is 1000 ms + ping timeout 100ms - // plus wait 100 ms for ack - Thread.sleep(memberTimeout + 100); - } catch (InterruptedException e) { - } - - Assert.assertTrue("SuspectMembersMessage should have sent", messageSent.isMethodExecuted()); - } - - /*** - * checks ping thread didn't sends suspectMembers message before timeout - */ - @Test - public void testSuspectMembersNotCalledThroughPingThreadBeforeTimeout() { - - NetView v = new NetView(mockMembers.get(0), 2, mockMembers, new HashSet<InternalDistributedMember>(), new HashSet<InternalDistributedMember>()); - - MethodExecuted messageSent = new MethodExecuted(); - // 3rd is current member - when(messenger.getMemberID()).thenReturn(mockMembers.get(3)); - - gmsHealthMonitor.installView(v); - - when(messenger.send(any(SuspectMembersMessage.class))).thenAnswer(messageSent); - - try { - // member-timeout is 1000 ms - // plus 100 ms for ack - Thread.sleep(memberTimeout - 200); - } catch (InterruptedException e) { - } - - Assert.assertTrue("SuspectMembersMessage shouldn't have sent", !messageSent.isMethodExecuted()); - } - - /*** - * Checks whether suspect thread sends suspectMembers message - */ - @Test - public void testSuspectMembersCalledThroughSuspectThread() throws Exception { - NetView v = new NetView(mockMembers.get(0), 2, mockMembers, new HashSet<InternalDistributedMember>(), new HashSet<InternalDistributedMember>()); - - MethodExecuted messageSent = new MethodExecuted(); - // 3rd is current member - when(messenger.getMemberID()).thenReturn(mockMembers.get(3)); - - gmsHealthMonitor.installView(v); - - gmsHealthMonitor.suspect(mockMembers.get(1), "Not responding"); - - when(messenger.send(any(SuspectMembersMessage.class))).thenAnswer(messageSent); - - Thread.sleep(GMSHealthMonitor.MEMBER_SUSPECT_COLLECTION_INTERVAL + 1000); - - Assert.assertTrue("SuspectMembersMessage should have sent", messageSent.isMethodExecuted()); - } - - /*** - * Checks suspect thread doesn't sends suspectMembers message before timeout - */ - @Test - public void testSuspectMembersNotCalledThroughSuspectThreadBeforeTimeout() { - - NetView v = new NetView(mockMembers.get(0), 2, mockMembers, new HashSet<InternalDistributedMember>(), new HashSet<InternalDistributedMember>()); - - MethodExecuted messageSent = new MethodExecuted(); - // 3rd is current member - when(messenger.getMemberID()).thenReturn(mockMembers.get(3)); - - gmsHealthMonitor.installView(v); - - gmsHealthMonitor.suspect(mockMembers.get(1), "Not responding"); - - when(messenger.send(any(SuspectMembersMessage.class))).thenAnswer(messageSent); - - try { - // suspect thread timeout is 200 ms - Thread.sleep(100l); - } catch (InterruptedException e) { - } - - Assert.assertTrue("SuspectMembersMessage shouldn't have sent", !messageSent.isMethodExecuted()); - } - - /*** - * Send remove member message after doing final check, ping Timeout - */ - @Test - public void testRemoveMemberCalled() { - - NetView v = new NetView(mockMembers.get(0), 2, mockMembers, new HashSet<InternalDistributedMember>(), new HashSet<InternalDistributedMember>()); - - MethodExecuted messageSent = new MethodExecuted(); - // 3rd is current member - when(messenger.getMemberID()).thenReturn(mockMembers.get(0)); // coordinator and local member - gmsHealthMonitor.started(); - - gmsHealthMonitor.installView(v); - - ArrayList<InternalDistributedMember> recipient = new ArrayList<InternalDistributedMember>(); - recipient.add(mockMembers.get(0)); - ArrayList<SuspectRequest> as = new ArrayList<SuspectRequest>(); - SuspectRequest sr = new SuspectRequest(mockMembers.get(1), "Not Responding");// removing member 1 - as.add(sr); - SuspectMembersMessage sm = new SuspectMembersMessage(recipient, as); - sm.setSender(mockMembers.get(0)); - - when(messenger.send(any(RemoveMemberMessage.class))).thenAnswer(messageSent); - - gmsHealthMonitor.processMessage(sm); - - try { - // this happens after final check, ping timeout - Thread.sleep(memberTimeout); - } catch (InterruptedException e) { - } - - Assert.assertTrue("RemoveMemberMessage should have sent", messageSent.isMethodExecuted()); - } - - /*** - * Shouldn't send remove member message before doing final check, or before ping Timeout - */ - @Test - public void testRemoveMemberNotCalledBeforeTimeout() { - - NetView v = new NetView(mockMembers.get(0), 2, mockMembers, new HashSet<InternalDistributedMember>(), new HashSet<InternalDistributedMember>()); - - MethodExecuted messageSent = new MethodExecuted(); - // 3rd is current member - when(messenger.getMemberID()).thenReturn(mockMembers.get(0)); // coordinator and local member - gmsHealthMonitor.started(); - - gmsHealthMonitor.installView(v); - - ArrayList<InternalDistributedMember> recipient = new ArrayList<InternalDistributedMember>(); - recipient.add(mockMembers.get(0)); - ArrayList<SuspectRequest> as = new ArrayList<SuspectRequest>(); - SuspectRequest sr = new SuspectRequest(mockMembers.get(1), "Not Responding");// removing member 1 - as.add(sr); - SuspectMembersMessage sm = new SuspectMembersMessage(recipient, as); - sm.setSender(mockMembers.get(0)); - - when(messenger.send(any(RemoveMemberMessage.class))).thenAnswer(messageSent); - - gmsHealthMonitor.processMessage(sm); - - try { - // this happens after final check, ping timeout - Thread.sleep(memberTimeout); - } catch (InterruptedException e) { - } - - Assert.assertTrue("RemoveMemberMessage should have sent", messageSent.isMethodExecuted()); - } - - /*** - * Send remove member message after doing final check for coordinator, ping timeout - * This test trying to remove coordinator - */ - @Test - public void testRemoveMemberCalledAfterDoingFinalCheckOnCoordinator() { - - NetView v = new NetView(mockMembers.get(0), 2, mockMembers, new HashSet<InternalDistributedMember>(), new HashSet<InternalDistributedMember>()); - - MethodExecuted messageSent = new MethodExecuted(); - // preferred coordinators are 0 and 1 - when(messenger.getMemberID()).thenReturn(mockMembers.get(1));// next preferred coordinator - gmsHealthMonitor.started(); - - gmsHealthMonitor.installView(v); - - ArrayList<InternalDistributedMember> recipient = new ArrayList<InternalDistributedMember>(); - recipient.add(mockMembers.get(0)); - recipient.add(mockMembers.get(1)); - ArrayList<SuspectRequest> as = new ArrayList<SuspectRequest>(); - SuspectRequest sr = new SuspectRequest(mockMembers.get(0), "Not Responding");// removing coordinator - as.add(sr); - SuspectMembersMessage sm = new SuspectMembersMessage(recipient, as); - sm.setSender(mockMembers.get(4));// member 4 sends suspect message - - when(messenger.send(any(RemoveMemberMessage.class))).thenAnswer(messageSent);// member 1 will process - - gmsHealthMonitor.processMessage(sm); - - try { - // this happens after final check, ping timeout = 1000 ms - Thread.sleep(memberTimeout); - } catch (InterruptedException e) { - } - - Assert.assertTrue("RemoveMemberMessage should have sent.", messageSent.isMethodExecuted()); - } - - /*** - * validates HealthMonitor.CheckIfAvailable api - */ - @Test - public void testCheckIfAvailable() { - - NetView v = new NetView(mockMembers.get(0), 2, mockMembers, new HashSet<InternalDistributedMember>(), new HashSet<InternalDistributedMember>()); - - // 3rd is current member - when(messenger.getMemberID()).thenReturn(mockMembers.get(3)); - - gmsHealthMonitor.installView(v); - - long startTime = System.currentTimeMillis(); - - boolean retVal = gmsHealthMonitor.checkIfAvailable(mockMembers.get(1), "Not responding", false); - - long timeTaken = System.currentTimeMillis() - startTime; - - Assert.assertTrue("This should have taken member ping timeout 100ms ", timeTaken > 90); - Assert.assertTrue("CheckIfAvailable should have return false", !retVal); - } - - @Test - public void testShutdown() { - - NetView v = new NetView(mockMembers.get(0), 2, mockMembers, new HashSet<InternalDistributedMember>(), new HashSet<InternalDistributedMember>()); - - MethodExecuted messageSent = new MethodExecuted(); - // 3rd is current member - when(messenger.getMemberID()).thenReturn(mockMembers.get(3)); - - gmsHealthMonitor.installView(v); - - gmsHealthMonitor.stop(); - - try { - // this happens after final check, membertimeout = 1000 - Thread.sleep(100l); - } catch (InterruptedException e) { - } - - Assert.assertTrue("HeathMonitor should have shutdown", gmsHealthMonitor.isShutdown()); - - } - - private class MethodExecuted implements Answer { - private boolean methodExecuted = false; - - public boolean isMethodExecuted() { - return methodExecuted; - } - - @Override - public Object answer(InvocationOnMock invocation) throws Throwable { - methodExecuted = true; - return null; - } - } -} http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/f3034be6/gemfire-core/src/test/java/com/gemstone/gemfire/distributed/internal/membership/gms/membership/GMSJoinLeaveJUnitTest.java ---------------------------------------------------------------------- diff --git a/gemfire-core/src/test/java/com/gemstone/gemfire/distributed/internal/membership/gms/membership/GMSJoinLeaveJUnitTest.java b/gemfire-core/src/test/java/com/gemstone/gemfire/distributed/internal/membership/gms/membership/GMSJoinLeaveJUnitTest.java index 8dee00a..89282c0 100644 --- a/gemfire-core/src/test/java/com/gemstone/gemfire/distributed/internal/membership/gms/membership/GMSJoinLeaveJUnitTest.java +++ b/gemfire-core/src/test/java/com/gemstone/gemfire/distributed/internal/membership/gms/membership/GMSJoinLeaveJUnitTest.java @@ -90,6 +90,7 @@ public class GMSJoinLeaveJUnitTest { manager = mock(Manager.class); healthMonitor = mock(HealthMonitor.class); + when(healthMonitor.getFailureDetectionPort()).thenReturn(Integer.valueOf(-1)); services = mock(Services.class); when(services.getAuthenticator()).thenReturn(authenticator); @@ -241,6 +242,7 @@ public class GMSJoinLeaveJUnitTest { // simultaneous leave & remove requests for a member // should not result in it's being seen as a crashed member initMocks(); + final int viewInstallationTime = 15000; when(healthMonitor.checkIfAvailable(any(InternalDistributedMember.class), any(String.class), any(Boolean.class))).thenReturn(true); @@ -249,7 +251,7 @@ public class GMSJoinLeaveJUnitTest { gmsJoinLeave.becomeCoordinatorForTest(); NetView oldView = null; - long giveup = System.currentTimeMillis() + 10000; + long giveup = System.currentTimeMillis() + viewInstallationTime; while (System.currentTimeMillis() < giveup && oldView == null) { Thread.sleep(500); oldView = gmsJoinLeave.getView(); @@ -264,7 +266,7 @@ public class GMSJoinLeaveJUnitTest { gmsJoinLeave.memberShutdown(mockMembers[1], "shutting down for test"); gmsJoinLeave.remove(mockMembers[1], "removing for test"); - giveup = System.currentTimeMillis() + 10000; + giveup = System.currentTimeMillis() + viewInstallationTime; while (System.currentTimeMillis() < giveup && gmsJoinLeave.getView().getViewId() == newView.getViewId()) { Thread.sleep(500); } http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/f3034be6/gemfire-core/src/test/java/com/gemstone/gemfire/distributed/internal/membership/gms/membership/fd/GMSHealthMonitorJUnitTest.java ---------------------------------------------------------------------- diff --git a/gemfire-core/src/test/java/com/gemstone/gemfire/distributed/internal/membership/gms/membership/fd/GMSHealthMonitorJUnitTest.java b/gemfire-core/src/test/java/com/gemstone/gemfire/distributed/internal/membership/gms/membership/fd/GMSHealthMonitorJUnitTest.java new file mode 100644 index 0000000..1f0ccf0 --- /dev/null +++ b/gemfire-core/src/test/java/com/gemstone/gemfire/distributed/internal/membership/gms/membership/fd/GMSHealthMonitorJUnitTest.java @@ -0,0 +1,437 @@ +package com.gemstone.gemfire.distributed.internal.membership.gms.membership.fd; + +import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertTrue; +import static org.mockito.Matchers.any; +import static org.mockito.Matchers.isA; +import static org.mockito.Mockito.atLeastOnce; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.never; +import static org.mockito.Mockito.verify; +import static org.mockito.Mockito.when; + +import java.io.IOException; +import java.net.UnknownHostException; +import java.util.ArrayList; +import java.util.HashSet; +import java.util.List; + +import org.junit.After; +import org.junit.Assert; +import org.junit.Before; +import org.junit.Test; +import org.junit.experimental.categories.Category; +import org.mockito.invocation.InvocationOnMock; +import org.mockito.stubbing.Answer; + +import com.gemstone.gemfire.distributed.internal.DistributionConfig; +import com.gemstone.gemfire.distributed.internal.DistributionManager; +import com.gemstone.gemfire.distributed.internal.membership.InternalDistributedMember; +import com.gemstone.gemfire.distributed.internal.membership.NetView; +import com.gemstone.gemfire.distributed.internal.membership.gms.ServiceConfig; +import com.gemstone.gemfire.distributed.internal.membership.gms.Services; +import com.gemstone.gemfire.distributed.internal.membership.gms.Services.Stopper; +import com.gemstone.gemfire.distributed.internal.membership.gms.fd.GMSHealthMonitor; +import com.gemstone.gemfire.distributed.internal.membership.gms.interfaces.Messenger; +import com.gemstone.gemfire.distributed.internal.membership.gms.membership.GMSJoinLeave; +import com.gemstone.gemfire.distributed.internal.membership.gms.messages.HeartbeatMessage; +import com.gemstone.gemfire.distributed.internal.membership.gms.messages.HeartbeatRequestMessage; +import com.gemstone.gemfire.distributed.internal.membership.gms.messages.SuspectMembersMessage; +import com.gemstone.gemfire.distributed.internal.membership.gms.messages.SuspectRequest; +import com.gemstone.gemfire.internal.SocketCreator; +import com.gemstone.gemfire.test.junit.categories.UnitTest; + +@Category(UnitTest.class) +public class GMSHealthMonitorJUnitTest { + + private Services services; + private ServiceConfig mockConfig; + private DistributionConfig mockDistConfig; + private List<InternalDistributedMember> mockMembers; + private Messenger messenger; + private GMSJoinLeave joinLeave; + private GMSHealthMonitor gmsHealthMonitor; + final long memberTimeout = 1000l; + private int[] portRange= new int[]{0, 65535}; + + @Before + public void initMocks() throws UnknownHostException { + System.setProperty("gemfire.bind-address", "localhost"); + mockDistConfig = mock(DistributionConfig.class); + mockConfig = mock(ServiceConfig.class); + messenger = mock(Messenger.class); + joinLeave = mock(GMSJoinLeave.class); + services = mock(Services.class); + Stopper stopper = mock(Stopper.class); + + when(mockConfig.getDistributionConfig()).thenReturn(mockDistConfig); + when(mockConfig.getMemberTimeout()).thenReturn(memberTimeout); + when(mockConfig.getMembershipPortRange()).thenReturn(portRange); + when(services.getConfig()).thenReturn(mockConfig); + when(services.getMessenger()).thenReturn(messenger); + when(services.getJoinLeave()).thenReturn(joinLeave); + when(services.getCancelCriterion()).thenReturn(stopper); + when(stopper.isCancelInProgress()).thenReturn(false); + + + if (mockMembers == null) { + mockMembers = new ArrayList<InternalDistributedMember>(); + for (int i = 0; i < 7; i++) { + InternalDistributedMember mbr = new InternalDistributedMember("localhost", 8888 + i); + + if (i == 0 || i == 1) { + mbr.setVmKind(DistributionManager.LOCATOR_DM_TYPE); + mbr.getNetMember().setPreferredForCoordinator(true); + } + mockMembers.add(mbr); + } + } + when(joinLeave.getMemberID()).thenReturn(mockMembers.get(3)); + when(messenger.getMemberID()).thenReturn(mockMembers.get(3)); + gmsHealthMonitor = new GMSHealthMonitor(); + gmsHealthMonitor.init(services); + gmsHealthMonitor.start(); + } + + @After + public void tearDown() { + gmsHealthMonitor.stop(); + } + + @Test + public void testHMServiceStarted() throws IOException { + + InternalDistributedMember mbr = new InternalDistributedMember(SocketCreator.getLocalHost(), 12345); + when(messenger.getMemberID()).thenReturn(mbr); + gmsHealthMonitor.started(); + + gmsHealthMonitor.processMessage(new HeartbeatRequestMessage(mbr, 1)); + verify(messenger, atLeastOnce()).send(any(HeartbeatMessage.class)); + } + + /** + * checks who is next neighbor + */ + @Test + public void testHMNextNeighborVerify() throws IOException { + + NetView v = new NetView(mockMembers.get(0), 2, mockMembers, new HashSet<InternalDistributedMember>(), new HashSet<InternalDistributedMember>()); + + when(messenger.getMemberID()).thenReturn(mockMembers.get(3)); + gmsHealthMonitor.started(); + + gmsHealthMonitor.installView(v); + + Assert.assertEquals(mockMembers.get(4), gmsHealthMonitor.getNextNeighbor()); + + } + + @Test + public void testHMNextNeighborAfterTimeout() throws Exception { + System.out.println("testHMNextNeighborAfterTimeout starting"); + NetView v = new NetView(mockMembers.get(0), 2, mockMembers, new HashSet<InternalDistributedMember>(), new HashSet<InternalDistributedMember>()); + +// System.out.printf("memberID is %s view is %s\n", mockMembers.get(3), v); + + // 3rd is current member + when(messenger.getMemberID()).thenReturn(mockMembers.get(3)); + gmsHealthMonitor.started(); + + gmsHealthMonitor.installView(v); + + // allow the monitor to give up on the initial "next neighbor" and + // move on to the one after it + long giveup = System.currentTimeMillis() + memberTimeout + 500; + InternalDistributedMember expected = mockMembers.get(5); + InternalDistributedMember neighbor = gmsHealthMonitor.getNextNeighbor(); + while (System.currentTimeMillis() < giveup && neighbor != expected) { + Thread.sleep(5); + neighbor = gmsHealthMonitor.getNextNeighbor(); + } + + // neighbor should change to 5th + System.out.println("testHMNextNeighborAfterTimeout ending"); + Assert.assertEquals("expected " + expected + " but found " + neighbor + + ". view="+v, expected, neighbor); + } + + /** + * it checks neighbor before member-timeout, it should be same + */ + + @Test + public void testHMNextNeighborBeforeTimeout() throws IOException { + NetView v = new NetView(mockMembers.get(0), 2, mockMembers, new HashSet<InternalDistributedMember>(), new HashSet<InternalDistributedMember>()); + + // 3rd is current member + when(messenger.getMemberID()).thenReturn(mockMembers.get(3)); + gmsHealthMonitor.started(); + + gmsHealthMonitor.installView(v); + + try { + // member-timeout is 1000 ms. We initiate a check and choose + // a new neighbor at 500 ms + Thread.sleep(memberTimeout/GMSHealthMonitor.LOGICAL_INTERVAL - 100); + } catch (InterruptedException e) { + } + // neighbor should be same + System.out.println("next neighbor is " + gmsHealthMonitor.getNextNeighbor() + + "\nmy address is " + mockMembers.get(3) + + "\nview is " + v); + + Assert.assertEquals(mockMembers.get(4), gmsHealthMonitor.getNextNeighbor()); + } + + /*** + * checks whether member-check thread sends suspectMembers message + */ + @Test + public void testSuspectMembersCalledThroughMemberCheckThread() throws Exception { + System.out.println("testSuspectMembersCalledThroughMemberCheckThread starting"); + NetView v = new NetView(mockMembers.get(0), 2, mockMembers, new HashSet<InternalDistributedMember>(), new HashSet<InternalDistributedMember>()); + + // 3rd is current member + when(messenger.getMemberID()).thenReturn(mockMembers.get(3)); + gmsHealthMonitor.started(); + + gmsHealthMonitor.installView(v); + + // when the view is installed we start a heartbeat timeout. After + // that expires we request a heartbeat + Thread.sleep(3*memberTimeout + 100); + + System.out.println("testSuspectMembersCalledThroughMemberCheckThread ending"); + assertTrue(gmsHealthMonitor.isSuspectMember(mockMembers.get(4))); + } + + /*** + * checks ping thread didn't sends suspectMembers message before timeout + */ + @Test + public void testSuspectMembersNotCalledThroughPingThreadBeforeTimeout() { + NetView v = new NetView(mockMembers.get(0), 2, mockMembers, new HashSet<InternalDistributedMember>(), new HashSet<InternalDistributedMember>()); + + // 3rd is current member + when(messenger.getMemberID()).thenReturn(mockMembers.get(3)); + gmsHealthMonitor.started(); + + gmsHealthMonitor.installView(v); + InternalDistributedMember neighbor = gmsHealthMonitor.getNextNeighbor(); + + try { + // member-timeout is 1000 ms + // plus 100 ms for ack + Thread.sleep(memberTimeout - 200); + } catch (InterruptedException e) { + } + + assertFalse(gmsHealthMonitor.isSuspectMember(neighbor)); + } + + /*** + * Checks whether suspect thread sends suspectMembers message + */ + @Test + public void testSuspectMembersCalledThroughSuspectThread() throws Exception { + NetView v = new NetView(mockMembers.get(0), 2, mockMembers, new HashSet<InternalDistributedMember>(), new HashSet<InternalDistributedMember>()); + + // 3rd is current member + when(messenger.getMemberID()).thenReturn(mockMembers.get(3)); + + gmsHealthMonitor.installView(v); + + gmsHealthMonitor.suspect(mockMembers.get(1), "Not responding"); + + Thread.sleep(GMSHealthMonitor.MEMBER_SUSPECT_COLLECTION_INTERVAL + 1000); + + verify(messenger, atLeastOnce()).send(any(SuspectMembersMessage.class)); + } + + /*** + * Checks suspect thread doesn't sends suspectMembers message before timeout + */ + @Test + public void testSuspectMembersNotCalledThroughSuspectThreadBeforeTimeout() { + + NetView v = new NetView(mockMembers.get(0), 2, mockMembers, new HashSet<InternalDistributedMember>(), new HashSet<InternalDistributedMember>()); + + MethodExecuted messageSent = new MethodExecuted(); + // 3rd is current member + when(messenger.getMemberID()).thenReturn(mockMembers.get(3)); + + gmsHealthMonitor.installView(v); + + gmsHealthMonitor.suspect(mockMembers.get(1), "Not responding"); + + when(messenger.send(isA(SuspectMembersMessage.class))).thenAnswer(messageSent); + + try { + // suspect thread timeout is 200 ms + Thread.sleep(100l); + } catch (InterruptedException e) { + } + + assertTrue("SuspectMembersMessage shouldn't have sent", !messageSent.isMethodExecuted()); + } + + /*** + * Send remove member message after doing final check, ping Timeout + */ + @Test + public void testRemoveMemberCalled() throws Exception { + System.out.println("testRemoveMemberCalled starting"); + NetView v = new NetView(mockMembers.get(0), 2, mockMembers, new HashSet<InternalDistributedMember>(), new HashSet<InternalDistributedMember>()); + + // 3rd is current member + when(messenger.getMemberID()).thenReturn(mockMembers.get(0)); // coordinator and local member + gmsHealthMonitor.started(); + + gmsHealthMonitor.installView(v); + + Thread.sleep(memberTimeout/GMSHealthMonitor.LOGICAL_INTERVAL); + + ArrayList<InternalDistributedMember> recipient = new ArrayList<InternalDistributedMember>(); + recipient.add(mockMembers.get(0)); + ArrayList<SuspectRequest> as = new ArrayList<SuspectRequest>(); + SuspectRequest sr = new SuspectRequest(mockMembers.get(1), "Not Responding");// removing member 1 + as.add(sr); + SuspectMembersMessage sm = new SuspectMembersMessage(recipient, as); + sm.setSender(mockMembers.get(0)); + + gmsHealthMonitor.processMessage(sm); + + Thread.sleep(2*memberTimeout + 200); + + System.out.println("testRemoveMemberCalled ending"); + verify(joinLeave, atLeastOnce()).remove(any(InternalDistributedMember.class), any(String.class)); + } + + /*** + * Shouldn't send remove member message before doing final check, or before ping Timeout + */ + @Test + public void testRemoveMemberNotCalledBeforeTimeout() { + System.out.println("testRemoveMemberNotCalledBeforeTimeout starting"); + NetView v = new NetView(mockMembers.get(0), 2, mockMembers, new HashSet<InternalDistributedMember>(), new HashSet<InternalDistributedMember>()); + + // 3rd is current member + when(messenger.getMemberID()).thenReturn(mockMembers.get(0)); // coordinator and local member + when(joinLeave.getMemberID()).thenReturn(mockMembers.get(0)); // coordinator and local member + gmsHealthMonitor.started(); + + gmsHealthMonitor.installView(v); + + ArrayList<InternalDistributedMember> recipient = new ArrayList<InternalDistributedMember>(); + recipient.add(mockMembers.get(0)); + ArrayList<SuspectRequest> as = new ArrayList<SuspectRequest>(); + SuspectRequest sr = new SuspectRequest(mockMembers.get(1), "Not Responding");// removing member 1 + as.add(sr); + SuspectMembersMessage sm = new SuspectMembersMessage(recipient, as); + sm.setSender(mockMembers.get(0)); + + gmsHealthMonitor.processMessage(sm); + + try { + // this happens after final check, ping timeout + Thread.sleep(memberTimeout-100); + } catch (InterruptedException e) { + } + + System.out.println("testRemoveMemberNotCalledBeforeTimeout ending"); + verify(joinLeave, never()).remove(any(InternalDistributedMember.class), any(String.class)); + } + + /*** + * Send remove member message after doing final check for coordinator, ping timeout + * This test trying to remove coordinator + */ + @Test + public void testRemoveMemberCalledAfterDoingFinalCheckOnCoordinator() throws Exception { + + NetView v = new NetView(mockMembers.get(0), 2, mockMembers, new HashSet<InternalDistributedMember>(), new HashSet<InternalDistributedMember>()); + + // preferred coordinators are 0 and 1 + when(messenger.getMemberID()).thenReturn(mockMembers.get(1));// next preferred coordinator + gmsHealthMonitor.started(); + + gmsHealthMonitor.installView(v); + + Thread.sleep(memberTimeout/GMSHealthMonitor.LOGICAL_INTERVAL); + + ArrayList<InternalDistributedMember> recipient = new ArrayList<InternalDistributedMember>(); + recipient.add(mockMembers.get(0)); + recipient.add(mockMembers.get(1)); + ArrayList<SuspectRequest> as = new ArrayList<SuspectRequest>(); + SuspectRequest sr = new SuspectRequest(mockMembers.get(0), "Not Responding");// removing coordinator + as.add(sr); + SuspectMembersMessage sm = new SuspectMembersMessage(recipient, as); + sm.setSender(mockMembers.get(4));// member 4 sends suspect message + + gmsHealthMonitor.processMessage(sm); + + // this happens after final check, ping timeout = 1000 ms + Thread.sleep(memberTimeout + 200); + + verify(joinLeave, atLeastOnce()).remove(any(InternalDistributedMember.class), any(String.class)); + } + + /*** + * validates HealthMonitor.CheckIfAvailable api + */ + @Test + public void testCheckIfAvailable() { + + NetView v = new NetView(mockMembers.get(0), 2, mockMembers, new HashSet<InternalDistributedMember>(), new HashSet<InternalDistributedMember>()); + + // 3rd is current member + when(messenger.getMemberID()).thenReturn(mockMembers.get(3)); + + gmsHealthMonitor.installView(v); + + long startTime = System.currentTimeMillis(); + + boolean retVal = gmsHealthMonitor.checkIfAvailable(mockMembers.get(1), "Not responding", false); + + long timeTaken = System.currentTimeMillis() - startTime; + + assertTrue("This should have taken member ping timeout 100ms ", timeTaken > 90); + assertTrue("CheckIfAvailable should have return false", !retVal); + } + + @Test + public void testShutdown() { + + NetView v = new NetView(mockMembers.get(0), 2, mockMembers, new HashSet<InternalDistributedMember>(), new HashSet<InternalDistributedMember>()); + + // 3rd is current member + when(messenger.getMemberID()).thenReturn(mockMembers.get(3)); + + gmsHealthMonitor.installView(v); + + gmsHealthMonitor.stop(); + + try { + // this happens after final check, membertimeout = 1000 + Thread.sleep(100l); + } catch (InterruptedException e) { + } + + assertTrue("HeathMonitor should have shutdown", gmsHealthMonitor.isShutdown()); + + } + + private class MethodExecuted implements Answer { + private boolean methodExecuted = false; + + public boolean isMethodExecuted() { + return methodExecuted; + } + + @Override + public Object answer(InvocationOnMock invocation) throws Throwable { + methodExecuted = true; + return null; + } + } +} http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/f3034be6/gemfire-core/src/test/java/com/gemstone/gemfire/distributed/internal/membership/gms/messenger/JGroupsMessengerJUnitTest.java ---------------------------------------------------------------------- diff --git a/gemfire-core/src/test/java/com/gemstone/gemfire/distributed/internal/membership/gms/messenger/JGroupsMessengerJUnitTest.java b/gemfire-core/src/test/java/com/gemstone/gemfire/distributed/internal/membership/gms/messenger/JGroupsMessengerJUnitTest.java index dfd7779..bf469b8 100755 --- a/gemfire-core/src/test/java/com/gemstone/gemfire/distributed/internal/membership/gms/messenger/JGroupsMessengerJUnitTest.java +++ b/gemfire-core/src/test/java/com/gemstone/gemfire/distributed/internal/membership/gms/messenger/JGroupsMessengerJUnitTest.java @@ -245,59 +245,6 @@ public class JGroupsMessengerJUnitTest { System.out.println("received message = " + messageReceived[0]); } - @Test - public void testDefragmentation() throws Exception { - initMocks(false); - MessageHandler mh = mock(MessageHandler.class); - messenger.addHandler(JoinRequestMessage.class, mh); - - InternalDistributedMember sender = messenger.getMemberID(); - NetView v = new NetView(sender); - when(joinLeave.getView()).thenReturn(v); - messenger.installView(v); - - // configure an incoming message handler for JoinRequestMessage - final DistributionMessage[] messageReceived = new DistributionMessage[1]; - MessageHandler handler = new MessageHandler() { - @Override - public void processMessage(DistributionMessage m) { - messageReceived[0] = m; - } - }; - messenger.addHandler(JoinRequestMessage.class, handler); - - // configure the outgoing message interceptor - interceptor.unicastSentDataMessages = 0; - interceptor.collectMessages = true; - interceptor.collectedMessages.clear(); - - JoinRequestMessage msg = new JoinRequestMessage(messenger.localAddress, sender, new byte[(int)(services.getConfig().getDistributionConfig().getUdpFragmentSize()*(1.5))], -1); - messenger.send(msg); - - assertTrue("expected 2 messages to be sent but found "+ interceptor.unicastSentDataMessages, - interceptor.unicastSentDataMessages == 2); - - // take the fragments and mess with them so they are coming from a new - // "fakeMember", feeding them back up the JGroups stack so that the messenger - // will receive them - List<Message> messages = new ArrayList<>(interceptor.collectedMessages); - UUID fakeMember = new UUID(50, 50); - short unicastHeaderId = ClassConfigurator.getProtocolId(UNICAST3.class); - int seqno = 1; - for (Message m: messages) { - m.setSrc(fakeMember); - UNICAST3.Header oldHeader = (UNICAST3.Header)m.getHeader(unicastHeaderId); - if (oldHeader == null) continue; - UNICAST3.Header newHeader = UNICAST3.Header.createDataHeader(seqno, oldHeader.connId(), seqno==1); - seqno += 1; - m.putHeader(unicastHeaderId, newHeader); - interceptor.up(new Event(Event.MSG, m)); - } - Thread.sleep(5000); - System.out.println("received message = " + messageReceived[0]); - } - - @Test public void testSendToMultipleMembers() throws Exception { initMocks(false); @@ -438,22 +385,6 @@ public class JGroupsMessengerJUnitTest { assertFalse(messenger.myChannel.isConnected()); } - /** - * Test whether DistributionMessage.isPreciousThread() recognizes - * that a UDP transport thread is "precious" - * @throws Exception - */ - @Test - public void testPreciousThread() throws Exception { - String name = Thread.currentThread().getName(); - try { - Thread.currentThread().setName(Transport.PRECIOUS_THREAD_NAME_PREFIX + " test thread"); - assertTrue(DistributionMessage.isPreciousThread()); - } finally { - Thread.currentThread().setName(name); - } - } - @Test public void testChannelClosedAfterEmergencyCloseNotForcedDisconnectWithAutoReconnect() throws Exception { initMocks(false);
