Repository: activemq-artemis Updated Branches: refs/heads/2.6.x 3949744cd -> 1d9b19d9c
NO-JIRA Adding a test playing with network disconnects and failover (cherry picked from commit 05ce7c6ecd1c70fc571764af9027767f04538ccd) Project: http://git-wip-us.apache.org/repos/asf/activemq-artemis/repo Commit: http://git-wip-us.apache.org/repos/asf/activemq-artemis/commit/1d9b19d9 Tree: http://git-wip-us.apache.org/repos/asf/activemq-artemis/tree/1d9b19d9 Diff: http://git-wip-us.apache.org/repos/asf/activemq-artemis/diff/1d9b19d9 Branch: refs/heads/2.6.x Commit: 1d9b19d9c27a8a471e5c50828699f3f1b63480a3 Parents: 3949744 Author: Clebert Suconic <[email protected]> Authored: Wed Sep 5 18:09:18 2018 -0400 Committer: Clebert Suconic <[email protected]> Committed: Wed Sep 12 17:15:05 2018 -0400 ---------------------------------------------------------------------- .../artemis/tests/util/ActiveMQTestBase.java | 5 +- .../failover/NetworkFailureFailoverTest.java | 688 +++++++++++++++++++ .../artemis/tests/util/network/NetUtil.java | 226 ++++++ .../tests/util/network/NetUtilResource.java | 29 + 4 files changed, 946 insertions(+), 2 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/1d9b19d9/artemis-server/src/test/java/org/apache/activemq/artemis/tests/util/ActiveMQTestBase.java ---------------------------------------------------------------------- diff --git a/artemis-server/src/test/java/org/apache/activemq/artemis/tests/util/ActiveMQTestBase.java b/artemis-server/src/test/java/org/apache/activemq/artemis/tests/util/ActiveMQTestBase.java index 530c399..2cd5d56 100644 --- a/artemis-server/src/test/java/org/apache/activemq/artemis/tests/util/ActiveMQTestBase.java +++ b/artemis-server/src/test/java/org/apache/activemq/artemis/tests/util/ActiveMQTestBase.java @@ -1219,7 +1219,7 @@ public abstract class ActiveMQTestBase extends Assert { return params; } - protected static final TransportConfiguration getNettyAcceptorTransportConfiguration(final boolean live) { + protected TransportConfiguration getNettyAcceptorTransportConfiguration(final boolean live) { if (live) { return new TransportConfiguration(NETTY_ACCEPTOR_FACTORY); } @@ -1231,7 +1231,7 @@ public abstract class ActiveMQTestBase extends Assert { return new TransportConfiguration(NETTY_ACCEPTOR_FACTORY, server1Params); } - protected static final TransportConfiguration getNettyConnectorTransportConfiguration(final boolean live) { + protected TransportConfiguration getNettyConnectorTransportConfiguration(final boolean live) { if (live) { return new TransportConfiguration(NETTY_CONNECTOR_FACTORY); } @@ -1239,6 +1239,7 @@ public abstract class ActiveMQTestBase extends Assert { Map<String, Object> server1Params = new HashMap<>(); server1Params.put(org.apache.activemq.artemis.core.remoting.impl.netty.TransportConstants.PORT_PROP_NAME, org.apache.activemq.artemis.core.remoting.impl.netty.TransportConstants.DEFAULT_PORT + 1); + server1Params.put(org.apache.activemq.artemis.core.remoting.impl.netty.TransportConstants.NETTY_CONNECT_TIMEOUT, 1000); return new TransportConfiguration(NETTY_CONNECTOR_FACTORY, server1Params); } http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/1d9b19d9/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/cluster/failover/NetworkFailureFailoverTest.java ---------------------------------------------------------------------- diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/cluster/failover/NetworkFailureFailoverTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/cluster/failover/NetworkFailureFailoverTest.java new file mode 100644 index 0000000..1011502 --- /dev/null +++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/cluster/failover/NetworkFailureFailoverTest.java @@ -0,0 +1,688 @@ +/* + * Copyright 2009 Red Hat, Inc. + * Red Hat licenses this file to you under the Apache License, version + * 2.0 (the "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * http://www.apache.org/licenses/LICENSE-2.0 + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or + * implied. See the License for the specific language governing + * permissions and limitations under the License. + */ + +package org.apache.activemq.artemis.tests.integration.cluster.failover; + +import java.util.HashMap; +import java.util.HashSet; +import java.util.Map; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.atomic.AtomicInteger; + +import org.apache.activemq.artemis.api.core.ActiveMQException; +import org.apache.activemq.artemis.api.core.Interceptor; +import org.apache.activemq.artemis.api.core.TransportConfiguration; +import org.apache.activemq.artemis.api.core.client.ActiveMQClient; +import org.apache.activemq.artemis.api.core.client.ClientConsumer; +import org.apache.activemq.artemis.api.core.client.ClientMessage; +import org.apache.activemq.artemis.api.core.client.ClientProducer; +import org.apache.activemq.artemis.api.core.client.ClientSession; +import org.apache.activemq.artemis.api.core.client.ClientSessionFactory; +import org.apache.activemq.artemis.api.core.client.ServerLocator; +import org.apache.activemq.artemis.api.core.client.SessionFailureListener; +import org.apache.activemq.artemis.api.core.client.TopologyMember; +import org.apache.activemq.artemis.core.client.impl.ClientSessionFactoryInternal; +import org.apache.activemq.artemis.core.client.impl.Topology; +import org.apache.activemq.artemis.core.protocol.core.Packet; +import org.apache.activemq.artemis.core.protocol.core.impl.wireformat.CreateSessionMessage; +import org.apache.activemq.artemis.core.protocol.core.impl.wireformat.SessionSendMessage; +import org.apache.activemq.artemis.core.remoting.impl.netty.TransportConstants; +import org.apache.activemq.artemis.junit.Wait; +import org.apache.activemq.artemis.spi.core.protocol.RemotingConnection; +import org.apache.activemq.artemis.tests.util.network.NetUtil; +import org.apache.activemq.artemis.tests.util.network.NetUtilResource; +import org.junit.Assert; +import org.junit.BeforeClass; +import org.junit.Rule; +import org.junit.Test; + +public class NetworkFailureFailoverTest extends FailoverTestBase { + + @Rule + public NetUtilResource netUtilResource = new NetUtilResource(); + + @BeforeClass + public static void start() { + NetUtil.assumeSudo(); + } + + // 192.0.2.0 is reserved for documentation, so I'm pretty sure this won't exist on any system. (It shouldn't at least) + private static final String LIVE_IP = "192.0.2.0"; + + private int beforeTime; + + @Override + public void setUp() throws Exception { + // beforeTime = NettyConnection.getLockTimeout(); + // NettyConnection.setLockTimeout(1000); + NetUtil.netUp(LIVE_IP); + super.setUp(); + } + + @Override + public void tearDown() throws Exception { + super.tearDown(); + // NettyConnection.setLockTimeout(beforeTime); + } + + @Override + protected TransportConfiguration getAcceptorTransportConfiguration(final boolean live) { + return getNettyAcceptorTransportConfiguration(live); + } + + @Override + protected TransportConfiguration getConnectorTransportConfiguration(final boolean live) { + return getNettyConnectorTransportConfiguration(live); + } + + protected ClientSession createSession(ClientSessionFactory sf1, + boolean autoCommitSends, + boolean autoCommitAcks, + int ackBatchSize) throws Exception { + return addClientSession(sf1.createSession(autoCommitSends, autoCommitAcks, ackBatchSize)); + } + + protected ClientSession createSession(ClientSessionFactory sf1, + boolean autoCommitSends, + boolean autoCommitAcks) throws Exception { + return addClientSession(sf1.createSession(autoCommitSends, autoCommitAcks)); + } + + protected ClientSession createSession(ClientSessionFactory sf1) throws Exception { + return addClientSession(sf1.createSession()); + } + + protected ClientSession createSession(ClientSessionFactory sf1, + boolean xa, + boolean autoCommitSends, + boolean autoCommitAcks) throws Exception { + return addClientSession(sf1.createSession(xa, autoCommitSends, autoCommitAcks)); + } + + @Override + protected TransportConfiguration getNettyAcceptorTransportConfiguration(final boolean live) { + Map<String, Object> server1Params = new HashMap<>(); + + if (live) { + server1Params.put(TransportConstants.PORT_PROP_NAME, TransportConstants.DEFAULT_PORT); + server1Params.put(TransportConstants.HOST_PROP_NAME, LIVE_IP); + } else { + server1Params.put(TransportConstants.PORT_PROP_NAME, TransportConstants.DEFAULT_PORT); + server1Params.put(TransportConstants.HOST_PROP_NAME, "localhost"); + } + + return new TransportConfiguration(NETTY_ACCEPTOR_FACTORY, server1Params); + } + + @Override + protected TransportConfiguration getNettyConnectorTransportConfiguration(final boolean live) { + Map<String, Object> server1Params = new HashMap<>(); + + if (live) { + server1Params.put(TransportConstants.PORT_PROP_NAME, TransportConstants.DEFAULT_PORT); + server1Params.put(TransportConstants.HOST_PROP_NAME, LIVE_IP); + } else { + server1Params.put(TransportConstants.PORT_PROP_NAME, TransportConstants.DEFAULT_PORT); + server1Params.put(TransportConstants.HOST_PROP_NAME, "localhost"); + } + + server1Params.put(TransportConstants.NETTY_CONNECT_TIMEOUT, 1000); + + return new TransportConfiguration(NETTY_CONNECTOR_FACTORY, server1Params); + } + + @Test + public void testFailoverAfterNetFailure() throws Exception { + final AtomicInteger sentMessages = new AtomicInteger(0); + final AtomicInteger blockedAt = new AtomicInteger(0); + + Assert.assertTrue(NetUtil.checkIP(LIVE_IP)); + Map<String, Object> params = new HashMap<>(); + params.put(TransportConstants.HOST_PROP_NAME, LIVE_IP); + params.put(TransportConstants.NETTY_CONNECT_TIMEOUT, 1000); + TransportConfiguration tc = createTransportConfiguration(true, false, params); + + final AtomicInteger countSent = new AtomicInteger(0); + + liveServer.addInterceptor(new Interceptor() { + @Override + public boolean intercept(Packet packet, RemotingConnection connection) throws ActiveMQException { + //System.out.println("Received " + packet); + if (packet instanceof SessionSendMessage) { + + if (countSent.incrementAndGet() == 500) { + try { + NetUtil.netDown(LIVE_IP); + System.out.println("Blocking traffic"); + // Thread.sleep(3000); // this is important to let stuff to block + liveServer.crash(true, false); + } catch (Exception e) { + e.printStackTrace(); + } + new Thread() { + @Override + public void run() { + try { + System.err.println("Stopping server"); + } catch (Exception e) { + e.printStackTrace(); + } + } + }.start(); + } + } + return true; + } + }); + + ServerLocator locator = addServerLocator(ActiveMQClient.createServerLocatorWithHA(tc)); + + locator.setBlockOnNonDurableSend(false); + locator.setBlockOnDurableSend(true); + locator.setBlockOnAcknowledge(false); + locator.setReconnectAttempts(-1); + locator.setConfirmationWindowSize(-1); + locator.setProducerWindowSize(-1); + locator.setClientFailureCheckPeriod(100); + locator.setConnectionTTL(1000); + ClientSessionFactoryInternal sfProducer = createSessionFactoryAndWaitForTopology(locator, 2); + sfProducer.addFailureListener(new SessionFailureListener() { + @Override + public void beforeReconnect(ActiveMQException exception) { + } + + @Override + public void connectionFailed(ActiveMQException exception, boolean failedOver, String scaleDownTargetNodeID) { + + } + + @Override + public void connectionFailed(ActiveMQException exception, boolean failedOver) { + + } + }); + + ClientSession sessionProducer = createSession(sfProducer, true, true, 0); + + sessionProducer.createQueue(FailoverTestBase.ADDRESS, FailoverTestBase.ADDRESS, null, true); + + ClientProducer producer = sessionProducer.createProducer(FailoverTestBase.ADDRESS); + + final int numMessages = 2001; + final CountDownLatch latchReceived = new CountDownLatch(numMessages); + + ClientSessionFactoryInternal sfConsumer = createSessionFactoryAndWaitForTopology(locator, 2); + + final ClientSession sessionConsumer = createSession(sfConsumer, true, true, 0); + final ClientConsumer consumer = sessionConsumer.createConsumer(FailoverTestBase.ADDRESS); + + sessionConsumer.start(); + + final AtomicBoolean running = new AtomicBoolean(true); + + final Thread t = new Thread() { + @Override + public void run() { + int received = 0; + int errors = 0; + while (running.get() && received < numMessages) { + try { + ClientMessage msgReceived = consumer.receive(500); + if (msgReceived != null) { + latchReceived.countDown(); + msgReceived.acknowledge(); + if (received++ % 100 == 0) { + System.out.println("Received " + received); + sessionConsumer.commit(); + } + } else { + System.out.println("Null"); + } + } catch (Throwable e) { + errors++; + if (errors > 10) { + break; + } + e.printStackTrace(); + } + } + } + }; + + t.start(); + + for (sentMessages.set(0); sentMessages.get() < numMessages; sentMessages.incrementAndGet()) { + do { + try { + if (sentMessages.get() % 100 == 0) { + System.out.println("Sent " + sentMessages.get()); + } + producer.send(createMessage(sessionProducer, sentMessages.get(), true)); + break; + } catch (Exception e) { + sentMessages.decrementAndGet(); + new Exception("Exception on ending", e).printStackTrace(); + } + } + while (true); + } + + // these may never be received. doing the count down where we blocked. + for (int i = 0; i < blockedAt.get(); i++) { + latchReceived.countDown(); + } + + Assert.assertTrue(latchReceived.await(1, TimeUnit.MINUTES)); + + running.set(false); + + t.join(); + } + + + private int countTopologyMembers(Topology topology) { + int count = 0; + for (TopologyMember m : topology.getMembers()) { + count++; + if (m.getBackup() != null) { + count++; + } + } + + return count; + } + + @Test + public void testNetFailureConsume() throws Exception { + final AtomicInteger sentMessages = new AtomicInteger(0); + final AtomicInteger blockedAt = new AtomicInteger(0); + + Assert.assertTrue(NetUtil.checkIP(LIVE_IP)); + Map<String, Object> params = new HashMap<>(); + params.put(TransportConstants.HOST_PROP_NAME, LIVE_IP); + params.put(TransportConstants.NETTY_CONNECT_TIMEOUT, 1000); + TransportConfiguration tc = createTransportConfiguration(true, false, params); + + final AtomicInteger countSent = new AtomicInteger(0); + + ServerLocator locator = addServerLocator(ActiveMQClient.createServerLocatorWithHA(tc)); + + locator.setBlockOnNonDurableSend(false); + locator.setBlockOnDurableSend(false); + locator.setBlockOnAcknowledge(false); + locator.setReconnectAttempts(-1); + locator.setConfirmationWindowSize(-1); + locator.setProducerWindowSize(-1); + locator.setClientFailureCheckPeriod(100); + locator.setConnectionTTL(1000); + ClientSessionFactoryInternal sfProducer = createSessionFactoryAndWaitForTopology(locator, 2); + + Wait.assertEquals(2, () -> countTopologyMembers(locator.getTopology())); + + sfProducer.addFailureListener(new SessionFailureListener() { + @Override + public void beforeReconnect(ActiveMQException exception) { + } + + @Override + public void connectionFailed(ActiveMQException exception, boolean failedOver, String scaleDownTargetNodeID) { + + } + + @Override + public void connectionFailed(ActiveMQException exception, boolean failedOver) { + + } + }); + + ClientSession sessionProducer = createSession(sfProducer, true, true, 0); + + sessionProducer.createQueue(FailoverTestBase.ADDRESS, FailoverTestBase.ADDRESS, null, true); + + ClientProducer producer = sessionProducer.createProducer(FailoverTestBase.ADDRESS); + + final int numMessages = 2001; + final CountDownLatch latchReceived = new CountDownLatch(numMessages); + + ClientSessionFactoryInternal sfConsumer = createSessionFactoryAndWaitForTopology(locator, 2); + + final ClientSession sessionConsumer = createSession(sfConsumer, true, true, 0); + final ClientConsumer consumer = sessionConsumer.createConsumer(FailoverTestBase.ADDRESS); + + sessionConsumer.start(); + + final AtomicBoolean running = new AtomicBoolean(true); + + final Thread t = new Thread() { + @Override + public void run() { + int received = 0; + int errors = 0; + while (running.get() && received < numMessages) { + try { + ClientMessage msgReceived = consumer.receive(500); + if (msgReceived != null) { + latchReceived.countDown(); + msgReceived.acknowledge(); + if (++received % 100 == 0) { + + if (received == 300) { + System.out.println("Shutting down IP"); + NetUtil.netDown(LIVE_IP); + liveServer.crash(true, false); + } + System.out.println("Received " + received); + sessionConsumer.commit(); + } + } else { + System.out.println("Null"); + } + } catch (Throwable e) { + errors++; + if (errors > 10) { + break; + } + e.printStackTrace(); + } + } + } + }; + + + for (sentMessages.set(0); sentMessages.get() < numMessages; sentMessages.incrementAndGet()) { + do { + try { + if (sentMessages.get() % 100 == 0) { + System.out.println("Sent " + sentMessages.get()); + } + producer.send(createMessage(sessionProducer, sentMessages.get(), true)); + break; + } catch (Exception e) { + sentMessages.decrementAndGet(); + new Exception("Exception on ending", e).printStackTrace(); + } + } + while (true); + } + + sessionProducer.close(); + + + t.start(); + + // these may never be received. doing the count down where we blocked. + for (int i = 0; i < blockedAt.get(); i++) { + latchReceived.countDown(); + } + + Assert.assertTrue(latchReceived.await(1, TimeUnit.MINUTES)); + + running.set(false); + + t.join(); + } + + @Test + public void testFailoverCreateSessionOnFailure() throws Exception { + final AtomicInteger sentMessages = new AtomicInteger(0); + final AtomicInteger blockedAt = new AtomicInteger(0); + + Assert.assertTrue(NetUtil.checkIP(LIVE_IP)); + Map<String, Object> params = new HashMap<>(); + params.put(TransportConstants.HOST_PROP_NAME, LIVE_IP); + params.put(TransportConstants.NETTY_CONNECT_TIMEOUT, 1000); + TransportConfiguration tc = createTransportConfiguration(true, false, params); + + final AtomicInteger countSent = new AtomicInteger(0); + + final CountDownLatch latchDown = new CountDownLatch(1); + + liveServer.addInterceptor(new Interceptor() { + @Override + public boolean intercept(Packet packet, RemotingConnection connection) throws ActiveMQException { + //System.out.println("Received " + packet); + if (packet instanceof CreateSessionMessage) { + + if (countSent.incrementAndGet() == 50) { + try { + NetUtil.netDown(LIVE_IP); + System.out.println("Blocking traffic"); + Thread.sleep(3000); // this is important to let stuff to block + blockedAt.set(sentMessages.get()); + latchDown.countDown(); + } catch (Exception e) { + e.printStackTrace(); + } + } + } + return true; + } + }); + + ServerLocator locator = addServerLocator(ActiveMQClient.createServerLocatorWithHA(tc)); + //locator.setDebugReconnects("CF_retry"); + + locator.setBlockOnNonDurableSend(false); + locator.setBlockOnDurableSend(false); + locator.setBlockOnAcknowledge(false); + locator.setReconnectAttempts(-1); + locator.setConfirmationWindowSize(-1); + locator.setProducerWindowSize(-1); + locator.setClientFailureCheckPeriod(100); + locator.setConnectionTTL(1000); + final ClientSessionFactoryInternal sessionFactory = createSessionFactoryAndWaitForTopology(locator, 2); + final AtomicInteger failed = new AtomicInteger(0); + sessionFactory.addFailureListener(new SessionFailureListener() { + @Override + public void beforeReconnect(ActiveMQException exception) { + if (failed.incrementAndGet() == 1) { + Thread.currentThread().interrupt(); + } + new Exception("producer before reconnect", exception).printStackTrace(); + } + + @Override + public void connectionFailed(ActiveMQException exception, boolean failedOver) { + + } + + @Override + public void connectionFailed(ActiveMQException exception, boolean failedOver, String scaleDownTargetNodeID) { + + } + }); + final int numSessions = 100; + final CountDownLatch latchCreated = new CountDownLatch(numSessions); + + final AtomicBoolean running = new AtomicBoolean(true); + + final Thread t = new Thread("session-creator") { + @Override + public void run() { + int received = 0; + int errors = 0; + while (running.get() && received < numSessions) { + try { + ClientSession session = sessionFactory.createSession(); + System.out.println("Creating session, currentLatch = " + latchCreated.getCount()); + session.close(); + latchCreated.countDown(); + } catch (Throwable e) { + e.printStackTrace(); + errors++; + } + } + } + }; + + t.start(); + + Assert.assertTrue(latchDown.await(1, TimeUnit.MINUTES)); + + Thread.sleep(1000); + + System.out.println("Server crashed now!!!"); + + liveServer.crash(true, false); + + try { + Assert.assertTrue(latchCreated.await(5, TimeUnit.MINUTES)); + + } finally { + running.set(false); + + t.join(TimeUnit.SECONDS.toMillis(30)); + } + } + + @Test + public void testInterruptFailingThread() throws Exception { + final AtomicInteger sentMessages = new AtomicInteger(0); + final AtomicInteger blockedAt = new AtomicInteger(0); + + Assert.assertTrue(NetUtil.checkIP(LIVE_IP)); + Map<String, Object> params = new HashMap<>(); + params.put(TransportConstants.HOST_PROP_NAME, LIVE_IP); + params.put(TransportConstants.NETTY_CONNECT_TIMEOUT, 1000); + TransportConfiguration tc = createTransportConfiguration(true, false, params); + + final AtomicInteger countSent = new AtomicInteger(0); + + final CountDownLatch latchBlocked = new CountDownLatch(1); + + liveServer.addInterceptor(new Interceptor() { + @Override + public boolean intercept(Packet packet, RemotingConnection connection) throws ActiveMQException { + //System.out.println("Received " + packet); + if (packet instanceof SessionSendMessage) { + + if (countSent.incrementAndGet() == 50) { + try { + NetUtil.netDown(LIVE_IP); + System.out.println("Blocking traffic"); + Thread.sleep(3000); // this is important to let stuff to block + blockedAt.set(sentMessages.get()); + latchBlocked.countDown(); + } catch (Exception e) { + e.printStackTrace(); + } + // new Thread() + // { + // public void run() + // { + // try + // { + // System.err.println("Stopping server"); + // // liveServer.stop(); + // liveServer.crash(true, false); + // } + // catch (Exception e) + // { + // e.printStackTrace(); + // } + // } + // }.start(); + } + } + return true; + } + }); + + final CountDownLatch failing = new CountDownLatch(1); + final HashSet<Thread> setThread = new HashSet<>(); + + ServerLocator locator = addServerLocator(ActiveMQClient.createServerLocatorWithHA(tc)); + + locator.setBlockOnNonDurableSend(false); + locator.setBlockOnDurableSend(false); + locator.setBlockOnAcknowledge(false); + locator.setReconnectAttempts(-1); + locator.setConfirmationWindowSize(-1); + locator.setProducerWindowSize(-1); + locator.setClientFailureCheckPeriod(100); + locator.setConnectionTTL(1000); + ClientSessionFactoryInternal sfProducer = createSessionFactoryAndWaitForTopology(locator, 2); + sfProducer.addFailureListener(new SessionFailureListener() { + @Override + public void beforeReconnect(ActiveMQException exception) { + setThread.add(Thread.currentThread()); + failing.countDown(); + } + + @Override + public void connectionFailed(ActiveMQException exception, boolean failedOver) { + + } + + @Override + public void connectionFailed(ActiveMQException exception, boolean failedOver, String scaleDownTargetNodeID) { + + } + }); + + final ClientSession sessionProducer = createSession(sfProducer, true, true, 0); + + sessionProducer.createQueue(FailoverTestBase.ADDRESS, FailoverTestBase.ADDRESS, null, true); + + final ClientProducer producer = sessionProducer.createProducer(FailoverTestBase.ADDRESS); + + final int numMessages = 10000; + + final AtomicBoolean running = new AtomicBoolean(true); + final CountDownLatch messagesSentlatch = new CountDownLatch(numMessages); + + Thread t = new Thread("sendingThread") { + @Override + public void run() { + + while (sentMessages.get() < numMessages && running.get()) { + try { + if (sentMessages.get() % 10 == 0) { + System.out.println("Sent " + sentMessages.get()); + } + producer.send(createMessage(sessionProducer, sentMessages.get(), true)); + sentMessages.incrementAndGet(); + messagesSentlatch.countDown(); + } catch (Throwable e) { + e.printStackTrace(); + } + } + + } + }; + + t.start(); + + Assert.assertTrue(latchBlocked.await(1, TimeUnit.MINUTES)); + + Assert.assertTrue(failing.await(1, TimeUnit.MINUTES)); + + for (int i = 0; i < 5; i++) { + for (Thread tint : setThread) { + tint.interrupt(); + } + Thread.sleep(500); + } + + liveServer.crash(true, false); + + Assert.assertTrue(messagesSentlatch.await(3, TimeUnit.MINUTES)); + + running.set(false); + + t.join(); + } + +} http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/1d9b19d9/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/util/network/NetUtil.java ---------------------------------------------------------------------- diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/util/network/NetUtil.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/util/network/NetUtil.java new file mode 100644 index 0000000..1d29b38 --- /dev/null +++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/util/network/NetUtil.java @@ -0,0 +1,226 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.activemq.artemis.tests.util.network; + +import java.io.BufferedReader; +import java.io.IOException; +import java.io.InputStream; +import java.io.InputStreamReader; +import java.net.InetAddress; +import java.util.Iterator; +import java.util.Map; +import java.util.Set; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicInteger; + +import org.apache.activemq.artemis.core.server.NetworkHealthCheck; +import org.jboss.logging.Logger; +import org.junit.Assert; +import org.junit.Assume; +import org.junit.Test; + +public class NetUtil { + + public static boolean checkIP(String ip) throws Exception { + InetAddress ipAddress = null; + try { + ipAddress = InetAddress.getByName(ip); + } catch (Exception e) { + e.printStackTrace(); // not supposed to happen + return false; + } + NetworkHealthCheck healthCheck = new NetworkHealthCheck(null, 100, 100); + return healthCheck.check(ipAddress); + } + + public static AtomicInteger nextDevice = new AtomicInteger(0); + + // IP / device (device being only valid on linux) + public static Map<String, String> networks = new ConcurrentHashMap<>(); + + private enum OS { + MAC, LINUX, NON_SUPORTED; + } + + static final OS osUsed; + static final String user = System.getProperty("user.name"); + + static { + OS osTmp; + + String propOS = System.getProperty("os.name").toUpperCase(); + + if (propOS.contains("MAC")) { + osTmp = OS.MAC; + } else if (propOS.contains("LINUX")) { + osTmp = OS.LINUX; + } else { + osTmp = OS.NON_SUPORTED; + } + + osUsed = osTmp; + } + + public static void assumeSudo() { + Assume.assumeTrue("non supported OS", osUsed != OS.NON_SUPORTED); + if (!canSudo()) { + System.out.println("Add the following at the end of your /etc/sudoers (use the visudo command)"); + System.out.println("# ------------------------------------------------------- "); + System.out.println(user + " ALL = NOPASSWD: /sbin/ifconfig"); + System.out.println("# ------------------------------------------------------- "); + Assume.assumeFalse(true); + } + } + + public static void cleanup() { + nextDevice.set(0); + + Set entrySet = networks.entrySet(); + Iterator iter = entrySet.iterator(); + while (iter.hasNext()) { + Map.Entry<String, String> entry = (Map.Entry<String, String>) iter.next(); + try { + netDown(entry.getKey(), entry.getValue()); + } catch (Exception e) { + e.printStackTrace(); + } + } + } + + public static void netUp(String ip) throws Exception { + String deviceID = "lo:" + nextDevice.incrementAndGet(); + if (osUsed == OS.MAC) { + if (runCommand("sudo", "-n", "ifconfig", "lo0", "alias", ip) != 0) { + Assert.fail("Cannot sudo ifconfig for ip " + ip); + } + networks.put(ip, "lo0"); + } else if (osUsed == OS.LINUX) { + if (runCommand("sudo", "-n", "ifconfig", deviceID, ip, "netmask", "255.0.0.0") != 0) { + Assert.fail("Cannot sudo ifconfig for ip " + ip); + } + networks.put(ip, deviceID); + } else { + Assert.fail("OS not supported"); + } + } + + public static void netDown(String ip) throws Exception { + String device = networks.remove(ip); + Assert.assertNotNull("ip " + ip + "wasn't set up before", device); + netDown(ip, device); + + } + + private static void netDown(String ip, String device) throws Exception { + if (osUsed == OS.MAC) { + if (runCommand("sudo", "-n", "ifconfig", "lo0", "-alias", ip) != 0) { + Assert.fail("Cannot sudo ifconfig for ip " + ip); + } + } else if (osUsed == OS.LINUX) { + if (runCommand("sudo", "-n", "ifconfig", device, "down") != 0) { + Assert.fail("Cannot sudo ifconfig for ip " + ip); + } + } else { + Assert.fail("OS not supported"); + } + } + + private static final Logger logger = Logger.getLogger(NetUtil.class); + + public static int runCommand(String... command) throws Exception { + return runCommand(10, TimeUnit.SECONDS, command); + } + + public static int runCommand(long timeout, TimeUnit timeoutUnit, String... command) throws Exception { + + logCommand(command); + + // it did not work with a simple isReachable, it could be because there's no root access, so we will try ping executable + ProcessBuilder processBuilder = new ProcessBuilder(command); + final Process process = processBuilder.start(); + + Thread t = new Thread() { + @Override + public void run() { + try { + readStream(process.getInputStream(), true); + } catch (Exception dontCare) { + + } + } + }; + Thread t2 = new Thread() { + @Override + public void run() { + try { + readStream(process.getErrorStream(), true); + } catch (Exception dontCare) { + + } + } + }; + t2.start(); + + int value = process.waitFor(); + + t.join(timeoutUnit.toMillis(timeout)); + Assert.assertFalse(t.isAlive()); + t2.join(timeoutUnit.toMillis(timeout)); + + return value; + } + + private static void logCommand(String[] command) { + StringBuffer logCommand = new StringBuffer(); + for (String c : command) { + logCommand.append(c + " "); + } + System.out.println("NetUTIL command::" + logCommand.toString()); + } + + public static boolean canSudo() { + try { + return runCommand("sudo", "-n", "ifconfig") == 0; + } catch (Exception e) { + e.printStackTrace(); + return false; + } + } + + @Test + public void testCanSudo() throws Exception { + Assert.assertTrue(canSudo()); + } + + private static void readStream(InputStream stream, boolean error) throws IOException { + BufferedReader reader = new BufferedReader(new InputStreamReader(stream)); + + String inputLine; + while ((inputLine = reader.readLine()) != null) { + if (error) { + logger.warn(inputLine); + } else { + logger.trace(inputLine); + } + } + + reader.close(); + } + +} http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/1d9b19d9/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/util/network/NetUtilResource.java ---------------------------------------------------------------------- diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/util/network/NetUtilResource.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/util/network/NetUtilResource.java new file mode 100644 index 0000000..0f2abd9 --- /dev/null +++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/util/network/NetUtilResource.java @@ -0,0 +1,29 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.activemq.artemis.tests.util.network; + +import org.junit.rules.ExternalResource; + +public class NetUtilResource extends ExternalResource { + + @Override + protected void after() { + super.after(); + NetUtil.cleanup(); + } +}
