http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/bd43c341/gemfire-core/src/test/java/com/gemstone/gemfire/distributed/internal/membership/gms/messenger/JGroupsMessengerJUnitTest.java ---------------------------------------------------------------------- diff --git a/gemfire-core/src/test/java/com/gemstone/gemfire/distributed/internal/membership/gms/messenger/JGroupsMessengerJUnitTest.java b/gemfire-core/src/test/java/com/gemstone/gemfire/distributed/internal/membership/gms/messenger/JGroupsMessengerJUnitTest.java index fbdcdf5..4b9c01f 100755 --- a/gemfire-core/src/test/java/com/gemstone/gemfire/distributed/internal/membership/gms/messenger/JGroupsMessengerJUnitTest.java +++ b/gemfire-core/src/test/java/com/gemstone/gemfire/distributed/internal/membership/gms/messenger/JGroupsMessengerJUnitTest.java @@ -16,26 +16,22 @@ */ package com.gemstone.gemfire.distributed.internal.membership.gms.messenger; -import static org.junit.Assert.assertTrue; -import static org.junit.Assert.assertFalse; -import static org.junit.Assert.assertEquals; +import static org.junit.Assert.*; + import static org.mockito.Matchers.any; -import static org.mockito.Mockito.mock; -import static org.mockito.Mockito.times; -import static org.mockito.Mockito.verify; -import static org.mockito.Mockito.when; +import static org.mockito.Mockito.*; import java.io.ByteArrayInputStream; import java.io.DataInputStream; import java.io.DataOutput; -import java.io.DataOutputStream; +import java.io.IOException; import java.util.ArrayList; import java.util.List; import java.util.Properties; -import junit.framework.Assert; - +import org.apache.commons.lang.SerializationException; import org.jgroups.Event; +import org.jgroups.JChannel; import org.jgroups.Message; import org.jgroups.conf.ClassConfigurator; import org.jgroups.protocols.UNICAST3; @@ -43,9 +39,10 @@ import org.jgroups.util.UUID; import org.junit.After; import org.junit.Test; import org.junit.experimental.categories.Category; -import 
org.mockito.Mockito; import com.gemstone.gemfire.ForcedDisconnectException; +import com.gemstone.gemfire.GemFireIOException; +import com.gemstone.gemfire.distributed.DistributedSystemDisconnectedException; import com.gemstone.gemfire.distributed.internal.DMStats; import com.gemstone.gemfire.distributed.internal.DistributionConfig; import com.gemstone.gemfire.distributed.internal.DistributionConfigImpl; @@ -58,15 +55,21 @@ import com.gemstone.gemfire.distributed.internal.membership.gms.GMSMember; import com.gemstone.gemfire.distributed.internal.membership.gms.ServiceConfig; import com.gemstone.gemfire.distributed.internal.membership.gms.Services; import com.gemstone.gemfire.distributed.internal.membership.gms.Services.Stopper; +import com.gemstone.gemfire.distributed.internal.membership.gms.interfaces.HealthMonitor; import com.gemstone.gemfire.distributed.internal.membership.gms.interfaces.JoinLeave; import com.gemstone.gemfire.distributed.internal.membership.gms.interfaces.Manager; import com.gemstone.gemfire.distributed.internal.membership.gms.interfaces.MessageHandler; import com.gemstone.gemfire.distributed.internal.membership.gms.messages.JoinRequestMessage; +import com.gemstone.gemfire.distributed.internal.membership.gms.messages.JoinResponseMessage; import com.gemstone.gemfire.distributed.internal.membership.gms.messages.LeaveRequestMessage; +import com.gemstone.gemfire.distributed.internal.membership.gms.messenger.JGroupsMessenger.JGroupsReceiver; import com.gemstone.gemfire.internal.AvailablePortHelper; +import com.gemstone.gemfire.internal.DataSerializableFixedID; import com.gemstone.gemfire.internal.HeapDataOutputStream; +import com.gemstone.gemfire.internal.SocketCreator; import com.gemstone.gemfire.internal.Version; import com.gemstone.gemfire.internal.admin.remote.RemoteTransportConfig; +import com.gemstone.gemfire.internal.cache.DistributedCacheOperation; import com.gemstone.gemfire.test.junit.categories.UnitTest; @Category(UnitTest.class) @@ -76,6 
+79,7 @@ public class JGroupsMessengerJUnitTest { private JoinLeave joinLeave; private Manager manager; private Stopper stopper; + private HealthMonitor healthMonitor; private InterceptUDP interceptor; @@ -83,6 +87,10 @@ public class JGroupsMessengerJUnitTest { * Create stub and mock objects */ private void initMocks(boolean enableMcast) throws Exception { + if (messenger != null) { + messenger.stop(); + messenger = null; + } Properties nonDefault = new Properties(); nonDefault.put(DistributionConfig.DISABLE_TCP_NAME, "true"); nonDefault.put(DistributionConfig.MCAST_PORT_NAME, enableMcast? ""+AvailablePortHelper.getRandomAvailableUDPPort() : "0"); @@ -100,6 +108,8 @@ public class JGroupsMessengerJUnitTest { manager = mock(Manager.class); when(manager.isMulticastAllowed()).thenReturn(enableMcast); + healthMonitor = mock(HealthMonitor.class); + joinLeave = mock(JoinLeave.class); ServiceConfig serviceConfig = new ServiceConfig(tconfig, config); @@ -107,6 +117,7 @@ public class JGroupsMessengerJUnitTest { services = mock(Services.class); when(services.getConfig()).thenReturn(serviceConfig); when(services.getCancelCriterion()).thenReturn(stopper); + when(services.getHealthMonitor()).thenReturn(healthMonitor); when(services.getManager()).thenReturn(manager); when(services.getJoinLeave()).thenReturn(joinLeave); when(services.getStatistics()).thenReturn(mock(DMStats.class)); @@ -121,7 +132,7 @@ public class JGroupsMessengerJUnitTest { "<"+InterceptUDP.class.getName()+"/>" + jgroupsConfig.substring(insertIdx); messenger.setJGroupsStackConfigForTesting(jgroupsConfig); - System.out.println("jgroups config: " + jgroupsConfig); +// System.out.println("jgroups config: " + jgroupsConfig); messenger.start(); messenger.started(); @@ -141,13 +152,195 @@ public class JGroupsMessengerJUnitTest { @Test public void testMemberWeightIsSerialized() throws Exception { HeapDataOutputStream out = new HeapDataOutputStream(500, Version.CURRENT); - InternalDistributedMember m = new 
InternalDistributedMember("localhost", 8888); - ((GMSMember)m.getNetMember()).setMemberWeight((byte)40); - m.toData(out); + InternalDistributedMember mbr = createAddress(8888); + ((GMSMember)mbr.getNetMember()).setMemberWeight((byte)40); + mbr.toData(out); DataInputStream in = new DataInputStream(new ByteArrayInputStream(out.toByteArray())); - m = new InternalDistributedMember(); - m.fromData(in); - assertEquals(40, m.getNetMember().getMemberWeight()); + mbr = new InternalDistributedMember(); + mbr.fromData(in); + assertEquals(40, mbr.getNetMember().getMemberWeight()); + } + + @Test + public void testSerializationError() throws Exception { + for (int i=0; i<2 ; i++) { + boolean enableMcast = (i==1); + initMocks(enableMcast); + InternalDistributedMember mbr = createAddress(8888); + DistributedCacheOperation.CacheOperationMessage msg = mock(DistributedCacheOperation.CacheOperationMessage.class); + when(msg.getRecipients()).thenReturn(new InternalDistributedMember[] {mbr}); + when(msg.getMulticast()).thenReturn(enableMcast); + if (!enableMcast) { + // for non-mcast we send a message with a reply-processor + when(msg.getProcessorId()).thenReturn(1234); + } else { + // for mcast we send a direct-ack message and expect the messenger + // to register it + stub(msg.isDirectAck()).toReturn(true); + } + when(msg.getDSFID()).thenReturn((int)DataSerializableFixedID.PUT_ALL_MESSAGE); + + // for code coverage we need to test with both a SerializationException and + // an IOException. 
The former is wrapped in a GemFireIOException while the + // latter is not + doThrow(new SerializationException()).when(msg).toData(any(DataOutput.class)); + try { + messenger.send(msg); + fail("expected a failure"); + } catch (GemFireIOException e) { + // success + } + if (enableMcast) { + verify(msg, atLeastOnce()).registerProcessor(); + } + doThrow(new IOException()).when(msg).toData(any(DataOutput.class)); + try { + messenger.send(msg); + fail("expected a failure"); + } catch (GemFireIOException e) { + // success + } + } + } + + @Test + public void testJChannelError() throws Exception { + for (int i=0; i<2 ; i++) { + boolean enableMcast = (i==1); + initMocks(enableMcast); + JChannel mockChannel = mock(JChannel.class); + when(mockChannel.isConnected()).thenReturn(true); + doThrow(new RuntimeException()).when(mockChannel).send(any(Message.class)); + JChannel realChannel = messenger.myChannel; + messenger.myChannel = mockChannel; + try { + InternalDistributedMember mbr = createAddress(8888); + DistributedCacheOperation.CacheOperationMessage msg = mock(DistributedCacheOperation.CacheOperationMessage.class); + when(msg.getRecipients()).thenReturn(new InternalDistributedMember[] {mbr}); + when(msg.getMulticast()).thenReturn(enableMcast); + when(msg.getProcessorId()).thenReturn(1234); + when(msg.getDSFID()).thenReturn((int)DataSerializableFixedID.PUT_ALL_MESSAGE); + try { + messenger.send(msg); + fail("expected a failure"); + } catch (DistributedSystemDisconnectedException e) { + // success + } + verify(mockChannel).send(isA(Message.class)); + } finally { + messenger.myChannel = realChannel; + } + } + } + + @Test + public void testJChannelErrorDuringDisconnect() throws Exception { + for (int i=0; i<4 ; i++) { + System.out.println("loop #"+i); + boolean enableMcast = (i%2 == 1); + initMocks(enableMcast); + JChannel mockChannel = mock(JChannel.class); + when(mockChannel.isConnected()).thenReturn(true); + Exception ex, shutdownCause; + if (i < 2) { + ex = new 
RuntimeException(""); + shutdownCause = new RuntimeException("shutdownCause"); + } else { + shutdownCause = new ForcedDisconnectException(""); + ex = new RuntimeException("", shutdownCause); + } + doThrow(ex).when(mockChannel).send(any(Message.class)); + JChannel realChannel = messenger.myChannel; + messenger.myChannel = mockChannel; + + when(services.getShutdownCause()).thenReturn(shutdownCause); + + try { + InternalDistributedMember mbr = createAddress(8888); + DistributedCacheOperation.CacheOperationMessage msg = mock(DistributedCacheOperation.CacheOperationMessage.class); + when(msg.getRecipients()).thenReturn(new InternalDistributedMember[] {mbr}); + when(msg.getMulticast()).thenReturn(enableMcast); + when(msg.getProcessorId()).thenReturn(1234); + when(msg.getDSFID()).thenReturn((int)DataSerializableFixedID.PUT_ALL_MESSAGE); + try { + messenger.send(msg); + fail("expected a failure"); + } catch (DistributedSystemDisconnectedException e) { + // the ultimate cause should be the shutdownCause returned + // by Services.getShutdownCause() + Throwable cause = e; + while (cause.getCause() != null) { + cause = cause.getCause(); + } + assertTrue(cause != e); + assertTrue(cause == shutdownCause); + } + verify(mockChannel).send(isA(Message.class)); + } finally { + messenger.myChannel = realChannel; + } + } + } + + @Test + public void testSendWhenChannelIsClosed() throws Exception { + for (int i=0; i<2 ; i++) { + initMocks(false); + JChannel mockChannel = mock(JChannel.class); + when(mockChannel.isConnected()).thenReturn(false); + doThrow(new RuntimeException()).when(mockChannel).send(any(Message.class)); + JChannel realChannel = messenger.myChannel; + messenger.myChannel = mockChannel; + try { + InternalDistributedMember mbr = createAddress(8888); + DistributedCacheOperation.CacheOperationMessage msg = mock(DistributedCacheOperation.CacheOperationMessage.class); + when(msg.getRecipients()).thenReturn(new InternalDistributedMember[] {mbr}); + 
when(msg.getMulticast()).thenReturn(false); + when(msg.getProcessorId()).thenReturn(1234); + try { + messenger.send(msg); + fail("expected a failure"); + } catch (DistributedSystemDisconnectedException e) { + // success + } + verify(mockChannel, never()).send(isA(Message.class)); + } finally { + messenger.myChannel = realChannel; + } + } + } + + @Test + public void testSendUnreliably() throws Exception { + for (int i=0; i<2 ; i++) { + boolean enableMcast = (i==1); + initMocks(enableMcast); + InternalDistributedMember mbr = createAddress(8888); + DistributedCacheOperation.CacheOperationMessage msg = mock(DistributedCacheOperation.CacheOperationMessage.class); + when(msg.getRecipients()).thenReturn(new InternalDistributedMember[] {mbr}); + when(msg.getMulticast()).thenReturn(enableMcast); + if (!enableMcast) { + // for non-mcast we send a message with a reply-processor + when(msg.getProcessorId()).thenReturn(1234); + } else { + // for mcast we send a direct-ack message and expect the messenger + // to register it + stub(msg.isDirectAck()).toReturn(true); + } + when(msg.getDSFID()).thenReturn((int)DataSerializableFixedID.PUT_ALL_MESSAGE); + interceptor.collectMessages = true; + try { + messenger.sendUnreliably(msg); + } catch (GemFireIOException e) { + fail("expected success"); + } + if (enableMcast) { + verify(msg, atLeastOnce()).registerProcessor(); + } + verify(msg).toData(isA(DataOutput.class)); + assertTrue("expected 1 message but found " + interceptor.collectedMessages, interceptor.collectedMessages.size() == 1); + assertTrue(interceptor.collectedMessages.get(0).isFlagSet(Message.Flag.NO_RELIABILITY)); + } } @Test @@ -265,7 +458,7 @@ public class JGroupsMessengerJUnitTest { public void testSendToMultipleMembers() throws Exception { initMocks(false); InternalDistributedMember sender = messenger.getMemberID(); - InternalDistributedMember other = new InternalDistributedMember("localhost", 8888); + InternalDistributedMember other = createAddress(8888); NetView v = 
new NetView(sender); v.add(other); @@ -285,11 +478,11 @@ public class JGroupsMessengerJUnitTest { @Test public void testChannelStillConnectedAfterEmergencyCloseAfterForcedDisconnectWithAutoReconnect() throws Exception { initMocks(false); - Mockito.doCallRealMethod().when(services).setShutdownCause(any(ForcedDisconnectException.class)); - Mockito.doCallRealMethod().when(services).getShutdownCause(); - Mockito.doCallRealMethod().when(services).emergencyClose(); - Mockito.doCallRealMethod().when(services).isShutdownDueToForcedDisconnect(); - Mockito.doCallRealMethod().when(services).isAutoReconnectEnabled(); + doCallRealMethod().when(services).setShutdownCause(any(ForcedDisconnectException.class)); + doCallRealMethod().when(services).getShutdownCause(); + doCallRealMethod().when(services).emergencyClose(); + doCallRealMethod().when(services).isShutdownDueToForcedDisconnect(); + doCallRealMethod().when(services).isAutoReconnectEnabled(); services.setShutdownCause(new ForcedDisconnectException("Test Forced Disconnect")); assertTrue(messenger.myChannel.isConnected()); messenger.emergencyClose(); @@ -299,11 +492,11 @@ public class JGroupsMessengerJUnitTest { @Test public void testChannelStillConnectedAfterStopAfterForcedDisconnectWithAutoReconnect() throws Exception { initMocks(false); - Mockito.doCallRealMethod().when(services).setShutdownCause(any(ForcedDisconnectException.class)); - Mockito.doCallRealMethod().when(services).getShutdownCause(); - Mockito.doCallRealMethod().when(services).emergencyClose(); - Mockito.doCallRealMethod().when(services).isShutdownDueToForcedDisconnect(); - Mockito.doCallRealMethod().when(services).isAutoReconnectEnabled(); + doCallRealMethod().when(services).setShutdownCause(any(ForcedDisconnectException.class)); + doCallRealMethod().when(services).getShutdownCause(); + doCallRealMethod().when(services).emergencyClose(); + doCallRealMethod().when(services).isShutdownDueToForcedDisconnect(); + 
doCallRealMethod().when(services).isAutoReconnectEnabled(); services.setShutdownCause(new ForcedDisconnectException("Test Forced Disconnect")); assertTrue(messenger.myChannel.isConnected()); messenger.stop(); @@ -313,12 +506,12 @@ public class JGroupsMessengerJUnitTest { @Test public void testChannelStillConnectedAfteremergencyWhileReconnectingDS() throws Exception { initMocks(false); - Mockito.doCallRealMethod().when(services).setShutdownCause(any(ForcedDisconnectException.class)); - Mockito.doCallRealMethod().when(services).getShutdownCause(); - Mockito.doCallRealMethod().when(services).emergencyClose(); - Mockito.doReturn(false).when(services).isShutdownDueToForcedDisconnect(); - Mockito.doReturn(false).when(services).isAutoReconnectEnabled(); - Mockito.doReturn(true).when(manager).isReconnectingDS(); + doCallRealMethod().when(services).setShutdownCause(any(ForcedDisconnectException.class)); + doCallRealMethod().when(services).getShutdownCause(); + doCallRealMethod().when(services).emergencyClose(); + doReturn(false).when(services).isShutdownDueToForcedDisconnect(); + doReturn(false).when(services).isAutoReconnectEnabled(); + doReturn(true).when(manager).isReconnectingDS(); services.setShutdownCause(new ForcedDisconnectException("Test Forced Disconnect")); assertTrue(messenger.myChannel.isConnected()); messenger.emergencyClose(); @@ -329,12 +522,12 @@ public class JGroupsMessengerJUnitTest { @Test public void testChannelStillConnectedAfterStopWhileReconnectingDS() throws Exception { initMocks(false); - Mockito.doCallRealMethod().when(services).setShutdownCause(any(ForcedDisconnectException.class)); - Mockito.doCallRealMethod().when(services).getShutdownCause(); - Mockito.doCallRealMethod().when(services).emergencyClose(); - Mockito.doReturn(false).when(services).isShutdownDueToForcedDisconnect(); - Mockito.doReturn(false).when(services).isAutoReconnectEnabled(); - Mockito.doReturn(true).when(manager).isReconnectingDS(); + 
doCallRealMethod().when(services).setShutdownCause(any(ForcedDisconnectException.class)); + doCallRealMethod().when(services).getShutdownCause(); + doCallRealMethod().when(services).emergencyClose(); + doReturn(false).when(services).isShutdownDueToForcedDisconnect(); + doReturn(false).when(services).isAutoReconnectEnabled(); + doReturn(true).when(manager).isReconnectingDS(); services.setShutdownCause(new ForcedDisconnectException("Test Forced Disconnect")); assertTrue(messenger.myChannel.isConnected()); messenger.stop(); @@ -344,12 +537,12 @@ public class JGroupsMessengerJUnitTest { @Test public void testChannelClosedOnEmergencyClose() throws Exception { initMocks(false); - Mockito.doCallRealMethod().when(services).setShutdownCause(any(ForcedDisconnectException.class)); - Mockito.doCallRealMethod().when(services).getShutdownCause(); - Mockito.doCallRealMethod().when(services).emergencyClose(); - Mockito.doReturn(false).when(services).isShutdownDueToForcedDisconnect(); - Mockito.doReturn(false).when(services).isAutoReconnectEnabled(); - Mockito.doReturn(false).when(manager).isReconnectingDS(); + doCallRealMethod().when(services).setShutdownCause(any(ForcedDisconnectException.class)); + doCallRealMethod().when(services).getShutdownCause(); + doCallRealMethod().when(services).emergencyClose(); + doReturn(false).when(services).isShutdownDueToForcedDisconnect(); + doReturn(false).when(services).isAutoReconnectEnabled(); + doReturn(false).when(manager).isReconnectingDS(); services.setShutdownCause(new ForcedDisconnectException("Test Forced Disconnect")); assertTrue(messenger.myChannel.isConnected()); messenger.emergencyClose(); @@ -359,12 +552,12 @@ public class JGroupsMessengerJUnitTest { @Test public void testChannelClosedOnStop() throws Exception { initMocks(false); - Mockito.doCallRealMethod().when(services).setShutdownCause(any(ForcedDisconnectException.class)); - Mockito.doCallRealMethod().when(services).getShutdownCause(); - 
Mockito.doCallRealMethod().when(services).emergencyClose(); - Mockito.doReturn(false).when(services).isShutdownDueToForcedDisconnect(); - Mockito.doReturn(false).when(services).isAutoReconnectEnabled(); - Mockito.doReturn(false).when(manager).isReconnectingDS(); + doCallRealMethod().when(services).setShutdownCause(any(ForcedDisconnectException.class)); + doCallRealMethod().when(services).getShutdownCause(); + doCallRealMethod().when(services).emergencyClose(); + doReturn(false).when(services).isShutdownDueToForcedDisconnect(); + doReturn(false).when(services).isAutoReconnectEnabled(); + doReturn(false).when(manager).isReconnectingDS(); services.setShutdownCause(new ForcedDisconnectException("Test Forced Disconnect")); assertTrue(messenger.myChannel.isConnected()); messenger.stop(); @@ -374,12 +567,12 @@ public class JGroupsMessengerJUnitTest { @Test public void testChannelClosedAfterEmergencyCloseForcedDisconnectWithoutAutoReconnect() throws Exception { initMocks(false); - Mockito.doCallRealMethod().when(services).setShutdownCause(any(ForcedDisconnectException.class)); - Mockito.doCallRealMethod().when(services).getShutdownCause(); - Mockito.doCallRealMethod().when(services).emergencyClose(); - Mockito.doReturn(true).when(services).isShutdownDueToForcedDisconnect(); - Mockito.doReturn(false).when(services).isAutoReconnectEnabled(); - Mockito.doReturn(false).when(manager).isReconnectingDS(); + doCallRealMethod().when(services).setShutdownCause(any(ForcedDisconnectException.class)); + doCallRealMethod().when(services).getShutdownCause(); + doCallRealMethod().when(services).emergencyClose(); + doReturn(true).when(services).isShutdownDueToForcedDisconnect(); + doReturn(false).when(services).isAutoReconnectEnabled(); + doReturn(false).when(manager).isReconnectingDS(); services.setShutdownCause(new ForcedDisconnectException("Test Forced Disconnect")); assertTrue(messenger.myChannel.isConnected()); messenger.emergencyClose(); @@ -389,12 +582,12 @@ public class 
JGroupsMessengerJUnitTest { @Test public void testChannelStillConnectedStopAfterForcedDisconnectWithoutAutoReconnect() throws Exception { initMocks(false); - Mockito.doCallRealMethod().when(services).setShutdownCause(any(ForcedDisconnectException.class)); - Mockito.doCallRealMethod().when(services).getShutdownCause(); - Mockito.doCallRealMethod().when(services).emergencyClose(); - Mockito.doReturn(true).when(services).isShutdownDueToForcedDisconnect(); - Mockito.doReturn(false).when(services).isAutoReconnectEnabled(); - Mockito.doReturn(false).when(manager).isReconnectingDS(); + doCallRealMethod().when(services).setShutdownCause(any(ForcedDisconnectException.class)); + doCallRealMethod().when(services).getShutdownCause(); + doCallRealMethod().when(services).emergencyClose(); + doReturn(true).when(services).isShutdownDueToForcedDisconnect(); + doReturn(false).when(services).isAutoReconnectEnabled(); + doReturn(false).when(manager).isReconnectingDS(); services.setShutdownCause(new ForcedDisconnectException("Test Forced Disconnect")); assertTrue(messenger.myChannel.isConnected()); messenger.stop(); @@ -404,12 +597,12 @@ public class JGroupsMessengerJUnitTest { @Test public void testChannelClosedAfterEmergencyCloseNotForcedDisconnectWithAutoReconnect() throws Exception { initMocks(false); - Mockito.doCallRealMethod().when(services).setShutdownCause(any(ForcedDisconnectException.class)); - Mockito.doCallRealMethod().when(services).getShutdownCause(); - Mockito.doCallRealMethod().when(services).emergencyClose(); - Mockito.doReturn(false).when(services).isShutdownDueToForcedDisconnect(); - Mockito.doReturn(true).when(services).isAutoReconnectEnabled(); - Mockito.doReturn(false).when(manager).isReconnectingDS(); + doCallRealMethod().when(services).setShutdownCause(any(ForcedDisconnectException.class)); + doCallRealMethod().when(services).getShutdownCause(); + doCallRealMethod().when(services).emergencyClose(); + 
doReturn(false).when(services).isShutdownDueToForcedDisconnect(); + doReturn(true).when(services).isAutoReconnectEnabled(); + doReturn(false).when(manager).isReconnectingDS(); services.setShutdownCause(new ForcedDisconnectException("Test Forced Disconnect")); assertTrue(messenger.myChannel.isConnected()); messenger.emergencyClose(); @@ -419,18 +612,150 @@ public class JGroupsMessengerJUnitTest { @Test public void testChannelStillConnectedStopNotForcedDisconnectWithAutoReconnect() throws Exception { initMocks(false); - Mockito.doCallRealMethod().when(services).setShutdownCause(any(ForcedDisconnectException.class)); - Mockito.doCallRealMethod().when(services).getShutdownCause(); - Mockito.doCallRealMethod().when(services).emergencyClose(); - Mockito.doReturn(false).when(services).isShutdownDueToForcedDisconnect(); - Mockito.doReturn(true).when(services).isAutoReconnectEnabled(); - Mockito.doReturn(false).when(manager).isReconnectingDS(); + doCallRealMethod().when(services).setShutdownCause(any(ForcedDisconnectException.class)); + doCallRealMethod().when(services).getShutdownCause(); + doCallRealMethod().when(services).emergencyClose(); + doReturn(false).when(services).isShutdownDueToForcedDisconnect(); + doReturn(true).when(services).isAutoReconnectEnabled(); + doReturn(false).when(manager).isReconnectingDS(); services.setShutdownCause(new ForcedDisconnectException("Test Forced Disconnect")); assertTrue(messenger.myChannel.isConnected()); messenger.stop(); assertFalse(messenger.myChannel.isConnected()); } + @Test + public void testMessageFiltering() throws Exception { + initMocks(true); + InternalDistributedMember mbr = createAddress(8888); + NetView view = new NetView(mbr); + + // the digest should be set in an outgoing join response + JoinResponseMessage joinResponse = new JoinResponseMessage(mbr, view); + messenger.filterOutgoingMessage(joinResponse); + assertNotNull(joinResponse.getMessengerData()); + + // save the view digest for later + byte[] data = 
joinResponse.getMessengerData(); + + // the digest should be used and the message bytes nulled out in an incoming join response + messenger.filterIncomingMessage(joinResponse); + assertNull(joinResponse.getMessengerData()); + + // the digest shouldn't be set in an outgoing rejection message + joinResponse = new JoinResponseMessage("you can't join my distributed system. nyah nyah nyah!"); + messenger.filterOutgoingMessage(joinResponse); + assertNull(joinResponse.getMessengerData()); + + // the digest shouldn't be installed from an incoming rejection message + joinResponse.setMessengerData(data); + messenger.filterIncomingMessage(joinResponse); + assertNotNull(joinResponse.getMessengerData()); + } + + @Test + public void testPingPong() throws Exception { + initMocks(false); + GMSPingPonger pinger = messenger.getPingPonger(); + InternalDistributedMember mbr = createAddress(8888); + JGAddress addr = new JGAddress(mbr); + + Message pingMessage = pinger.createPingMessage(null, addr); + assertTrue(pinger.isPingMessage(pingMessage.getBuffer())); + assertFalse(pinger.isPongMessage(pingMessage.getBuffer())); + + Message pongMessage = pinger.createPongMessage(null, addr); + assertTrue(pinger.isPongMessage(pongMessage.getBuffer())); + assertFalse(pinger.isPingMessage(pongMessage.getBuffer())); + + interceptor.collectMessages = true; + pinger.sendPingMessage(messenger.myChannel, null, addr); + assertEquals("expected 1 message but found " + interceptor.collectedMessages, interceptor.collectedMessages.size(), 1); + pingMessage = interceptor.collectedMessages.get(0); + assertTrue(pinger.isPingMessage(pingMessage.getBuffer())); + + interceptor.collectedMessages.clear(); + pinger.sendPongMessage(messenger.myChannel, null, addr); + assertEquals("expected 1 message but found " + interceptor.collectedMessages, interceptor.collectedMessages.size(), 1); + pongMessage = interceptor.collectedMessages.get(0); + assertTrue(pinger.isPongMessage(pongMessage.getBuffer())); + + 
interceptor.collectedMessages.clear(); + JGroupsReceiver receiver = (JGroupsReceiver)messenger.myChannel.getReceiver(); + long pongsReceived = messenger.pongsReceived; + receiver.receive(pongMessage); + assertEquals(pongsReceived+1, messenger.pongsReceived); + receiver.receive(pingMessage); + assertEquals("expected 1 message but found " + interceptor.collectedMessages, interceptor.collectedMessages.size(), 1); + Message m = interceptor.collectedMessages.get(0); + assertTrue(pinger.isPongMessage(m.getBuffer())); + } + + @Test + public void testJGroupsIOExceptionHandler() throws Exception { + initMocks(false); + InternalDistributedMember mbr = createAddress(8888); + NetView v = new NetView(mbr); + v.add(messenger.getMemberID()); + messenger.installView(v); + + IOException ioe = new IOException("test exception"); + messenger.handleJGroupsIOException(ioe, new JGAddress(mbr)); + messenger.handleJGroupsIOException(ioe, new JGAddress(mbr)); // should be ignored + verify(healthMonitor).checkIfAvailable(mbr, "Unable to send messages to this member via JGroups", true); + } + + @Test + public void testReceiver() throws Exception { + initMocks(false); + JGroupsReceiver receiver = (JGroupsReceiver)messenger.myChannel.getReceiver(); + + // a zero-length message is ignored + Message msg = new Message(new JGAddress(messenger.getMemberID())); + Object result = messenger.readJGMessage(msg); + assertNull(result); + + // for code coverage we need to pump this message through the receiver + receiver.receive(msg); + + // for more code coverage we need to actually set a buffer in the message + msg.setBuffer(new byte[0]); + result = messenger.readJGMessage(msg); + assertNull(result); + receiver.receive(msg); + + // now create a view and a real distribution-message + InternalDistributedMember myAddress = messenger.getMemberID(); + InternalDistributedMember other = createAddress(8888); + NetView v = new NetView(myAddress); + v.add(other); + when(joinLeave.getView()).thenReturn(v); + 
messenger.installView(v); + + List<InternalDistributedMember> recipients = v.getMembers(); + SerialAckedMessage dmsg = new SerialAckedMessage(); + dmsg.setRecipients(recipients); + + // a message is ignored during manager shutdown + msg = messenger.createJGMessage(dmsg, new JGAddress(other), Version.CURRENT_ORDINAL); + when(manager.shutdownInProgress()).thenReturn(Boolean.TRUE); + receiver.receive(msg); + verify(manager, never()).processMessage(isA(DistributionMessage.class)); + } + + @Test + public void testUseOldJChannel() throws Exception { + initMocks(false); + JChannel channel = messenger.myChannel; + services.getConfig().getTransport().setOldDSMembershipInfo(channel); + JGroupsMessenger newMessenger = new JGroupsMessenger(); + newMessenger.init(services); + newMessenger.start(); + newMessenger.started(); + newMessenger.stop(); + assertTrue(newMessenger.myChannel == messenger.myChannel); + } + /** * creates an InternalDistributedMember address that can be used * with the doctored JGroups channel. This includes a logical @@ -439,7 +764,7 @@ public class JGroupsMessengerJUnitTest { * @param port the UDP port to use for the new address */ private InternalDistributedMember createAddress(int port) { - GMSMember gms = new GMSMember("localhost", 8888); + GMSMember gms = new GMSMember("localhost", port); gms.setUUID(UUID.randomUUID()); gms.setVmKind(DistributionManager.NORMAL_DM_TYPE); gms.setVersionOrdinal(Version.CURRENT_ORDINAL);
http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/bd43c341/gemfire-core/src/test/java/dunit/RemoteDUnitVMIF.java ---------------------------------------------------------------------- diff --git a/gemfire-core/src/test/java/dunit/RemoteDUnitVMIF.java b/gemfire-core/src/test/java/dunit/RemoteDUnitVMIF.java index 0004246..5dffa47 100644 --- a/gemfire-core/src/test/java/dunit/RemoteDUnitVMIF.java +++ b/gemfire-core/src/test/java/dunit/RemoteDUnitVMIF.java @@ -31,4 +31,6 @@ public interface RemoteDUnitVMIF extends Remote { MethExecutorResult executeMethodOnClass(String name, String methodName, Object[] args) throws RemoteException; + void shutDownVM() throws RemoteException; + } http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/bd43c341/gemfire-core/src/test/java/dunit/standalone/ChildVM.java ---------------------------------------------------------------------- diff --git a/gemfire-core/src/test/java/dunit/standalone/ChildVM.java b/gemfire-core/src/test/java/dunit/standalone/ChildVM.java index 67b2710..45a236a 100644 --- a/gemfire-core/src/test/java/dunit/standalone/ChildVM.java +++ b/gemfire-core/src/test/java/dunit/standalone/ChildVM.java @@ -34,6 +34,15 @@ import dunit.standalone.DUnitLauncher.MasterRemote; */ public class ChildVM { + private static boolean stopMainLoop = false; + + /** + * tells the main() loop to exit + */ + public static void stopVM() { + stopMainLoop = true; + } + static { createHydraLogWriter(); } @@ -54,7 +63,7 @@ public class ChildVM { Naming.rebind("//localhost:" + namingPort + "/vm" + vmNum, dunitVM); holder.signalVMReady(); //This loop is here so this VM will die even if the master is mean killed. 
- while(true) { + while (!stopMainLoop) { holder.ping(); Thread.sleep(1000); } http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/bd43c341/gemfire-core/src/test/java/dunit/standalone/DUnitLauncher.java ---------------------------------------------------------------------- diff --git a/gemfire-core/src/test/java/dunit/standalone/DUnitLauncher.java b/gemfire-core/src/test/java/dunit/standalone/DUnitLauncher.java index f3109f3..72c33d6 100644 --- a/gemfire-core/src/test/java/dunit/standalone/DUnitLauncher.java +++ b/gemfire-core/src/test/java/dunit/standalone/DUnitLauncher.java @@ -169,6 +169,30 @@ public class DUnitLauncher { Runtime.getRuntime().addShutdownHook(new Thread() { public void run() { +// System.out.println("shutting down DUnit JVMs"); +// for (int i=0; i<NUM_VMS; i++) { +// try { +// processManager.getStub(i).shutDownVM(); +// } catch (Exception e) { +// System.out.println("exception shutting down vm_"+i+": " + e); +// } +// } +// // TODO - hasLiveVMs always returns true +// System.out.print("waiting for JVMs to exit"); +// long giveUp = System.currentTimeMillis() + 5000; +// while (giveUp > System.currentTimeMillis()) { +// if (!processManager.hasLiveVMs()) { +// return; +// } +// System.out.print("."); +// System.out.flush(); +// try { +// Thread.sleep(1000); +// } catch (InterruptedException e) { +// break; +// } +// } +// System.out.println("\nkilling any remaining JVMs"); processManager.killVMs(); } }); http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/bd43c341/gemfire-core/src/test/java/dunit/standalone/ProcessManager.java ---------------------------------------------------------------------- diff --git a/gemfire-core/src/test/java/dunit/standalone/ProcessManager.java b/gemfire-core/src/test/java/dunit/standalone/ProcessManager.java index 60ac04d..7fc762f 100644 --- a/gemfire-core/src/test/java/dunit/standalone/ProcessManager.java +++ b/gemfire-core/src/test/java/dunit/standalone/ProcessManager.java @@ -98,12 +98,20 @@ public 
class ProcessManager { public synchronized void killVMs() { for(ProcessHolder process : processes.values()) { if(process != null) { - //TODO - stop it gracefully? Why bother process.kill(); } } } + public synchronized boolean hasLiveVMs() { + for(ProcessHolder process : processes.values()) { + if(process != null && process.isAlive()) { + return true; + } + } + return false; + } + public synchronized void bounce(int vmNum) { if(!processes.containsKey(vmNum)) { throw new IllegalStateException("No such process " + vmNum); @@ -240,6 +248,10 @@ public class ProcessManager { public boolean isKilled() { return killed; } + + public boolean isAlive() { + return !killed && process.isAlive(); + } } public RemoteDUnitVMIF getStub(int i) throws AccessException, RemoteException, NotBoundException, InterruptedException { http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/bd43c341/gemfire-core/src/test/java/dunit/standalone/RemoteDUnitVM.java ---------------------------------------------------------------------- diff --git a/gemfire-core/src/test/java/dunit/standalone/RemoteDUnitVM.java b/gemfire-core/src/test/java/dunit/standalone/RemoteDUnitVM.java index 15acc2e..742dc55 100644 --- a/gemfire-core/src/test/java/dunit/standalone/RemoteDUnitVM.java +++ b/gemfire-core/src/test/java/dunit/standalone/RemoteDUnitVM.java @@ -135,11 +135,10 @@ public class RemoteDUnitVM extends UnicastRemoteObject implements RemoteDUnitVMI } - public void shutDownVM(boolean disconnect, boolean runShutdownHook) - throws RemoteException { + public void shutDownVM() throws RemoteException { + ChildVM.stopVM(); } - public void disconnectVM() - throws RemoteException { + public void disconnectVM() throws RemoteException { } }
