Repository: incubator-tephra
Updated Branches:
  refs/heads/master 8e5ef26ad -> ae574caf7
TEPHRA-179 Create a new instance of TransactionManager and related classes when TransactionService becomes leader. This closes #2 on GitHub. Signed-off-by: poorna <[email protected]> Project: http://git-wip-us.apache.org/repos/asf/incubator-tephra/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-tephra/commit/ae574caf Tree: http://git-wip-us.apache.org/repos/asf/incubator-tephra/tree/ae574caf Diff: http://git-wip-us.apache.org/repos/asf/incubator-tephra/diff/ae574caf Branch: refs/heads/master Commit: ae574caf7e1adf62e495ba1a7a28ad8834eb98ea Parents: 8e5ef26 Author: Ali Anwar <[email protected]> Authored: Fri Sep 2 17:08:18 2016 -0700 Committer: poorna <[email protected]> Committed: Wed Sep 7 16:37:03 2016 -0700 ---------------------------------------------------------------------- .../inmemory/InMemoryTransactionService.java | 6 +- .../runtime/TransactionDistributedModule.java | 15 +-- .../runtime/TransactionInMemoryModule.java | 11 +- .../tephra/runtime/TransactionLocalModule.java | 8 +- .../ThriftTransactionServerTest.java | 100 ++++++++++++++----- .../tephra/examples/BalanceBooksTest.java | 1 + 6 files changed, 101 insertions(+), 40 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-tephra/blob/ae574caf/tephra-core/src/main/java/org/apache/tephra/inmemory/InMemoryTransactionService.java ---------------------------------------------------------------------- diff --git a/tephra-core/src/main/java/org/apache/tephra/inmemory/InMemoryTransactionService.java b/tephra-core/src/main/java/org/apache/tephra/inmemory/InMemoryTransactionService.java index 823f934..fb45362 100644 --- a/tephra-core/src/main/java/org/apache/tephra/inmemory/InMemoryTransactionService.java +++ b/tephra-core/src/main/java/org/apache/tephra/inmemory/InMemoryTransactionService.java @@ -43,6 +43,7 @@ public class InMemoryTransactionService extends AbstractService { private final DiscoveryService 
discoveryService; private final String serviceName; + // this is Provider, so that we can have multiple instances of it (use a new instance after leader election) protected final Provider<TransactionManager> txManagerProvider; private Cancellable cancelDiscovery; protected TransactionManager txManager; @@ -55,9 +56,8 @@ public class InMemoryTransactionService extends AbstractService { protected final int maxReadBufferBytes; @Inject - public InMemoryTransactionService(Configuration conf, - DiscoveryService discoveryService, - Provider<TransactionManager> txManagerProvider) { + public InMemoryTransactionService(Configuration conf, DiscoveryService discoveryService, + Provider<TransactionManager> txManagerProvider) { this.discoveryService = discoveryService; this.txManagerProvider = txManagerProvider; http://git-wip-us.apache.org/repos/asf/incubator-tephra/blob/ae574caf/tephra-core/src/main/java/org/apache/tephra/runtime/TransactionDistributedModule.java ---------------------------------------------------------------------- diff --git a/tephra-core/src/main/java/org/apache/tephra/runtime/TransactionDistributedModule.java b/tephra-core/src/main/java/org/apache/tephra/runtime/TransactionDistributedModule.java index aaf3534..ff796c1 100644 --- a/tephra-core/src/main/java/org/apache/tephra/runtime/TransactionDistributedModule.java +++ b/tephra-core/src/main/java/org/apache/tephra/runtime/TransactionDistributedModule.java @@ -19,6 +19,7 @@ package org.apache.tephra.runtime; import com.google.inject.AbstractModule; +import com.google.inject.Scopes; import com.google.inject.Singleton; import com.google.inject.assistedinject.FactoryModuleBuilder; import com.google.inject.name.Names; @@ -41,14 +42,16 @@ final class TransactionDistributedModule extends AbstractModule { @Override protected void configure() { + // some of these classes need to be non-singleton in order to create a new instance during leader() in + // TransactionService 
bind(SnapshotCodecProvider.class).in(Singleton.class); - bind(TransactionStateStorage.class).annotatedWith(Names.named("persist")) - .to(HDFSTransactionStateStorage.class).in(Singleton.class); - bind(TransactionStateStorage.class).toProvider(TransactionStateStorageProvider.class).in(Singleton.class); + bind(TransactionStateStorage.class).annotatedWith(Names.named("persist")).to(HDFSTransactionStateStorage.class); + bind(TransactionStateStorage.class).toProvider(TransactionStateStorageProvider.class); - bind(TransactionManager.class).in(Singleton.class); - bind(TransactionSystemClient.class).to(TransactionServiceClient.class).in(Singleton.class); - bind(MetricsCollector.class).to(DefaultMetricsCollector.class).in(Singleton.class); + // to catch issues during configure time + bind(TransactionManager.class); + bind(TransactionSystemClient.class).to(TransactionServiceClient.class).in(Scopes.SINGLETON); + bind(MetricsCollector.class).to(DefaultMetricsCollector.class); install(new FactoryModuleBuilder() .implement(TransactionExecutor.class, DefaultTransactionExecutor.class) http://git-wip-us.apache.org/repos/asf/incubator-tephra/blob/ae574caf/tephra-core/src/main/java/org/apache/tephra/runtime/TransactionInMemoryModule.java ---------------------------------------------------------------------- diff --git a/tephra-core/src/main/java/org/apache/tephra/runtime/TransactionInMemoryModule.java b/tephra-core/src/main/java/org/apache/tephra/runtime/TransactionInMemoryModule.java index de7678a..1b9032c 100644 --- a/tephra-core/src/main/java/org/apache/tephra/runtime/TransactionInMemoryModule.java +++ b/tephra-core/src/main/java/org/apache/tephra/runtime/TransactionInMemoryModule.java @@ -19,6 +19,7 @@ package org.apache.tephra.runtime; import com.google.inject.AbstractModule; +import com.google.inject.Scopes; import com.google.inject.Singleton; import com.google.inject.assistedinject.FactoryModuleBuilder; import org.apache.tephra.DefaultTransactionExecutor; @@ -43,10 +44,12 @@ 
public class TransactionInMemoryModule extends AbstractModule { @Override protected void configure() { - bind(SnapshotCodecProvider.class).in(Singleton.class); - bind(TransactionStateStorage.class).to(NoOpTransactionStateStorage.class).in(Singleton.class); - bind(TransactionManager.class).in(Singleton.class); - bind(TransactionSystemClient.class).to(InMemoryTxSystemClient.class).in(Singleton.class); + // some of these classes need to be non-singleton in order to create a new instance during leader() in + // TransactionService + bind(SnapshotCodecProvider.class).in(Scopes.SINGLETON); + bind(TransactionStateStorage.class).to(NoOpTransactionStateStorage.class); + bind(TransactionManager.class); + bind(TransactionSystemClient.class).to(InMemoryTxSystemClient.class).in(Scopes.SINGLETON); // no metrics output for in-memory bind(MetricsCollector.class).to(TxMetricsCollector.class); http://git-wip-us.apache.org/repos/asf/incubator-tephra/blob/ae574caf/tephra-core/src/main/java/org/apache/tephra/runtime/TransactionLocalModule.java ---------------------------------------------------------------------- diff --git a/tephra-core/src/main/java/org/apache/tephra/runtime/TransactionLocalModule.java b/tephra-core/src/main/java/org/apache/tephra/runtime/TransactionLocalModule.java index 7d0b663..4a79e8d 100644 --- a/tephra-core/src/main/java/org/apache/tephra/runtime/TransactionLocalModule.java +++ b/tephra-core/src/main/java/org/apache/tephra/runtime/TransactionLocalModule.java @@ -41,12 +41,14 @@ final class TransactionLocalModule extends AbstractModule { @Override protected void configure() { + // some of these classes need to be non-singleton in order to create a new instance during leader() in + // TransactionService bind(SnapshotCodecProvider.class).in(Singleton.class); bind(TransactionStateStorage.class).annotatedWith(Names.named("persist")) - .to(LocalFileTransactionStateStorage.class).in(Singleton.class); - 
bind(TransactionStateStorage.class).toProvider(TransactionStateStorageProvider.class).in(Singleton.class); + .to(LocalFileTransactionStateStorage.class); + bind(TransactionStateStorage.class).toProvider(TransactionStateStorageProvider.class); - bind(TransactionManager.class).in(Singleton.class); + bind(TransactionManager.class); bind(TransactionSystemClient.class).to(InMemoryTxSystemClient.class).in(Singleton.class); bind(MetricsCollector.class).to(DefaultMetricsCollector.class); http://git-wip-us.apache.org/repos/asf/incubator-tephra/blob/ae574caf/tephra-core/src/test/java/org/apache/tephra/distributed/ThriftTransactionServerTest.java ---------------------------------------------------------------------- diff --git a/tephra-core/src/test/java/org/apache/tephra/distributed/ThriftTransactionServerTest.java b/tephra-core/src/test/java/org/apache/tephra/distributed/ThriftTransactionServerTest.java index a930720..bbe03ed 100644 --- a/tephra-core/src/test/java/org/apache/tephra/distributed/ThriftTransactionServerTest.java +++ b/tephra-core/src/test/java/org/apache/tephra/distributed/ThriftTransactionServerTest.java @@ -27,6 +27,7 @@ import com.google.inject.Scopes; import com.google.inject.util.Modules; import org.apache.hadoop.conf.Configuration; import org.apache.tephra.ThriftTransactionSystemTest; +import org.apache.tephra.Transaction; import org.apache.tephra.TransactionSystemClient; import org.apache.tephra.TxConstants; import org.apache.tephra.persist.InMemoryTransactionStateStorage; @@ -43,10 +44,9 @@ import org.apache.twill.zookeeper.ZKClientService; import org.apache.zookeeper.WatchedEvent; import org.apache.zookeeper.Watcher; import org.apache.zookeeper.ZooKeeper; -import org.junit.AfterClass; +import org.junit.After; import org.junit.Assert; import org.junit.Before; -import org.junit.BeforeClass; import org.junit.ClassRule; import org.junit.Test; import org.junit.rules.TemporaryFolder; @@ -55,6 +55,7 @@ import org.slf4j.LoggerFactory; import 
java.io.IOException; import java.util.List; +import java.util.concurrent.Callable; import java.util.concurrent.CountDownLatch; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; @@ -62,6 +63,7 @@ import java.util.concurrent.TimeUnit; /** * This tests whether transaction service hangs on stop when heavily loaded - https://issues.cask.co/browse/TEPHRA-132 + * as well as proper handling of zk election https://issues.cask.co/browse/TEPHRA-179. */ public class ThriftTransactionServerTest { private static final Logger LOG = LoggerFactory.getLogger(ThriftTransactionSystemTest.class); @@ -70,17 +72,18 @@ public class ThriftTransactionServerTest { private static ZKClientService zkClientService; private static TransactionService txService; private static TransactionStateStorage storage; - static Injector injector; + private static Injector injector; private static final int NUM_CLIENTS = 17; - private static final CountDownLatch STORAGE_WAIT_LATCH = new CountDownLatch(1); - private static final CountDownLatch CLIENTS_DONE_LATCH = new CountDownLatch(NUM_CLIENTS); + // storageWaitLatch is used to simulate slow HDFS writes for TEPHRA-132 + private static CountDownLatch storageWaitLatch; + private static CountDownLatch clientsDoneLatch; @ClassRule public static TemporaryFolder tmpFolder = new TemporaryFolder(); - @BeforeClass - public static void start() throws Exception { + @Before + public void start() throws Exception { zkServer = InMemoryZKServer.builder().setDataDir(tmpFolder.newFolder()).build(); zkServer.startAndWait(); @@ -103,6 +106,8 @@ public class ThriftTransactionServerTest { @Override protected void configure() { bind(TransactionStateStorage.class).to(SlowTransactionStorage.class).in(Scopes.SINGLETON); + // overriding this to make it non-singleton + bind(TransactionSystemClient.class).to(TransactionServiceClient.class); } }), new TransactionClientModule() @@ -120,15 +125,15 @@ public class ThriftTransactionServerTest { } catch 
(Exception e) { LOG.error("Failed to start service: ", e); } - } - @Before - public void reset() throws Exception { getClient().resetState(); + + storageWaitLatch = new CountDownLatch(1); + clientsDoneLatch = new CountDownLatch(NUM_CLIENTS); } - @AfterClass - public static void stop() throws Exception { + @After + public void stop() throws Exception { txService.stopAndWait(); storage.stopAndWait(); zkClientService.stopAndWait(); @@ -141,6 +146,8 @@ public class ThriftTransactionServerTest { @Test public void testThriftServerStop() throws Exception { + Assert.assertEquals(Service.State.RUNNING, txService.thriftRPCServerState()); + int nThreads = NUM_CLIENTS; ExecutorService executorService = Executors.newFixedThreadPool(nThreads); for (int i = 0; i < nThreads; ++i) { @@ -149,7 +156,8 @@ public class ThriftTransactionServerTest { public void run() { try { TransactionSystemClient txClient = getClient(); - CLIENTS_DONE_LATCH.countDown(); + clientsDoneLatch.countDown(); + // this will hang, due to the slow edit log (until the latch in it is stopped) txClient.startShort(); } catch (Exception e) { // Exception expected @@ -158,23 +166,53 @@ public class ThriftTransactionServerTest { }); } - // Wait till all clients finish sending reqeust to transaction manager - CLIENTS_DONE_LATCH.await(); + // Wait till all clients finish sending request to transaction manager + clientsDoneLatch.await(); TimeUnit.SECONDS.sleep(1); // Expire zookeeper session, which causes Thrift server to stop. expireZkSession(zkClientService); - waitForThriftTermination(); + waitForThriftStop(); - // Stop Zookeeper client so that it does not re-connect to Zookeeper and start Thrift sever again. + // Stop Zookeeper client so that it does not re-connect to Zookeeper and start Thrift server again. zkClientService.stopAndWait(); - STORAGE_WAIT_LATCH.countDown(); + storageWaitLatch.countDown(); TimeUnit.SECONDS.sleep(1); // Make sure Thrift server stopped. 
Assert.assertEquals(Service.State.TERMINATED, txService.thriftRPCServerState()); } + @Test + public void testThriftServerRestart() throws Exception { + // we don't need a slow Transaction Log for this test case + storageWaitLatch.countDown(); + Assert.assertEquals(Service.State.RUNNING, txService.thriftRPCServerState()); + + // simply start + commit transaction + TransactionSystemClient txClient = getClient(); + Transaction tx = txClient.startShort(); + txClient.commit(tx); + + // Expire zookeeper session, which causes Thrift server to stop running. + expireZkSession(zkClientService); + waitForThriftStop(); + + // wait for the thrift rpc server to be in running state again + waitFor("Failed to wait for txService to be running.", new Callable<Boolean>() { + @Override + public Boolean call() throws Exception { + return Service.State.RUNNING == txService.thriftRPCServerState(); + } + }); + + // we need to get a new txClient, because the old one will no longer work after the thrift server restart + txClient = getClient(); + // verify that we can start and commit a transaction after becoming leader again + tx = txClient.startShort(); + txClient.commit(tx); + } + private void expireZkSession(ZKClientService zkClientService) throws Exception { ZooKeeper zooKeeper = zkClientService.getZooKeeperSupplier().get(); final SettableFuture<?> connectFuture = SettableFuture.create(); @@ -188,7 +226,7 @@ public class ThriftTransactionServerTest { }; // Create another Zookeeper session with the same sessionId so that the original one expires. 
- final ZooKeeper dupZookeeper = + ZooKeeper dupZookeeper = new ZooKeeper(zkClientService.getConnectString(), zooKeeper.getSessionTimeout(), watcher, zooKeeper.getSessionId(), zooKeeper.getSessionPasswd()); connectFuture.get(30, TimeUnit.SECONDS); @@ -196,13 +234,27 @@ public class ThriftTransactionServerTest { dupZookeeper.close(); } - private void waitForThriftTermination() throws InterruptedException { - int count = 0; - while (txService.thriftRPCServerState() != Service.State.TERMINATED && count++ < 200) { + private void waitFor(String errorMessage, Callable<Boolean> callable) throws Exception { + for (int i = 0; i < 200; i++) { + boolean value = callable.call(); + if (value) { + return; + } TimeUnit.MILLISECONDS.sleep(50); } + Assert.fail(errorMessage); + } + + private void waitForThriftStop() throws Exception { + waitFor("Failed to wait for txService to stop", new Callable<Boolean>() { + @Override + public Boolean call() throws Exception { + return Service.State.RUNNING != txService.thriftRPCServerState(); + } + }); } + // the edit log will block until a countdown latch is decremented to simulate heavy load for TEPHRA-132 private static class SlowTransactionStorage extends InMemoryTransactionStateStorage { @Override public TransactionLog createLog(long timestamp) throws IOException { @@ -218,7 +270,7 @@ public class ThriftTransactionServerTest { @Override public void append(TransactionEdit edit) throws IOException { try { - STORAGE_WAIT_LATCH.await(); + storageWaitLatch.await(); } catch (InterruptedException e) { LOG.error("Got exception: ", e); } @@ -228,7 +280,7 @@ public class ThriftTransactionServerTest { @Override public void append(List<TransactionEdit> edits) throws IOException { try { - STORAGE_WAIT_LATCH.await(); + storageWaitLatch.await(); } catch (InterruptedException e) { LOG.error("Got exception: ", e); } 
http://git-wip-us.apache.org/repos/asf/incubator-tephra/blob/ae574caf/tephra-examples/src/test/java/org/apache/tephra/examples/BalanceBooksTest.java ---------------------------------------------------------------------- diff --git a/tephra-examples/src/test/java/org/apache/tephra/examples/BalanceBooksTest.java b/tephra-examples/src/test/java/org/apache/tephra/examples/BalanceBooksTest.java index 3be1c43..1abeece 100644 --- a/tephra-examples/src/test/java/org/apache/tephra/examples/BalanceBooksTest.java +++ b/tephra-examples/src/test/java/org/apache/tephra/examples/BalanceBooksTest.java @@ -107,6 +107,7 @@ public class BalanceBooksTest { txService.startAndWait(); } catch (Exception e) { LOG.error("Failed to start service: ", e); + throw e; } }
