Fix for: https://issues.apache.org/jira/browse/AMQ-4837 : LevelDB corrupted in AMQ cluster.
- Log rotation was causing a premature index snapshot to be taken on the slave (snapshot while the slave was still synchronizing). - Also fix the append position displayed in JMX for the master, which was incorrect. Project: http://git-wip-us.apache.org/repos/asf/activemq/repo Commit: http://git-wip-us.apache.org/repos/asf/activemq/commit/50f37beb Tree: http://git-wip-us.apache.org/repos/asf/activemq/tree/50f37beb Diff: http://git-wip-us.apache.org/repos/asf/activemq/diff/50f37beb Branch: refs/heads/activemq-5.9 Commit: 50f37beb87facd24c07e66b4ed82a30a9d410278 Parents: 926a435 Author: Hiram Chirino <[email protected]> Authored: Thu Oct 31 12:18:48 2013 -0400 Committer: Hadrian Zbarcea <[email protected]> Committed: Tue Mar 11 21:18:49 2014 -0400 ---------------------------------------------------------------------- activemq-leveldb-store/pom.xml | 6 + .../apache/activemq/leveldb/LevelDBClient.scala | 19 ++- .../leveldb/replicated/SlaveLevelDBStore.scala | 16 +- .../test/ReplicatedLevelDBBrokerTest.java | 160 ++++++++++++++----- 4 files changed, 156 insertions(+), 45 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/activemq/blob/50f37beb/activemq-leveldb-store/pom.xml ---------------------------------------------------------------------- diff --git a/activemq-leveldb-store/pom.xml b/activemq-leveldb-store/pom.xml index 9535b19..a2dd64b 100644 --- a/activemq-leveldb-store/pom.xml +++ b/activemq-leveldb-store/pom.xml @@ -147,6 +147,12 @@ <scope>provided</scope> </dependency> + <dependency> + <groupId>commons-io</groupId> + <artifactId>commons-io</artifactId> + <scope>test</scope> + </dependency> + <!-- For Optional Snappy Compression --> <dependency> <groupId>org.xerial.snappy</groupId> http://git-wip-us.apache.org/repos/asf/activemq/blob/50f37beb/activemq-leveldb-store/src/main/scala/org/apache/activemq/leveldb/LevelDBClient.scala 
---------------------------------------------------------------------- diff --git a/activemq-leveldb-store/src/main/scala/org/apache/activemq/leveldb/LevelDBClient.scala b/activemq-leveldb-store/src/main/scala/org/apache/activemq/leveldb/LevelDBClient.scala index 1c6adec..dbf6512 100755 --- a/activemq-leveldb-store/src/main/scala/org/apache/activemq/leveldb/LevelDBClient.scala +++ b/activemq-leveldb-store/src/main/scala/org/apache/activemq/leveldb/LevelDBClient.scala @@ -625,11 +625,15 @@ class LevelDBClient(store: LevelDBStore) { log = createLog log.logSize = store.logSize log.on_log_rotate = ()=> { + post_log_rotate + } + } + + def post_log_rotate ={ // We snapshot the index every time we rotate the logs. writeExecutor { snapshotIndex(false) } - } } def replay_init() = { @@ -927,7 +931,16 @@ class LevelDBClient(store: LevelDBStore) { } } - var wal_append_position = 0L + + var stored_wal_append_position = 0L + + def wal_append_position = this.synchronized { + if (log!=null && log.isOpen) { + log.appender_limit + } else { + stored_wal_append_position + } + } def stop() = this.synchronized { if( writeExecutor!=null ) { @@ -948,7 +961,7 @@ class LevelDBClient(store: LevelDBStore) { if (log!=null && log.isOpen) { log.close copyDirtyIndexToSnapshot - wal_append_position = log.appender_limit + stored_wal_append_position = log.appender_limit log = null } if( plist!=null ) { http://git-wip-us.apache.org/repos/asf/activemq/blob/50f37beb/activemq-leveldb-store/src/main/scala/org/apache/activemq/leveldb/replicated/SlaveLevelDBStore.scala ---------------------------------------------------------------------- diff --git a/activemq-leveldb-store/src/main/scala/org/apache/activemq/leveldb/replicated/SlaveLevelDBStore.scala b/activemq-leveldb-store/src/main/scala/org/apache/activemq/leveldb/replicated/SlaveLevelDBStore.scala index 4239a0b..f1a47f7 100644 --- a/activemq-leveldb-store/src/main/scala/org/apache/activemq/leveldb/replicated/SlaveLevelDBStore.scala +++ 
b/activemq-leveldb-store/src/main/scala/org/apache/activemq/leveldb/replicated/SlaveLevelDBStore.scala @@ -16,7 +16,7 @@ */ package org.apache.activemq.leveldb.replicated -import org.apache.activemq.leveldb.LevelDBStore +import org.apache.activemq.leveldb.{LevelDBClient, LevelDBStore} import org.apache.activemq.util.ServiceStopper import java.util import org.fusesource.hawtdispatch._ @@ -53,6 +53,16 @@ class SlaveLevelDBStore extends LevelDBStore with ReplicatedLevelDBStoreTrait { var status = "initialized" + override def createClient = new LevelDBClient(this) { + // We don't want to start doing index snapshots until + // he slave is caught up. + override def post_log_rotate: Unit = { + if( caughtUp ) { + super.post_log_rotate + } + } + } + override def doStart() = { client.init() if (purgeOnStatup) { @@ -100,7 +110,7 @@ class SlaveLevelDBStore extends LevelDBStore with ReplicatedLevelDBStoreTrait { // the stashed data might be the best option to become the master. stash(directory) delete_store(directory) - debug("Log replicaiton session connected") + debug("Log replication session connected") session.request_then(SYNC_ACTION, null) { body => val response = JsonCodec.decode(body, classOf[SyncResponse]) transfer_missing(response) @@ -165,7 +175,7 @@ class SlaveLevelDBStore extends LevelDBStore with ReplicatedLevelDBStoreTrait { command.action match { case WAL_ACTION => val value = JsonCodec.decode(command.body, classOf[LogWrite]) - if( caughtUp && value.offset ==0 ) { + if( caughtUp && value.offset ==0 && value.file!=0 ) { client.log.rotate } val file = client.log.next_log(value.file) http://git-wip-us.apache.org/repos/asf/activemq/blob/50f37beb/activemq-leveldb-store/src/test/java/org/apache/activemq/leveldb/test/ReplicatedLevelDBBrokerTest.java ---------------------------------------------------------------------- diff --git a/activemq-leveldb-store/src/test/java/org/apache/activemq/leveldb/test/ReplicatedLevelDBBrokerTest.java 
b/activemq-leveldb-store/src/test/java/org/apache/activemq/leveldb/test/ReplicatedLevelDBBrokerTest.java index d9e44b8..a8e743f 100644 --- a/activemq-leveldb-store/src/test/java/org/apache/activemq/leveldb/test/ReplicatedLevelDBBrokerTest.java +++ b/activemq-leveldb-store/src/test/java/org/apache/activemq/leveldb/test/ReplicatedLevelDBBrokerTest.java @@ -21,17 +21,22 @@ import org.apache.activemq.broker.BrokerService; import org.apache.activemq.broker.TransportConnector; import org.apache.activemq.leveldb.replicated.ElectingLevelDBStore; import org.junit.After; +import org.junit.Before; import org.junit.Test; import javax.jms.*; +import javax.management.ObjectName; +import javax.management.openmbean.CompositeData; import java.io.File; +import java.io.IOException; +import java.lang.management.ManagementFactory; import java.util.ArrayList; import java.util.Enumeration; import java.util.concurrent.SynchronousQueue; import java.util.concurrent.TimeUnit; +import org.apache.commons.io.FileUtils; -import static org.junit.Assert.assertEquals; -import static org.junit.Assert.assertNotNull; +import static org.junit.Assert.*; /** * Holds broker unit tests of the replicated leveldb store. @@ -46,50 +51,109 @@ public class ReplicatedLevelDBBrokerTest extends ZooKeeperTestSupport { * https://issues.apache.org/jira/browse/AMQ-4837 */ @Test(timeout = 1000*60*10) - public void testAMQ4837() throws Exception { + public void testAMQ4837viaJMS() throws Throwable { + testAMQ4837(false); + } - // 1. Start 3 activemq nodes. - startBrokerAsync(createBrokerNode("node-1")); - startBrokerAsync(createBrokerNode("node-2")); - startBrokerAsync(createBrokerNode("node-3")); + /** + * Tries to replicate the problem reported at: + * https://issues.apache.org/jira/browse/AMQ-4837 + */ + @Test(timeout = 1000*60*10) + public void testAMQ4837viaJMX() throws Throwable { + for (int i = 0; i < 2; i++) { + resetDataDirs(); + testAMQ4837(true); + stopBrokers(); + } + } - // 2. 
Push a message to the master and browse the queue - System.out.println("Wait for master to start up..."); - BrokerService master = masterQueue.poll(60, TimeUnit.SECONDS); - assertNotNull("Master elected", master); - sendMessage(master, "Hello World #1"); - assertEquals(1, browseMessages(master).size()); + @Before + public void resetDataDirs() throws IOException { + deleteDirectory("node-1"); + deleteDirectory("node-2"); + deleteDirectory("node-3"); + } - // 3. Stop master node - System.out.println("Stopping master..."); - master.stop(); - master.waitUntilStopped(); - BrokerService prevMaster = master; + protected void deleteDirectory(String s) throws IOException { + try { + FileUtils.deleteDirectory(new File(data_dir(), s)); + } catch (IOException e) { + } + } - // 4. Push a message to the new master (Node2) and browse the queue using the web UI. Message summary and queue content ok. - System.out.println("Wait for new master to start up..."); - master = masterQueue.poll(60, TimeUnit.SECONDS); - assertNotNull("Master elected", master); - sendMessage(master, "Hello World #2"); - assertEquals(2, browseMessages(master).size()); - // 5. Start Node1 - System.out.println("Starting previous master..."); - prevMaster = createBrokerNode(prevMaster.getBrokerName()); - startBrokerAsync(prevMaster); + public void testAMQ4837(boolean jmx) throws Throwable { + + try { + System.out.println("======================================"); + System.out.println("1. Start 3 activemq nodes."); + System.out.println("======================================"); + startBrokerAsync(createBrokerNode("node-1")); + startBrokerAsync(createBrokerNode("node-2")); + startBrokerAsync(createBrokerNode("node-3")); + + BrokerService master = waitForNextMaster(); + System.out.println("======================================"); + System.out.println("2. 
Push a message to the master and browse the queue"); + System.out.println("======================================"); + sendMessage(master, pad("Hello World #1", 1024)); + assertEquals(1, browseMessages(master, jmx).size()); - // 6. Stop master node (Node2) - System.out.println("Stopping master..."); + System.out.println("======================================"); + System.out.println("3. Stop master node"); + System.out.println("======================================"); + stop(master); + BrokerService prevMaster = master; + master = waitForNextMaster(); + + System.out.println("======================================"); + System.out.println("4. Push a message to the new master and browse the queue. Message summary and queue content ok."); + System.out.println("======================================"); + assertEquals(1, browseMessages(master, jmx).size()); + sendMessage(master, pad("Hello World #2", 1024)); + assertEquals(2, browseMessages(master, jmx).size()); + + System.out.println("======================================"); + System.out.println("5. Restart the stopped node & 6. stop current master"); + System.out.println("======================================"); + prevMaster = createBrokerNode(prevMaster.getBrokerName()); + startBrokerAsync(prevMaster); + stop(master); + + master = waitForNextMaster(); + System.out.println("======================================"); + System.out.println("7. Browse the queue on new master"); + System.out.println("======================================"); + assertEquals(2, browseMessages(master, jmx).size()); + } catch (Throwable e) { + e.printStackTrace(); + throw e; + } + + } + + private void stop(BrokerService master) throws Exception { + System.out.println("Stopping "+master.getBrokerName()); master.stop(); master.waitUntilStopped(); + } - // 7. Browse the queue using the web UI on new master (Node3). Message summary ok however when clicking on the queue, no message details. 
- // An error (see below) is logged by the master, which attempts a restart. - System.out.println("Wait for new master to start up..."); - master = masterQueue.poll(60, TimeUnit.SECONDS); + private BrokerService waitForNextMaster() throws InterruptedException { + System.out.println("Wait for master to start up..."); + BrokerService master = masterQueue.poll(60, TimeUnit.SECONDS); assertNotNull("Master elected", master); - assertEquals(2, browseMessages(master).size()); + assertFalse(master.isSlave()); + assertNull("Only one master elected at a time..", masterQueue.peek()); + System.out.println("Master started: " + master.getBrokerName()); + return master; + } + private String pad(String value, int size) { + while( value.length() < size ) { + value += " "; + } + return value; } private void startBrokerAsync(BrokerService b) { @@ -121,8 +185,25 @@ public class ReplicatedLevelDBBrokerTest extends ZooKeeperTestSupport { connection.close(); } } + private ArrayList<String> browseMessages(BrokerService brokerService, boolean jmx) throws Exception { + if( jmx ) { + return browseMessagesViaJMX(brokerService); + } else { + return browseMessagesViaJMS(brokerService); + } + } + + private ArrayList<String> browseMessagesViaJMX(BrokerService brokerService) throws Exception { + ArrayList<String> rc = new ArrayList<String>(); + ObjectName on = new ObjectName("org.apache.activemq:type=Broker,brokerName="+brokerService.getBrokerName()+",destinationType=Queue,destinationName=FOO"); + CompositeData[] browse = (CompositeData[]) ManagementFactory.getPlatformMBeanServer().invoke(on, "browse", null, null); + for (CompositeData cd : browse) { + rc.add(cd.get("Text").toString()) ; + } + return rc; + } - private ArrayList<String> browseMessages(BrokerService brokerService) throws Exception { + private ArrayList<String> browseMessagesViaJMS(BrokerService brokerService) throws Exception { ArrayList<String> rc = new ArrayList<String>(); TransportConnector connector = 
brokerService.getTransportConnectors().get(0); ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory(connector.getConnectUri()); @@ -143,14 +224,14 @@ public class ReplicatedLevelDBBrokerTest extends ZooKeeperTestSupport { } @After - public void closeBrokers() throws Exception { + public void stopBrokers() throws Exception { for (BrokerService broker : brokers) { try { - broker.stop(); - broker.waitUntilStopped(); + stop(broker); } catch (Exception e) { } } + brokers.clear(); } private BrokerService createBrokerNode(String id) throws Exception { @@ -178,6 +259,7 @@ public class ReplicatedLevelDBBrokerTest extends ZooKeeperTestSupport { } }; store.setDirectory(new File(data_dir(), id)); + store.setContainer(id); store.setReplicas(3); store.setZkAddress("localhost:" + connector.getLocalPort()); store.setHostname("localhost");
