http://git-wip-us.apache.org/repos/asf/incubator-tephra/blob/9c693743/tephra-core/src/test/java/org/apache/tephra/TransactionExecutorTest.java ---------------------------------------------------------------------- diff --git a/tephra-core/src/test/java/org/apache/tephra/TransactionExecutorTest.java b/tephra-core/src/test/java/org/apache/tephra/TransactionExecutorTest.java new file mode 100644 index 0000000..28ccc6e --- /dev/null +++ b/tephra-core/src/test/java/org/apache/tephra/TransactionExecutorTest.java @@ -0,0 +1,590 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.tephra; + +import com.google.common.collect.ImmutableList; +import com.google.common.collect.Lists; +import com.google.inject.AbstractModule; +import com.google.inject.Guice; +import com.google.inject.Inject; +import com.google.inject.Injector; +import com.google.inject.Singleton; +import com.google.inject.util.Modules; +import org.apache.hadoop.conf.Configuration; +import org.apache.tephra.inmemory.InMemoryTxSystemClient; +import org.apache.tephra.runtime.ConfigModule; +import org.apache.tephra.runtime.DiscoveryModules; +import org.apache.tephra.runtime.TransactionModules; +import org.apache.tephra.snapshot.DefaultSnapshotCodec; +import org.junit.Assert; +import org.junit.Before; +import org.junit.BeforeClass; +import org.junit.ClassRule; +import org.junit.Test; +import org.junit.rules.TemporaryFolder; + +import java.io.IOException; +import java.util.Collection; +import java.util.List; +import javax.annotation.Nullable; + +/** + * Tests the transaction executor. 
+ */ +public class TransactionExecutorTest { + private static DummyTxClient txClient; + private static TransactionExecutorFactory factory; + + @ClassRule + public static TemporaryFolder tmpFolder = new TemporaryFolder(); + + @BeforeClass + public static void setup() throws IOException { + final Configuration conf = new Configuration(); + conf.set(TxConstants.Persist.CFG_TX_SNAPHOT_CODEC_CLASSES, DefaultSnapshotCodec.class.getName()); + conf.set(TxConstants.Manager.CFG_TX_SNAPSHOT_DIR, tmpFolder.newFolder().getAbsolutePath()); + Injector injector = Guice.createInjector( + new ConfigModule(conf), + new DiscoveryModules().getInMemoryModules(), + Modules.override( + new TransactionModules().getInMemoryModules()).with(new AbstractModule() { + @Override + protected void configure() { + TransactionManager txManager = new TransactionManager(conf); + txManager.startAndWait(); + bind(TransactionManager.class).toInstance(txManager); + bind(TransactionSystemClient.class).to(DummyTxClient.class).in(Singleton.class); + } + })); + + txClient = (DummyTxClient) injector.getInstance(TransactionSystemClient.class); + factory = injector.getInstance(TransactionExecutorFactory.class); + } + + final DummyTxAware ds1 = new DummyTxAware(), ds2 = new DummyTxAware(); + final Collection<TransactionAware> txAwares = ImmutableList.<TransactionAware>of(ds1, ds2); + + private TransactionExecutor getExecutor() { + return factory.createExecutor(txAwares); + } + + private TransactionExecutor getExecutorWithNoRetry() { + return new DefaultTransactionExecutor(txClient, txAwares, RetryStrategies.noRetries()); + } + + static final byte[] A = { 'a' }; + static final byte[] B = { 'b' }; + + final TransactionExecutor.Function<Integer, Integer> testFunction = + new TransactionExecutor.Function<Integer, Integer>() { + @Override + public Integer apply(@Nullable Integer input) { + ds1.addChange(A); + ds2.addChange(B); + if (input == null) { + throw new RuntimeException("function failed"); + } + return input * 
input; + } + }; + + @Before + public void resetTxAwares() { + ds1.reset(); + ds2.reset(); + } + + @Test + public void testSuccessful() throws TransactionFailureException, InterruptedException { + // execute: add a change to ds1 and ds2 + Integer result = getExecutor().execute(testFunction, 10); + // verify both are committed and post-committed + Assert.assertTrue(ds1.started); + Assert.assertTrue(ds2.started); + Assert.assertTrue(ds1.checked); + Assert.assertTrue(ds2.checked); + Assert.assertTrue(ds1.committed); + Assert.assertTrue(ds2.committed); + Assert.assertTrue(ds1.postCommitted); + Assert.assertTrue(ds2.postCommitted); + Assert.assertFalse(ds1.rolledBack); + Assert.assertFalse(ds2.rolledBack); + Assert.assertTrue(100 == result); + Assert.assertEquals(txClient.state, DummyTxClient.CommitState.Committed); + } + + @Test + public void testPostCommitFailure() throws TransactionFailureException, InterruptedException { + ds1.failPostCommitTxOnce = InduceFailure.ThrowException; + // execute: add a change to ds1 and ds2 + try { + getExecutor().execute(testFunction, 10); + Assert.fail("post commit failed - exception should be thrown"); + } catch (TransactionFailureException e) { + Assert.assertEquals("post failure", e.getCause().getMessage()); + } + // verify both are committed and post-committed + Assert.assertTrue(ds1.started); + Assert.assertTrue(ds2.started); + Assert.assertTrue(ds1.checked); + Assert.assertTrue(ds2.checked); + Assert.assertTrue(ds1.committed); + Assert.assertTrue(ds2.committed); + Assert.assertTrue(ds1.postCommitted); + Assert.assertTrue(ds2.postCommitted); + Assert.assertFalse(ds1.rolledBack); + Assert.assertFalse(ds2.rolledBack); + Assert.assertEquals(txClient.state, DummyTxClient.CommitState.Committed); + } + + @Test + public void testPersistFailure() throws TransactionFailureException, InterruptedException { + ds1.failCommitTxOnce = InduceFailure.ThrowException; + // execute: add a change to ds1 and ds2 + try { + 
getExecutor().execute(testFunction, 10); + Assert.fail("persist failed - exception should be thrown"); + } catch (TransactionFailureException e) { + Assert.assertEquals("persist failure", e.getCause().getMessage()); + } + // verify both are rolled back and tx is aborted + Assert.assertTrue(ds1.started); + Assert.assertTrue(ds2.started); + Assert.assertTrue(ds1.checked); + Assert.assertTrue(ds2.checked); + Assert.assertTrue(ds1.committed); + Assert.assertFalse(ds2.committed); + Assert.assertFalse(ds1.postCommitted); + Assert.assertFalse(ds2.postCommitted); + Assert.assertTrue(ds1.rolledBack); + Assert.assertTrue(ds2.rolledBack); + Assert.assertEquals(txClient.state, DummyTxClient.CommitState.Aborted); + } + + @Test + public void testPersistFalse() throws TransactionFailureException, InterruptedException { + ds1.failCommitTxOnce = InduceFailure.ReturnFalse; + // execute: add a change to ds1 and ds2 + try { + getExecutor().execute(testFunction, 10); + Assert.fail("persist failed - exception should be thrown"); + } catch (TransactionFailureException e) { + Assert.assertNull(e.getCause()); // in this case, the ds simply returned false + } + // verify both are rolled back and tx is aborted + Assert.assertTrue(ds1.started); + Assert.assertTrue(ds2.started); + Assert.assertTrue(ds1.checked); + Assert.assertTrue(ds2.checked); + Assert.assertTrue(ds1.committed); + Assert.assertFalse(ds2.committed); + Assert.assertFalse(ds1.postCommitted); + Assert.assertFalse(ds2.postCommitted); + Assert.assertTrue(ds1.rolledBack); + Assert.assertTrue(ds2.rolledBack); + Assert.assertEquals(txClient.state, DummyTxClient.CommitState.Aborted); + } + + @Test + public void testPersistAndRollbackFailure() throws TransactionFailureException, InterruptedException { + ds1.failCommitTxOnce = InduceFailure.ThrowException; + ds1.failRollbackTxOnce = InduceFailure.ThrowException; + // execute: add a change to ds1 and ds2 + try { + getExecutor().execute(testFunction, 10); + Assert.fail("persist failed - 
exception should be thrown"); + } catch (TransactionFailureException e) { + Assert.assertEquals("persist failure", e.getCause().getMessage()); + } + // verify both are rolled back and tx is invalidated + Assert.assertTrue(ds1.started); + Assert.assertTrue(ds2.started); + Assert.assertTrue(ds1.checked); + Assert.assertTrue(ds2.checked); + Assert.assertTrue(ds1.committed); + Assert.assertFalse(ds2.committed); + Assert.assertFalse(ds1.postCommitted); + Assert.assertFalse(ds2.postCommitted); + Assert.assertTrue(ds1.rolledBack); + Assert.assertTrue(ds2.rolledBack); + Assert.assertEquals(txClient.state, DummyTxClient.CommitState.Invalidated); + } + + @Test + public void testPersistAndRollbackFalse() throws TransactionFailureException, InterruptedException { + ds1.failCommitTxOnce = InduceFailure.ReturnFalse; + ds1.failRollbackTxOnce = InduceFailure.ReturnFalse; + // execute: add a change to ds1 and ds2 + try { + getExecutor().execute(testFunction, 10); + Assert.fail("persist failed - exception should be thrown"); + } catch (TransactionFailureException e) { + Assert.assertNull(e.getCause()); // in this case, the ds simply returned false + } + // verify both are rolled back and tx is invalidated + Assert.assertTrue(ds1.started); + Assert.assertTrue(ds2.started); + Assert.assertTrue(ds1.checked); + Assert.assertTrue(ds2.checked); + Assert.assertTrue(ds1.committed); + Assert.assertFalse(ds2.committed); + Assert.assertFalse(ds1.postCommitted); + Assert.assertFalse(ds2.postCommitted); + Assert.assertTrue(ds1.rolledBack); + Assert.assertTrue(ds2.rolledBack); + Assert.assertEquals(txClient.state, DummyTxClient.CommitState.Invalidated); + } + + @Test + public void testNoIndefiniteRetryByDefault() throws TransactionFailureException, InterruptedException { + // we want retry by default, so that engineers don't miss it + txClient.failCommits = 1000; + try { + // execute: add a change to ds1 and ds2 + getExecutor().execute(testFunction, 10); + Assert.fail("commit failed too many 
times to retry - exception should be thrown"); + } catch (TransactionConflictException e) { + Assert.assertNull(e.getCause()); + } + + txClient.failCommits = 0; + + // verify both are rolled back and tx is aborted + Assert.assertTrue(ds1.started); + Assert.assertTrue(ds2.started); + Assert.assertTrue(ds1.checked); + Assert.assertTrue(ds2.checked); + Assert.assertTrue(ds1.committed); + Assert.assertTrue(ds2.committed); + Assert.assertFalse(ds1.postCommitted); + Assert.assertFalse(ds2.postCommitted); + Assert.assertTrue(ds1.rolledBack); + Assert.assertTrue(ds2.rolledBack); + Assert.assertEquals(txClient.state, DummyTxClient.CommitState.Aborted); + } + + @Test + public void testRetryByDefault() throws TransactionFailureException, InterruptedException { + // we want retry by default, so that engineers don't miss it + txClient.failCommits = 2; + // execute: add a change to ds1 and ds2 + getExecutor().execute(testFunction, 10); + // should not fail, but continue + + // verify both are committed + Assert.assertTrue(ds1.started); + Assert.assertTrue(ds2.started); + Assert.assertTrue(ds1.checked); + Assert.assertTrue(ds2.checked); + Assert.assertTrue(ds1.committed); + Assert.assertTrue(ds2.committed); + Assert.assertTrue(ds1.postCommitted); + Assert.assertTrue(ds2.postCommitted); + Assert.assertFalse(ds1.rolledBack); + Assert.assertFalse(ds2.rolledBack); + Assert.assertEquals(txClient.state, DummyTxClient.CommitState.Committed); + } + + @Test + public void testCommitFalse() throws TransactionFailureException, InterruptedException { + txClient.failCommits = 1; + // execute: add a change to ds1 and ds2 + try { + getExecutorWithNoRetry().execute(testFunction, 10); + Assert.fail("commit failed - exception should be thrown"); + } catch (TransactionConflictException e) { + Assert.assertNull(e.getCause()); + } + // verify both are rolled back and tx is aborted + Assert.assertTrue(ds1.started); + Assert.assertTrue(ds2.started); + Assert.assertTrue(ds1.checked); + 
Assert.assertTrue(ds2.checked); + Assert.assertTrue(ds1.committed); + Assert.assertTrue(ds2.committed); + Assert.assertFalse(ds1.postCommitted); + Assert.assertFalse(ds2.postCommitted); + Assert.assertTrue(ds1.rolledBack); + Assert.assertTrue(ds2.rolledBack); + Assert.assertEquals(txClient.state, DummyTxClient.CommitState.Aborted); + } + + @Test + public void testCanCommitFalse() throws TransactionFailureException, InterruptedException { + txClient.failCanCommitOnce = true; + // execute: add a change to ds1 and ds2 + try { + getExecutorWithNoRetry().execute(testFunction, 10); + Assert.fail("commit failed - exception should be thrown"); + } catch (TransactionConflictException e) { + Assert.assertNull(e.getCause()); + } + // verify both are rolled back and tx is aborted + Assert.assertTrue(ds1.started); + Assert.assertTrue(ds2.started); + Assert.assertTrue(ds1.checked); + Assert.assertTrue(ds2.checked); + Assert.assertFalse(ds1.committed); + Assert.assertFalse(ds2.committed); + Assert.assertFalse(ds1.postCommitted); + Assert.assertFalse(ds2.postCommitted); + Assert.assertTrue(ds1.rolledBack); + Assert.assertTrue(ds2.rolledBack); + Assert.assertEquals(txClient.state, DummyTxClient.CommitState.Aborted); + } + + @Test + public void testChangesAndRollbackFailure() throws TransactionFailureException, InterruptedException { + ds1.failChangesTxOnce = InduceFailure.ThrowException; + ds1.failRollbackTxOnce = InduceFailure.ThrowException; + // execute: add a change to ds1 and ds2 + try { + getExecutor().execute(testFunction, 10); + Assert.fail("get changes failed - exception should be thrown"); + } catch (TransactionFailureException e) { + Assert.assertEquals("changes failure", e.getCause().getMessage()); + } + // verify both are rolled back and tx is invalidated + Assert.assertTrue(ds1.started); + Assert.assertTrue(ds2.started); + Assert.assertTrue(ds1.checked); + Assert.assertFalse(ds2.checked); + Assert.assertFalse(ds1.committed); + Assert.assertFalse(ds2.committed); + 
Assert.assertFalse(ds1.postCommitted); + Assert.assertFalse(ds2.postCommitted); + Assert.assertTrue(ds1.rolledBack); + Assert.assertTrue(ds2.rolledBack); + Assert.assertEquals(txClient.state, DummyTxClient.CommitState.Invalidated); + } + + @Test + public void testFunctionAndRollbackFailure() throws TransactionFailureException, InterruptedException { + ds1.failRollbackTxOnce = InduceFailure.ReturnFalse; + // execute: add a change to ds1 and ds2 + try { + getExecutor().execute(testFunction, null); + Assert.fail("function failed - exception should be thrown"); + } catch (TransactionFailureException e) { + Assert.assertEquals("function failed", e.getCause().getMessage()); + } + // verify both are rolled back and tx is invalidated + Assert.assertTrue(ds1.started); + Assert.assertTrue(ds2.started); + Assert.assertFalse(ds1.checked); + Assert.assertFalse(ds2.checked); + Assert.assertFalse(ds1.committed); + Assert.assertFalse(ds2.committed); + Assert.assertFalse(ds1.postCommitted); + Assert.assertFalse(ds2.postCommitted); + Assert.assertTrue(ds1.rolledBack); + Assert.assertTrue(ds2.rolledBack); + Assert.assertEquals(txClient.state, DummyTxClient.CommitState.Invalidated); + } + + @Test + public void testStartAndRollbackFailure() throws TransactionFailureException, InterruptedException { + ds1.failStartTxOnce = InduceFailure.ThrowException; + // execute: add a change to ds1 and ds2 + try { + getExecutor().execute(testFunction, 10); + Assert.fail("start failed - exception should be thrown"); + } catch (TransactionFailureException e) { + Assert.assertEquals("start failure", e.getCause().getMessage()); + } + // verify both are not rolled back and tx is aborted + Assert.assertTrue(ds1.started); + Assert.assertFalse(ds2.started); + Assert.assertFalse(ds1.checked); + Assert.assertFalse(ds2.checked); + Assert.assertFalse(ds1.committed); + Assert.assertFalse(ds2.committed); + Assert.assertFalse(ds1.postCommitted); + Assert.assertFalse(ds2.postCommitted); + 
Assert.assertFalse(ds1.rolledBack); + Assert.assertFalse(ds2.rolledBack); + Assert.assertEquals(txClient.state, DummyTxClient.CommitState.Aborted); + } + + enum InduceFailure { NoFailure, ReturnFalse, ThrowException } + + static class DummyTxAware implements TransactionAware { + + Transaction tx; + boolean started = false; + boolean committed = false; + boolean checked = false; + boolean rolledBack = false; + boolean postCommitted = false; + List<byte[]> changes = Lists.newArrayList(); + + InduceFailure failStartTxOnce = InduceFailure.NoFailure; + InduceFailure failChangesTxOnce = InduceFailure.NoFailure; + InduceFailure failCommitTxOnce = InduceFailure.NoFailure; + InduceFailure failPostCommitTxOnce = InduceFailure.NoFailure; + InduceFailure failRollbackTxOnce = InduceFailure.NoFailure; + + void addChange(byte[] key) { + changes.add(key); + } + + void reset() { + tx = null; + started = false; + checked = false; + committed = false; + rolledBack = false; + postCommitted = false; + changes.clear(); + } + + @Override + public void startTx(Transaction tx) { + reset(); + started = true; + this.tx = tx; + if (failStartTxOnce == InduceFailure.ThrowException) { + failStartTxOnce = InduceFailure.NoFailure; + throw new RuntimeException("start failure"); + } + } + + @Override + public void updateTx(Transaction tx) { + this.tx = tx; + } + + @Override + public Collection<byte[]> getTxChanges() { + checked = true; + if (failChangesTxOnce == InduceFailure.ThrowException) { + failChangesTxOnce = InduceFailure.NoFailure; + throw new RuntimeException("changes failure"); + } + return ImmutableList.copyOf(changes); + } + + @Override + public boolean commitTx() throws Exception { + committed = true; + if (failCommitTxOnce == InduceFailure.ThrowException) { + failCommitTxOnce = InduceFailure.NoFailure; + throw new RuntimeException("persist failure"); + } + if (failCommitTxOnce == InduceFailure.ReturnFalse) { + failCommitTxOnce = InduceFailure.NoFailure; + return false; + } + return 
true; + } + + @Override + public void postTxCommit() { + postCommitted = true; + if (failPostCommitTxOnce == InduceFailure.ThrowException) { + failPostCommitTxOnce = InduceFailure.NoFailure; + throw new RuntimeException("post failure"); + } + } + + @Override + public boolean rollbackTx() throws Exception { + rolledBack = true; + if (failRollbackTxOnce == InduceFailure.ThrowException) { + failRollbackTxOnce = InduceFailure.NoFailure; + throw new RuntimeException("rollback failure"); + } + if (failRollbackTxOnce == InduceFailure.ReturnFalse) { + failRollbackTxOnce = InduceFailure.NoFailure; + return false; + } + return true; + } + + @Override + public String getTransactionAwareName() { + return "dummy"; + } + } + + static class DummyTxClient extends InMemoryTxSystemClient { + + boolean failCanCommitOnce = false; + int failCommits = 0; + enum CommitState { + Started, Committed, Aborted, Invalidated + } + CommitState state = CommitState.Started; + + @Inject + DummyTxClient(TransactionManager txmgr) { + super(txmgr); + } + + @Override + public boolean canCommit(Transaction tx, Collection<byte[]> changeIds) throws TransactionNotInProgressException { + if (failCanCommitOnce) { + failCanCommitOnce = false; + return false; + } else { + return super.canCommit(tx, changeIds); + } + } + + @Override + public boolean commit(Transaction tx) throws TransactionNotInProgressException { + if (failCommits-- > 0) { + return false; + } else { + state = CommitState.Committed; + return super.commit(tx); + } + } + + @Override + public Transaction startLong() { + state = CommitState.Started; + return super.startLong(); + } + + @Override + public Transaction startShort() { + state = CommitState.Started; + return super.startShort(); + } + + @Override + public Transaction startShort(int timeout) { + state = CommitState.Started; + return super.startShort(timeout); + } + + @Override + public void abort(Transaction tx) { + state = CommitState.Aborted; + super.abort(tx); + } + + @Override + public 
boolean invalidate(long tx) { + state = CommitState.Invalidated; + return super.invalidate(tx); + } + } +}
http://git-wip-us.apache.org/repos/asf/incubator-tephra/blob/9c693743/tephra-core/src/test/java/org/apache/tephra/TransactionManagerTest.java ---------------------------------------------------------------------- diff --git a/tephra-core/src/test/java/org/apache/tephra/TransactionManagerTest.java b/tephra-core/src/test/java/org/apache/tephra/TransactionManagerTest.java new file mode 100644 index 0000000..f74e209 --- /dev/null +++ b/tephra-core/src/test/java/org/apache/tephra/TransactionManagerTest.java @@ -0,0 +1,346 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.tephra; + +import com.google.common.collect.ImmutableList; +import com.google.common.collect.ImmutableSet; +import org.apache.hadoop.conf.Configuration; +import org.apache.tephra.inmemory.InMemoryTxSystemClient; +import org.apache.tephra.metrics.TxMetricsCollector; +import org.apache.tephra.persist.InMemoryTransactionStateStorage; +import org.apache.tephra.persist.TransactionStateStorage; +import org.junit.After; +import org.junit.Assert; +import org.junit.Before; +import org.junit.Test; + +import java.util.Collections; +import java.util.concurrent.TimeUnit; + +/** + * + */ +public class TransactionManagerTest extends TransactionSystemTest { + + static Configuration conf = new Configuration(); + + TransactionManager txManager = null; + TransactionStateStorage txStateStorage = null; + + @Override + protected TransactionSystemClient getClient() { + return new InMemoryTxSystemClient(txManager); + } + + @Override + protected TransactionStateStorage getStateStorage() { + return txStateStorage; + } + + @Before + public void before() { + conf.setInt(TxConstants.Manager.CFG_TX_CLEANUP_INTERVAL, 0); // no cleanup thread + // todo should create two sets of tests, one with LocalFileTxStateStorage and one with InMemoryTxStateStorage + txStateStorage = new InMemoryTransactionStateStorage(); + txManager = new TransactionManager + (conf, txStateStorage, new TxMetricsCollector()); + txManager.startAndWait(); + } + + @After + public void after() { + txManager.stopAndWait(); + } + + @Test + public void testTransactionCleanup() throws Exception { + conf.setInt(TxConstants.Manager.CFG_TX_CLEANUP_INTERVAL, 3); + conf.setInt(TxConstants.Manager.CFG_TX_TIMEOUT, 2); + // using a new tx manager that cleans up + TransactionManager txm = new TransactionManager + (conf, new InMemoryTransactionStateStorage(), new TxMetricsCollector()); + txm.startAndWait(); + try { + Assert.assertEquals(0, txm.getInvalidSize()); + Assert.assertEquals(0, txm.getCommittedSize()); + // 
start a transaction and leave it open + Transaction tx1 = txm.startShort(); + // start a long running transaction and leave it open + Transaction tx2 = txm.startLong(); + Transaction tx3 = txm.startLong(); + // start and commit a bunch of transactions + for (int i = 0; i < 10; i++) { + Transaction tx = txm.startShort(); + Assert.assertTrue(txm.canCommit(tx, Collections.singleton(new byte[] { (byte) i }))); + Assert.assertTrue(txm.commit(tx)); + } + // all of these should still be in the committed set + Assert.assertEquals(0, txm.getInvalidSize()); + Assert.assertEquals(10, txm.getCommittedSize()); + // sleep longer than the cleanup interval + TimeUnit.SECONDS.sleep(5); + // transaction should now be invalid + Assert.assertEquals(1, txm.getInvalidSize()); + // run another transaction + Transaction txx = txm.startShort(); + // verify the exclude + Assert.assertFalse(txx.isVisible(tx1.getTransactionId())); + Assert.assertFalse(txx.isVisible(tx2.getTransactionId())); + Assert.assertFalse(txx.isVisible(tx3.getTransactionId())); + // try to commit the last transaction that was started + Assert.assertTrue(txm.canCommit(txx, Collections.singleton(new byte[] { 0x0a }))); + Assert.assertTrue(txm.commit(txx)); + + // now the committed change sets should be empty again + Assert.assertEquals(0, txm.getCommittedSize()); + // cannot commit transaction as it was timed out + try { + txm.canCommit(tx1, Collections.singleton(new byte[] { 0x11 })); + Assert.fail(); + } catch (TransactionNotInProgressException e) { + // expected + } + txm.abort(tx1); + // abort should have removed from invalid + Assert.assertEquals(0, txm.getInvalidSize()); + // run another bunch of transactions + for (int i = 0; i < 10; i++) { + Transaction tx = txm.startShort(); + Assert.assertTrue(txm.canCommit(tx, Collections.singleton(new byte[] { (byte) i }))); + Assert.assertTrue(txm.commit(tx)); + } + // none of these should still be in the committed set (tx2 is long-running). 
+ Assert.assertEquals(0, txm.getInvalidSize()); + Assert.assertEquals(0, txm.getCommittedSize()); + // commit tx2, abort tx3 + Assert.assertTrue(txm.commit(tx2)); + txm.abort(tx3); + // none of these should still be in the committed set (tx2 is long-running). + // Only tx3 is invalid list as it was aborted and is long-running. tx1 is short one and it rolled back its changes + // so it should NOT be in invalid list + Assert.assertEquals(1, txm.getInvalidSize()); + Assert.assertEquals(tx3.getTransactionId(), (long) txm.getCurrentState().getInvalid().iterator().next()); + Assert.assertEquals(1, txm.getExcludedListSize()); + } finally { + txm.stopAndWait(); + } + } + + @Test + public void testLongTransactionCleanup() throws Exception { + conf.setInt(TxConstants.Manager.CFG_TX_CLEANUP_INTERVAL, 3); + conf.setInt(TxConstants.Manager.CFG_TX_LONG_TIMEOUT, 2); + // using a new tx manager that cleans up + TransactionManager txm = new TransactionManager + (conf, new InMemoryTransactionStateStorage(), new TxMetricsCollector()); + txm.startAndWait(); + try { + Assert.assertEquals(0, txm.getInvalidSize()); + Assert.assertEquals(0, txm.getCommittedSize()); + + // start a long running transaction + Transaction tx1 = txm.startLong(); + + Assert.assertEquals(0, txm.getInvalidSize()); + Assert.assertEquals(0, txm.getCommittedSize()); + + // sleep longer than the cleanup interval + TimeUnit.SECONDS.sleep(5); + + // transaction should now be invalid + Assert.assertEquals(1, txm.getInvalidSize()); + Assert.assertEquals(0, txm.getCommittedSize()); + + // cannot commit transaction as it was timed out + try { + txm.canCommit(tx1, Collections.singleton(new byte[] { 0x11 })); + Assert.fail(); + } catch (TransactionNotInProgressException e) { + // expected + } + + txm.abort(tx1); + // abort should not remove long running transaction from invalid list + Assert.assertEquals(1, txm.getInvalidSize()); + } finally { + txm.stopAndWait(); + } + } + + @Test + public void testTruncateInvalid() throws 
Exception { + InMemoryTransactionStateStorage storage = new InMemoryTransactionStateStorage(); + Configuration testConf = new Configuration(conf); + // No snapshots + testConf.setLong(TxConstants.Manager.CFG_TX_SNAPSHOT_INTERVAL, -1); + TransactionManager txm1 = new TransactionManager(testConf, storage, new TxMetricsCollector()); + txm1.startAndWait(); + + TransactionManager txm2 = null; + Transaction tx1; + Transaction tx2; + Transaction tx3; + Transaction tx4; + Transaction tx5; + Transaction tx6; + try { + Assert.assertEquals(0, txm1.getInvalidSize()); + + // start a few transactions + tx1 = txm1.startLong(); + tx2 = txm1.startShort(); + tx3 = txm1.startLong(); + tx4 = txm1.startShort(); + tx5 = txm1.startLong(); + tx6 = txm1.startShort(); + + // invalidate tx1, tx2, tx5 and tx6 + txm1.invalidate(tx1.getTransactionId()); + txm1.invalidate(tx2.getTransactionId()); + txm1.invalidate(tx5.getTransactionId()); + txm1.invalidate(tx6.getTransactionId()); + + // tx1, tx2, tx5 and tx6 should be in invalid list + Assert.assertEquals( + ImmutableList.of(tx1.getTransactionId(), tx2.getTransactionId(), tx5.getTransactionId(), + tx6.getTransactionId()), + txm1.getCurrentState().getInvalid() + ); + + // remove tx1 and tx6 from invalid list + Assert.assertTrue(txm1.truncateInvalidTx(ImmutableSet.of(tx1.getTransactionId(), tx6.getTransactionId()))); + + // only tx2 and tx5 should be in invalid list now + Assert.assertEquals(ImmutableList.of(tx2.getTransactionId(), tx5.getTransactionId()), + txm1.getCurrentState().getInvalid()); + + // removing in-progress transactions should not have any effect + Assert.assertEquals(ImmutableSet.of(tx3.getTransactionId(), tx4.getTransactionId()), + txm1.getCurrentState().getInProgress().keySet()); + Assert.assertFalse(txm1.truncateInvalidTx(ImmutableSet.of(tx3.getTransactionId(), tx4.getTransactionId()))); + // no change to in-progress + Assert.assertEquals(ImmutableSet.of(tx3.getTransactionId(), tx4.getTransactionId()), + 
txm1.getCurrentState().getInProgress().keySet()); + // no change to invalid list + Assert.assertEquals(ImmutableList.of(tx2.getTransactionId(), tx5.getTransactionId()), + txm1.getCurrentState().getInvalid()); + + // Test transaction edit logs replay + // Start another transaction manager without stopping txm1 so that snapshot does not get written, + // and all logs can be replayed. + txm2 = new TransactionManager(testConf, storage, new TxMetricsCollector()); + txm2.startAndWait(); + Assert.assertEquals(ImmutableList.of(tx2.getTransactionId(), tx5.getTransactionId()), + txm2.getCurrentState().getInvalid()); + Assert.assertEquals(ImmutableSet.of(tx3.getTransactionId(), tx4.getTransactionId()), + txm2.getCurrentState().getInProgress().keySet()); + } finally { + txm1.stopAndWait(); + if (txm2 != null) { + txm2.stopAndWait(); + } + } + } + + @Test + public void testTruncateInvalidBeforeTime() throws Exception { + InMemoryTransactionStateStorage storage = new InMemoryTransactionStateStorage(); + Configuration testConf = new Configuration(conf); + // No snapshots + testConf.setLong(TxConstants.Manager.CFG_TX_SNAPSHOT_INTERVAL, -1); + TransactionManager txm1 = new TransactionManager(testConf, storage, new TxMetricsCollector()); + txm1.startAndWait(); + + TransactionManager txm2 = null; + Transaction tx1; + Transaction tx2; + Transaction tx3; + Transaction tx4; + Transaction tx5; + Transaction tx6; + try { + Assert.assertEquals(0, txm1.getInvalidSize()); + + // start a few transactions + tx1 = txm1.startLong(); + tx2 = txm1.startShort(); + // Sleep so that transaction ids get generated a millisecond apart for assertion + // TEPHRA-63 should eliminate the need to sleep + TimeUnit.MILLISECONDS.sleep(1); + long timeBeforeTx3 = System.currentTimeMillis(); + tx3 = txm1.startLong(); + tx4 = txm1.startShort(); + TimeUnit.MILLISECONDS.sleep(1); + long timeBeforeTx5 = System.currentTimeMillis(); + tx5 = txm1.startLong(); + tx6 = txm1.startShort(); + + // invalidate tx1, tx2, tx5 and 
tx6 + txm1.invalidate(tx1.getTransactionId()); + txm1.invalidate(tx2.getTransactionId()); + txm1.invalidate(tx5.getTransactionId()); + txm1.invalidate(tx6.getTransactionId()); + + // tx1, tx2, tx5 and tx6 should be in invalid list + Assert.assertEquals( + ImmutableList.of(tx1.getTransactionId(), tx2.getTransactionId(), tx5.getTransactionId(), + tx6.getTransactionId()), + txm1.getCurrentState().getInvalid() + ); + + // remove transactions before tx3 from invalid list + Assert.assertTrue(txm1.truncateInvalidTxBefore(timeBeforeTx3)); + + // only tx5 and tx6 should be in invalid list now + Assert.assertEquals(ImmutableList.of(tx5.getTransactionId(), tx6.getTransactionId()), + txm1.getCurrentState().getInvalid()); + + // removing invalid transactions before tx5 should throw exception since tx3 and tx4 are in-progress + Assert.assertEquals(ImmutableSet.of(tx3.getTransactionId(), tx4.getTransactionId()), + txm1.getCurrentState().getInProgress().keySet()); + try { + txm1.truncateInvalidTxBefore(timeBeforeTx5); + Assert.fail("Expected InvalidTruncateTimeException exception"); + } catch (InvalidTruncateTimeException e) { + // Expected exception + } + // no change to in-progress + Assert.assertEquals(ImmutableSet.of(tx3.getTransactionId(), tx4.getTransactionId()), + txm1.getCurrentState().getInProgress().keySet()); + // no change to invalid list + Assert.assertEquals(ImmutableList.of(tx5.getTransactionId(), tx6.getTransactionId()), + txm1.getCurrentState().getInvalid()); + + // Test transaction edit logs replay + // Start another transaction manager without stopping txm1 so that snapshot does not get written, + // and all logs can be replayed. 
+ txm2 = new TransactionManager(testConf, storage, new TxMetricsCollector()); + txm2.startAndWait(); + Assert.assertEquals(ImmutableList.of(tx5.getTransactionId(), tx6.getTransactionId()), + txm2.getCurrentState().getInvalid()); + Assert.assertEquals(ImmutableSet.of(tx3.getTransactionId(), tx4.getTransactionId()), + txm2.getCurrentState().getInProgress().keySet()); + } finally { + txm1.stopAndWait(); + if (txm2 != null) { + txm2.stopAndWait(); + } + } + } +} http://git-wip-us.apache.org/repos/asf/incubator-tephra/blob/9c693743/tephra-core/src/test/java/org/apache/tephra/TransactionServiceMainTest.java ---------------------------------------------------------------------- diff --git a/tephra-core/src/test/java/org/apache/tephra/TransactionServiceMainTest.java b/tephra-core/src/test/java/org/apache/tephra/TransactionServiceMainTest.java new file mode 100644 index 0000000..fa12acb --- /dev/null +++ b/tephra-core/src/test/java/org/apache/tephra/TransactionServiceMainTest.java @@ -0,0 +1,77 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.tephra; + +import com.google.common.base.Throwables; +import org.apache.hadoop.conf.Configuration; +import org.apache.tephra.distributed.TransactionServiceClient; +import org.apache.twill.internal.zookeeper.InMemoryZKServer; +import org.junit.ClassRule; +import org.junit.Test; +import org.junit.rules.TemporaryFolder; + +import java.util.concurrent.CountDownLatch; + +/** + * Test for verifying TransactionServiceMain works correctly. + */ +public class TransactionServiceMainTest { + + @ClassRule + public static TemporaryFolder tmpFolder = new TemporaryFolder(); + + @Test + public void testClientServer() throws Exception { + // Simply start a transaction server and connect to it with the client. + InMemoryZKServer zkServer = InMemoryZKServer.builder().setDataDir(tmpFolder.newFolder()).build(); + zkServer.startAndWait(); + + try { + Configuration conf = new Configuration(); + conf.set(TxConstants.Service.CFG_DATA_TX_ZOOKEEPER_QUORUM, zkServer.getConnectionStr()); + conf.set(TxConstants.Manager.CFG_TX_SNAPSHOT_DIR, tmpFolder.newFolder().getAbsolutePath()); + + final TransactionServiceMain main = new TransactionServiceMain(conf); + final CountDownLatch latch = new CountDownLatch(1); + Thread t = new Thread() { + @Override + public void run() { + try { + main.start(); + latch.countDown(); + } catch (Exception e) { + throw Throwables.propagate(e); + } + } + }; + + try { + t.start(); + // Wait for service to startup + latch.await(); + TransactionServiceClient.doMain(true, conf); + } finally { + main.stop(); + t.join(); + } + } finally { + zkServer.stopAndWait(); + } + } +} http://git-wip-us.apache.org/repos/asf/incubator-tephra/blob/9c693743/tephra-core/src/test/java/org/apache/tephra/TransactionSystemTest.java ---------------------------------------------------------------------- diff --git a/tephra-core/src/test/java/org/apache/tephra/TransactionSystemTest.java b/tephra-core/src/test/java/org/apache/tephra/TransactionSystemTest.java new file 
mode 100644 index 0000000..797c08a --- /dev/null +++ b/tephra-core/src/test/java/org/apache/tephra/TransactionSystemTest.java @@ -0,0 +1,320 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.tephra; + +import com.google.common.collect.ImmutableSet; +import org.apache.tephra.persist.TransactionSnapshot; +import org.apache.tephra.persist.TransactionStateStorage; +import org.junit.Assert; +import org.junit.Test; + +import java.util.Arrays; +import java.util.Collection; +import java.util.concurrent.TimeUnit; + +/** + * Common tests for {@link TransactionSystemClient} implementations supplied by subclasses. + */ +public abstract class TransactionSystemTest { + + public static final byte[] C1 = new byte[] { 'c', '1' }; + public static final byte[] C2 = new byte[] { 'c', '2' }; + public static final byte[] C3 = new byte[] { 'c', '3' }; + public static final byte[] C4 = new byte[] { 'c', '4' }; + + protected abstract TransactionSystemClient getClient() throws Exception; + + protected abstract TransactionStateStorage getStateStorage() throws Exception; + + @Test + public void testCommitRaceHandling() throws Exception { + TransactionSystemClient client1 = getClient(); + TransactionSystemClient client2 = getClient(); + + Transaction tx1 = client1.startShort(); + Transaction tx2 = 
client2.startShort(); + + Assert.assertTrue(client1.canCommit(tx1, asList(C1, C2))); + // second one also can commit even though there are conflicts with first since first one hasn't committed yet + Assert.assertTrue(client2.canCommit(tx2, asList(C2, C3))); + + Assert.assertTrue(client1.commit(tx1)); + + // now second one should not commit, since there are conflicts with tx1 that has been committed + Assert.assertFalse(client2.commit(tx2)); + } + + @Test + public void testMultipleCommitsAtSameTime() throws Exception { + // We want to check that if two txs finish at same time (wrt tx manager) they do not overwrite changesets of each + // other in tx manager used for conflicts detection (we had this bug) + // NOTE: you don't have to use multiple clients for that + TransactionSystemClient client1 = getClient(); + TransactionSystemClient client2 = getClient(); + TransactionSystemClient client3 = getClient(); + TransactionSystemClient client4 = getClient(); + TransactionSystemClient client5 = getClient(); + + Transaction tx1 = client1.startShort(); + Transaction tx2 = client2.startShort(); + Transaction tx3 = client3.startShort(); + Transaction tx4 = client4.startShort(); + Transaction tx5 = client5.startShort(); + + Assert.assertTrue(client1.canCommit(tx1, asList(C1))); + Assert.assertTrue(client1.commit(tx1)); + + Assert.assertTrue(client2.canCommit(tx2, asList(C2))); + Assert.assertTrue(client2.commit(tx2)); + + // verifying conflicts detection + Assert.assertFalse(client3.canCommit(tx3, asList(C1))); + Assert.assertFalse(client4.canCommit(tx4, asList(C2))); + Assert.assertTrue(client5.canCommit(tx5, asList(C3))); + } + + @Test + public void testCommitTwice() throws Exception { + TransactionSystemClient client = getClient(); + Transaction tx = client.startShort(); + + Assert.assertTrue(client.canCommit(tx, asList(C1, C2))); + Assert.assertTrue(client.commit(tx)); + // cannot commit twice same tx + try { + Assert.assertFalse(client.commit(tx)); + Assert.fail(); + } 
catch (TransactionNotInProgressException e) { + // expected + } + } + + @Test + public void testAbortTwice() throws Exception { + TransactionSystemClient client = getClient(); + Transaction tx = client.startShort(); + + Assert.assertTrue(client.canCommit(tx, asList(C1, C2))); + client.abort(tx); + // abort of not active tx has no effect + client.abort(tx); + } + + @Test + public void testReuseTx() throws Exception { + TransactionSystemClient client = getClient(); + Transaction tx = client.startShort(); + + Assert.assertTrue(client.canCommit(tx, asList(C1, C2))); + Assert.assertTrue(client.commit(tx)); + // can't re-use same tx again + try { + client.canCommit(tx, asList(C3, C4)); + Assert.fail(); + } catch (TransactionNotInProgressException e) { + // expected + } + try { + Assert.assertFalse(client.commit(tx)); + Assert.fail(); + } catch (TransactionNotInProgressException e) { + // expected + } + + // abort of not active tx has no effect + client.abort(tx); + } + + @Test + public void testUseNotStarted() throws Exception { + TransactionSystemClient client = getClient(); + Transaction tx1 = client.startShort(); + Assert.assertTrue(client.commit(tx1)); + + // we know this one is older than current writePointer and was not used + Transaction txOld = new Transaction(tx1.getReadPointer(), tx1.getTransactionId() - 1, + new long[] {}, new long[] {}, Transaction.NO_TX_IN_PROGRESS, + TransactionType.SHORT); + try { + Assert.assertFalse(client.canCommit(txOld, asList(C3, C4))); + Assert.fail(); + } catch (TransactionNotInProgressException e) { + // expected + } + try { + Assert.assertFalse(client.commit(txOld)); + Assert.fail(); + } catch (TransactionNotInProgressException e) { + // expected + } + // abort of not active tx has no effect + client.abort(txOld); + + // we know this one is newer than current readPointer and was not used + Transaction txNew = new Transaction(tx1.getReadPointer(), tx1.getTransactionId() + 1, + new long[] {}, new long[] {}, 
Transaction.NO_TX_IN_PROGRESS, + TransactionType.SHORT); + try { + Assert.assertFalse(client.canCommit(txNew, asList(C3, C4))); + Assert.fail(); + } catch (TransactionNotInProgressException e) { + // expected + } + try { + Assert.assertFalse(client.commit(txNew)); + Assert.fail(); + } catch (TransactionNotInProgressException e) { + // expected + } + // abort of not active tx has no effect + client.abort(txNew); + } + + @Test + public void testAbortAfterCommit() throws Exception { + TransactionSystemClient client = getClient(); + Transaction tx = client.startShort(); + + Assert.assertTrue(client.canCommit(tx, asList(C1, C2))); + Assert.assertTrue(client.commit(tx)); + // abort of not active tx has no effect + client.abort(tx); + } + + // todo add test invalidate method + @Test + public void testInvalidateTx() throws Exception { + TransactionSystemClient client = getClient(); + // Invalidate an in-progress tx + Transaction tx1 = client.startShort(); + client.canCommit(tx1, asList(C1, C2)); + Assert.assertTrue(client.invalidate(tx1.getTransactionId())); + // Cannot invalidate a committed tx + Transaction tx2 = client.startShort(); + client.canCommit(tx2, asList(C3, C4)); + client.commit(tx2); + Assert.assertFalse(client.invalidate(tx2.getTransactionId())); + } + + @Test + public void testResetState() throws Exception { + // have tx in progress, committing and committed then reset, + // get the last snapshot and see that it is empty + TransactionSystemClient client = getClient(); + TransactionStateStorage stateStorage = getStateStorage(); + + Transaction tx1 = client.startShort(); + Transaction tx2 = client.startShort(); + client.canCommit(tx1, asList(C1, C2)); + client.commit(tx1); + client.canCommit(tx2, asList(C3, C4)); + + Transaction txPreReset = client.startShort(); + long currentTs = System.currentTimeMillis(); + client.resetState(); + + TransactionSnapshot snapshot = stateStorage.getLatestSnapshot(); + Assert.assertTrue(snapshot.getTimestamp() >= currentTs); + 
Assert.assertEquals(0, snapshot.getInvalid().size()); + Assert.assertEquals(0, snapshot.getInProgress().size()); + Assert.assertEquals(0, snapshot.getCommittingChangeSets().size()); + Assert.assertEquals(0, snapshot.getCommittedChangeSets().size()); + + // confirm that transaction IDs are not reset + Transaction txPostReset = client.startShort(); + Assert.assertTrue("New tx ID should be greater than last ID before reset", + txPostReset.getTransactionId() > txPreReset.getTransactionId()); + } + + @Test + public void testTruncateInvalidTx() throws Exception { + // Start few transactions and invalidate all of them + TransactionSystemClient client = getClient(); + Transaction tx1 = client.startLong(); + Transaction tx2 = client.startShort(); + Transaction tx3 = client.startLong(); + + client.invalidate(tx1.getTransactionId()); + client.invalidate(tx2.getTransactionId()); + client.invalidate(tx3.getTransactionId()); + + // Remove tx2 and tx3 from invalid list + Assert.assertTrue(client.truncateInvalidTx(ImmutableSet.of(tx2.getTransactionId(), tx3.getTransactionId()))); + + Transaction tx = client.startShort(); + // Only tx1 should be in invalid list now + Assert.assertArrayEquals(new long[] {tx1.getTransactionId()}, tx.getInvalids()); + client.abort(tx); + } + + @Test + public void testTruncateInvalidTxBefore() throws Exception { + // Start few transactions + TransactionSystemClient client = getClient(); + Transaction tx1 = client.startLong(); + Transaction tx2 = client.startShort(); + // Sleep so that transaction ids get generated a millisecond apart for assertion + // TEPHRA-63 should eliminate the need to sleep + TimeUnit.MILLISECONDS.sleep(1); + long beforeTx3 = System.currentTimeMillis(); + Transaction tx3 = client.startLong(); + + try { + // throws exception since tx1 and tx2 are still in-progress + client.truncateInvalidTxBefore(beforeTx3); + Assert.fail("Expected InvalidTruncateTimeException exception"); + } catch (InvalidTruncateTimeException e) { + // Expected 
exception + } + + // Invalidate all of them + client.invalidate(tx1.getTransactionId()); + client.invalidate(tx2.getTransactionId()); + client.invalidate(tx3.getTransactionId()); + + // Remove transactions before time beforeTx3 + Assert.assertTrue(client.truncateInvalidTxBefore(beforeTx3)); + + Transaction tx = client.startShort(); + // Only tx3 should be in invalid list now + Assert.assertArrayEquals(new long[] {tx3.getTransactionId()}, tx.getInvalids()); + client.abort(tx); + } + + @Test + public void testGetInvalidSize() throws Exception { + // Start few transactions and invalidate all of them + TransactionSystemClient client = getClient(); + Transaction tx1 = client.startLong(); + Transaction tx2 = client.startShort(); + Transaction tx3 = client.startLong(); + + Assert.assertEquals(0, client.getInvalidSize()); + + client.invalidate(tx1.getTransactionId()); + client.invalidate(tx2.getTransactionId()); + client.invalidate(tx3.getTransactionId()); + + Assert.assertEquals(3, client.getInvalidSize()); + } + + private Collection<byte[]> asList(byte[]... val) { + return Arrays.asList(val); + } +} http://git-wip-us.apache.org/repos/asf/incubator-tephra/blob/9c693743/tephra-core/src/test/java/org/apache/tephra/TransactionTest.java ---------------------------------------------------------------------- diff --git a/tephra-core/src/test/java/org/apache/tephra/TransactionTest.java b/tephra-core/src/test/java/org/apache/tephra/TransactionTest.java new file mode 100644 index 0000000..08961f4 --- /dev/null +++ b/tephra-core/src/test/java/org/apache/tephra/TransactionTest.java @@ -0,0 +1,159 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.tephra; + +import com.google.common.collect.ImmutableSet; +import com.google.common.collect.Iterables; +import com.google.common.primitives.Longs; +import org.junit.Assert; +import org.junit.Test; + +import java.util.Arrays; +import java.util.Set; + +/** + * Tests {@link Transaction} visibility under each {@link Transaction.VisibilityLevel}. + */ +public class TransactionTest { + // Current transaction id + private final long txId = 200L; + // Read pointer for current transaction + private final long readPointer = 100L; + // Transactions committed before current transaction was started. + private final Set<Long> priorCommitted = ImmutableSet.of(80L, 99L, 100L); + // Transactions committed after current transaction was started. + private final Set<Long> postCommitted = ImmutableSet.of(150L, 180L, 210L); + // Invalid transactions before current transaction was started. + private final Set<Long> priorInvalids = ImmutableSet.of(90L, 110L, 190L); + // Invalid transactions after current transaction was started. + private final Set<Long> postInvalids = ImmutableSet.of(201L, 221L, 231L); + // Transactions in progress before current transaction was started. + private final Set<Long> priorInProgress = ImmutableSet.of(95L, 120L, 150L); + // Transactions in progress after current transaction was started. 
+ private final Set<Long> postInProgress = ImmutableSet.of(205L, 215L, 300L); + + @Test + public void testSnapshotVisibility() throws Exception { + Transaction.VisibilityLevel visibilityLevel = Transaction.VisibilityLevel.SNAPSHOT; + + Set<Long> checkPointers = ImmutableSet.of(220L, 250L); + Transaction tx = new Transaction(readPointer, txId, 250L, toSortedArray(priorInvalids), + toSortedArray(priorInProgress), 95L, + TransactionType.SHORT, toSortedArray(checkPointers), + visibilityLevel); + Set<Long> visibleCurrent = ImmutableSet.of(200L, 220L, 250L); + Set<Long> notVisibleCurrent = ImmutableSet.of(); + assertVisibility(priorCommitted, postCommitted, priorInvalids, postInvalids, priorInProgress, postInProgress, + visibleCurrent, notVisibleCurrent, tx); + + checkPointers = ImmutableSet.of(); + tx = new Transaction(readPointer, txId, txId, toSortedArray(priorInvalids), toSortedArray(priorInProgress), 95L, + TransactionType.SHORT, toSortedArray(checkPointers), + visibilityLevel); + visibleCurrent = ImmutableSet.of(txId); + notVisibleCurrent = ImmutableSet.of(); + assertVisibility(priorCommitted, postCommitted, priorInvalids, postInvalids, priorInProgress, postInProgress, + visibleCurrent, notVisibleCurrent, tx); + } + + @Test + public void testSnapshotExcludeVisibility() throws Exception { + Transaction.VisibilityLevel visibilityLevel = Transaction.VisibilityLevel.SNAPSHOT_EXCLUDE_CURRENT; + + Set<Long> checkPointers = ImmutableSet.of(220L, 250L); + Transaction tx = new Transaction(readPointer, txId, 250L, toSortedArray(priorInvalids), + toSortedArray(priorInProgress), 95L, + TransactionType.SHORT, toSortedArray(checkPointers), + visibilityLevel); + Set<Long> visibleCurrent = ImmutableSet.of(200L, 220L); + Set<Long> notVisibleCurrent = ImmutableSet.of(250L); + assertVisibility(priorCommitted, postCommitted, priorInvalids, postInvalids, priorInProgress, postInProgress, + visibleCurrent, notVisibleCurrent, tx); + + checkPointers = ImmutableSet.of(); + tx = new 
Transaction(readPointer, txId, txId, toSortedArray(priorInvalids), toSortedArray(priorInProgress), 95L, + TransactionType.SHORT, toSortedArray(checkPointers), + visibilityLevel); + visibleCurrent = ImmutableSet.of(); + notVisibleCurrent = ImmutableSet.of(txId); + assertVisibility(priorCommitted, postCommitted, priorInvalids, postInvalids, priorInProgress, postInProgress, + visibleCurrent, notVisibleCurrent, tx); + } + + @Test + public void testSnapshotAllVisibility() throws Exception { + Transaction.VisibilityLevel visibilityLevel = Transaction.VisibilityLevel.SNAPSHOT_ALL; + + Set<Long> checkPointers = ImmutableSet.of(220L, 250L); + Transaction tx = new Transaction(readPointer, txId, 250L, toSortedArray(priorInvalids), + toSortedArray(priorInProgress), 95L, + TransactionType.SHORT, toSortedArray(checkPointers), + visibilityLevel); + Set<Long> visibleCurrent = ImmutableSet.of(200L, 220L, 250L); + Set<Long> notVisibleCurrent = ImmutableSet.of(); + assertVisibility(priorCommitted, postCommitted, priorInvalids, postInvalids, priorInProgress, postInProgress, + visibleCurrent, notVisibleCurrent, tx); + + checkPointers = ImmutableSet.of(); + tx = new Transaction(readPointer, txId, txId, toSortedArray(priorInvalids), + toSortedArray(priorInProgress), 95L, + TransactionType.SHORT, toSortedArray(checkPointers), + visibilityLevel); + visibleCurrent = ImmutableSet.of(txId); + notVisibleCurrent = ImmutableSet.of(); + assertVisibility(priorCommitted, postCommitted, priorInvalids, postInvalids, priorInProgress, postInProgress, + visibleCurrent, notVisibleCurrent, tx); + } + + private void assertVisibility(Set<Long> priorCommitted, Set<Long> postCommitted, Set<Long> priorInvalids, + Set<Long> postInvalids, Set<Long> priorInProgress, Set<Long> postInProgress, + Set<Long> visibleCurrent, Set<Long> notVisibleCurrent, + Transaction tx) { + // Verify visible snapshots of tx are visible + for (long t : visibleCurrent) { + Assert.assertTrue("Assertion error for version = " + t, 
tx.isVisible(t)); + } + + // Verify not visible snapshots of tx are not visible + for (long t : notVisibleCurrent) { + Assert.assertFalse("Assertion error for version = " + t, tx.isVisible(t)); + } + + // Verify prior committed versions are visible + for (long t : priorCommitted) { + Assert.assertTrue("Assertion error for version = " + t, tx.isVisible(t)); + } + + // Verify versions committed after tx started, and not part of tx are not visible + for (long t : postCommitted) { + Assert.assertFalse("Assertion error for version = " + t, tx.isVisible(t)); + } + + // Verify invalid and in-progress versions are not visible + for (long t : Iterables.concat(priorInvalids, postInvalids, priorInProgress, postInProgress)) { + Assert.assertFalse("Assertion error for version = " + t, tx.isVisible(t)); + } + } + + private long[] toSortedArray(Set<Long> set) { + long[] array = Longs.toArray(set); + Arrays.sort(array); + return array; + } +} http://git-wip-us.apache.org/repos/asf/incubator-tephra/blob/9c693743/tephra-core/src/test/java/org/apache/tephra/distributed/ElasticPoolTest.java ---------------------------------------------------------------------- diff --git a/tephra-core/src/test/java/org/apache/tephra/distributed/ElasticPoolTest.java b/tephra-core/src/test/java/org/apache/tephra/distributed/ElasticPoolTest.java new file mode 100644 index 0000000..6a30280 --- /dev/null +++ b/tephra-core/src/test/java/org/apache/tephra/distributed/ElasticPoolTest.java @@ -0,0 +1,130 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.tephra.distributed; + +import com.google.common.base.Throwables; +import org.junit.Assert; +import org.junit.Test; + +import java.util.concurrent.atomic.AtomicInteger; + +/** + * Tests for {@link ElasticPool}. + */ +public class ElasticPoolTest { + + static class Dummy { + static AtomicInteger count = new AtomicInteger(0); + boolean valid = true; + Dummy() { + count.incrementAndGet(); + } + void markInvalid() { + valid = false; + } + + public boolean isValid() { + return valid; + } + } + + class DummyPool extends ElasticPool<Dummy, RuntimeException> { + + public DummyPool(int sizeLimit) { + super(sizeLimit); + } + + @Override + protected Dummy create() { + return new Dummy(); + } + + @Override + protected boolean recycle(Dummy element) { + return element.isValid(); + } + } + + @Test(timeout = 5000) + public void testFewerThreadsThanElements() throws InterruptedException { + final DummyPool pool = new DummyPool(5); + Dummy.count.set(0); + createAndRunThreads(2, pool, false); + // we only ran 2 threads, so only 2 elements got created, even though pool size is 5 + Assert.assertEquals(2, Dummy.count.get()); + } + + @Test(timeout = 5000) + public void testMoreThreadsThanElements() throws InterruptedException { + final DummyPool pool = new DummyPool(2); + Dummy.count.set(0); + createAndRunThreads(5, pool, false); + // even though we ran 5 threads, only 2 elements got created because pool size is 2 + Assert.assertEquals(2, Dummy.count.get()); + } + + @Test(timeout = 5000) + public void testMoreThreadsThanElementsWithDiscard() throws 
InterruptedException { + final DummyPool pool = new DummyPool(2); + Dummy.count.set(0); + int numThreads = 3; + // pass 'true' as the last parameter, which results in the elements being discarded after each obtain() call. + createAndRunThreads(numThreads, pool, true); + // this results in (5 * numThreads) number of elements being created since each thread obtains a client 5 times. + Assert.assertEquals(5 * numThreads, Dummy.count.get()); + } + + // Creates a list of threads which obtain a client from the pool, sleeps for a certain amount of time, and then + // releases the client back to the pool, optionally marking it invalid before doing so. It repeats this five times. + // Then, runs these threads to completion. + private void createAndRunThreads(int numThreads, final DummyPool pool, + final boolean discardAtEnd) throws InterruptedException { + Thread[] threads = new Thread[numThreads]; + for (int i = 0; i < numThreads; i++) { + threads[i] = new Thread() { + @Override + public void run() { + for (int j = 0; j < 5; ++j) { + Dummy dummy; + try { + dummy = pool.obtain(); + } catch (InterruptedException e) { + throw Throwables.propagate(e); + } + try { + Thread.sleep(10L); + } catch (InterruptedException e) { + // ignored + } + if (discardAtEnd) { + dummy.markInvalid(); + } + pool.release(dummy); + } + } + }; + } + for (Thread t : threads) { + t.start(); + } + for (Thread t : threads) { + t.join(); + } + } +} http://git-wip-us.apache.org/repos/asf/incubator-tephra/blob/9c693743/tephra-core/src/test/java/org/apache/tephra/distributed/PooledClientProviderTest.java ---------------------------------------------------------------------- diff --git a/tephra-core/src/test/java/org/apache/tephra/distributed/PooledClientProviderTest.java b/tephra-core/src/test/java/org/apache/tephra/distributed/PooledClientProviderTest.java new file mode 100644 index 0000000..90a69e9 --- /dev/null +++ b/tephra-core/src/test/java/org/apache/tephra/distributed/PooledClientProviderTest.java @@ 
-0,0 +1,187 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.tephra.distributed; + +import com.google.common.base.Throwables; +import com.google.inject.Guice; +import com.google.inject.Injector; +import org.apache.hadoop.conf.Configuration; +import org.apache.tephra.TransactionServiceMain; +import org.apache.tephra.TxConstants; +import org.apache.tephra.runtime.ConfigModule; +import org.apache.tephra.runtime.DiscoveryModules; +import org.apache.tephra.runtime.TransactionClientModule; +import org.apache.tephra.runtime.TransactionModules; +import org.apache.tephra.runtime.ZKModule; +import org.apache.twill.discovery.DiscoveryServiceClient; +import org.apache.twill.internal.zookeeper.InMemoryZKServer; +import org.apache.twill.zookeeper.ZKClientService; +import org.junit.Assert; +import org.junit.ClassRule; +import org.junit.Test; +import org.junit.rules.TemporaryFolder; + +import java.util.ArrayList; +import java.util.HashSet; +import java.util.List; +import java.util.Set; +import java.util.concurrent.Callable; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; 
+import java.util.concurrent.Future; +import java.util.concurrent.TimeoutException; + +public class PooledClientProviderTest { + + public static final int MAX_CLIENT_COUNT = 3; + public static final long CLIENT_OBTAIN_TIMEOUT = 10; + + @ClassRule + public static TemporaryFolder tmpFolder = new TemporaryFolder(); + + @Test + public void testClientConnectionPoolMaximumNumberOfClients() throws Exception { + // We need a server for the client to connect to + InMemoryZKServer zkServer = InMemoryZKServer.builder().setDataDir(tmpFolder.newFolder()).build(); + zkServer.startAndWait(); + + try { + Configuration conf = new Configuration(); + conf.set(TxConstants.Service.CFG_DATA_TX_ZOOKEEPER_QUORUM, zkServer.getConnectionStr()); + conf.set(TxConstants.Manager.CFG_TX_SNAPSHOT_DIR, tmpFolder.newFolder().getAbsolutePath()); + conf.set("data.tx.client.count", Integer.toString(MAX_CLIENT_COUNT)); + conf.set("data.tx.client.obtain.timeout", Long.toString(CLIENT_OBTAIN_TIMEOUT)); + + final TransactionServiceMain main = new TransactionServiceMain(conf); + final CountDownLatch latch = new CountDownLatch(1); + Thread t = new Thread() { + @Override + public void run() { + try { + main.start(); + latch.countDown(); + } catch (Exception e) { + throw Throwables.propagate(e); + } + } + }; + + try { + t.start(); + // Wait for service to startup + latch.await(); + + startClientAndTestPool(conf); + } finally { + main.stop(); + t.join(); + } + } finally { + zkServer.stopAndWait(); + } + } + + private void startClientAndTestPool(Configuration conf) throws Exception { + Injector injector = Guice.createInjector( + new ConfigModule(conf), + new ZKModule(), + new DiscoveryModules().getDistributedModules(), + new TransactionModules().getDistributedModules(), + new TransactionClientModule() + ); + + ZKClientService zkClient = injector.getInstance(ZKClientService.class); + zkClient.startAndWait(); + + final PooledClientProvider clientProvider = new PooledClientProvider(conf, + 
injector.getInstance(DiscoveryServiceClient.class)); + + // test simple case of get + return. Note: this also initializes the provider's pool, which + // takes about one second (discovery). Doing it before we test the threads makes it so that one + // thread doesn't take exceptionally longer than the others. + try (CloseableThriftClient closeableThriftClient = clientProvider.getCloseableClient()) { + // do nothing with the client + } + + // Now race to get MAX_CLIENT_COUNT+1 clients, exhausting the pool and requesting 1 more. + List<Future<Integer>> clientIds = new ArrayList<Future<Integer>>(); + CountDownLatch countDownLatch = new CountDownLatch(1); + ExecutorService executor = Executors.newFixedThreadPool(MAX_CLIENT_COUNT + 1); + for (int i = 0; i < MAX_CLIENT_COUNT + 1; i++) { + clientIds.add(executor.submit(new RetrieveClient(clientProvider, CLIENT_OBTAIN_TIMEOUT / 2, countDownLatch))); + } + countDownLatch.countDown(); + + Set<Integer> ids = new HashSet<Integer>(); + for (Future<Integer> id : clientIds) { + ids.add(id.get()); + } + Assert.assertEquals(MAX_CLIENT_COUNT, ids.size()); + + // now, try it again, where each thread holds onto the client for twice the client.obtain.timeout value. + // one of the threads should throw a TimeoutException, because the other threads don't release their clients + // within the configured timeout. 
+ countDownLatch = new CountDownLatch(1); + for (int i = 0; i < MAX_CLIENT_COUNT + 1; i++) { + clientIds.add(executor.submit(new RetrieveClient(clientProvider, CLIENT_OBTAIN_TIMEOUT * 2, countDownLatch))); + } + countDownLatch.countDown(); + int numTimeoutExceptions = 0; + for (Future<Integer> clientId : clientIds) { + try { + clientId.get(); + } catch (ExecutionException expected) { + Assert.assertEquals(TimeoutException.class, expected.getCause().getClass()); + numTimeoutExceptions++; + } + } + // expect that exactly one of the threads hit the TimeoutException + Assert.assertEquals(String.format("Expected one thread to not obtain a client within %s milliseconds.", + CLIENT_OBTAIN_TIMEOUT), + 1, numTimeoutExceptions); + + executor.shutdown(); + } + + private static class RetrieveClient implements Callable<Integer> { + private final PooledClientProvider pool; + private final long holdClientMs; + private final CountDownLatch begin; + + public RetrieveClient(PooledClientProvider pool, long holdClientMs, + CountDownLatch begin) { + this.pool = pool; + this.holdClientMs = holdClientMs; + this.begin = begin; + } + + @Override + public Integer call() throws Exception { + begin.await(); + try (CloseableThriftClient client = pool.getCloseableClient()) { + int id = System.identityHashCode(client.getThriftClient()); + // "use" the client for a configured amount of milliseconds + Thread.sleep(holdClientMs); + return id; + } + } + } +} http://git-wip-us.apache.org/repos/asf/incubator-tephra/blob/9c693743/tephra-core/src/test/java/org/apache/tephra/distributed/ThriftTransactionServerTest.java ---------------------------------------------------------------------- diff --git a/tephra-core/src/test/java/org/apache/tephra/distributed/ThriftTransactionServerTest.java b/tephra-core/src/test/java/org/apache/tephra/distributed/ThriftTransactionServerTest.java new file mode 100644 index 0000000..a930720 --- /dev/null +++ 
b/tephra-core/src/test/java/org/apache/tephra/distributed/ThriftTransactionServerTest.java @@ -0,0 +1,238 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.tephra.distributed; + +import com.google.common.util.concurrent.Service; +import com.google.common.util.concurrent.SettableFuture; +import com.google.inject.AbstractModule; +import com.google.inject.Guice; +import com.google.inject.Injector; +import com.google.inject.Scopes; +import com.google.inject.util.Modules; +import org.apache.hadoop.conf.Configuration; +import org.apache.tephra.ThriftTransactionSystemTest; +import org.apache.tephra.TransactionSystemClient; +import org.apache.tephra.TxConstants; +import org.apache.tephra.persist.InMemoryTransactionStateStorage; +import org.apache.tephra.persist.TransactionEdit; +import org.apache.tephra.persist.TransactionLog; +import org.apache.tephra.persist.TransactionStateStorage; +import org.apache.tephra.runtime.ConfigModule; +import org.apache.tephra.runtime.DiscoveryModules; +import org.apache.tephra.runtime.TransactionClientModule; +import org.apache.tephra.runtime.TransactionModules; +import org.apache.tephra.runtime.ZKModule; +import org.apache.twill.internal.zookeeper.InMemoryZKServer; 
+import org.apache.twill.zookeeper.ZKClientService; +import org.apache.zookeeper.WatchedEvent; +import org.apache.zookeeper.Watcher; +import org.apache.zookeeper.ZooKeeper; +import org.junit.AfterClass; +import org.junit.Assert; +import org.junit.Before; +import org.junit.BeforeClass; +import org.junit.ClassRule; +import org.junit.Test; +import org.junit.rules.TemporaryFolder; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.IOException; +import java.util.List; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.TimeUnit; + +/** + * This tests whether transaction service hangs on stop when heavily loaded - https://issues.cask.co/browse/TEPHRA-132 + */ +public class ThriftTransactionServerTest { + private static final Logger LOG = LoggerFactory.getLogger(ThriftTransactionSystemTest.class); + + private static InMemoryZKServer zkServer; + private static ZKClientService zkClientService; + private static TransactionService txService; + private static TransactionStateStorage storage; + static Injector injector; + + private static final int NUM_CLIENTS = 17; + private static final CountDownLatch STORAGE_WAIT_LATCH = new CountDownLatch(1); + private static final CountDownLatch CLIENTS_DONE_LATCH = new CountDownLatch(NUM_CLIENTS); + + @ClassRule + public static TemporaryFolder tmpFolder = new TemporaryFolder(); + + @BeforeClass + public static void start() throws Exception { + zkServer = InMemoryZKServer.builder().setDataDir(tmpFolder.newFolder()).build(); + zkServer.startAndWait(); + + Configuration conf = new Configuration(); + conf.setBoolean(TxConstants.Manager.CFG_DO_PERSIST, false); + conf.set(TxConstants.Service.CFG_DATA_TX_ZOOKEEPER_QUORUM, zkServer.getConnectionStr()); + conf.set(TxConstants.Service.CFG_DATA_TX_CLIENT_RETRY_STRATEGY, "n-times"); + conf.setInt(TxConstants.Service.CFG_DATA_TX_CLIENT_ATTEMPTS, 1); + 
conf.setInt(TxConstants.Service.CFG_DATA_TX_CLIENT_COUNT, NUM_CLIENTS); + conf.setLong(TxConstants.Service.CFG_DATA_TX_CLIENT_TIMEOUT, TimeUnit.HOURS.toMillis(1)); + conf.setInt(TxConstants.Service.CFG_DATA_TX_SERVER_IO_THREADS, 2); + conf.setInt(TxConstants.Service.CFG_DATA_TX_SERVER_THREADS, 4); + + injector = Guice.createInjector( + new ConfigModule(conf), + new ZKModule(), + new DiscoveryModules().getDistributedModules(), + Modules.override(new TransactionModules().getDistributedModules()) + .with(new AbstractModule() { + @Override + protected void configure() { + bind(TransactionStateStorage.class).to(SlowTransactionStorage.class).in(Scopes.SINGLETON); + } + }), + new TransactionClientModule() + ); + + zkClientService = injector.getInstance(ZKClientService.class); + zkClientService.startAndWait(); + + // start a tx server + txService = injector.getInstance(TransactionService.class); + storage = injector.getInstance(TransactionStateStorage.class); + try { + LOG.info("Starting transaction service"); + txService.startAndWait(); + } catch (Exception e) { + LOG.error("Failed to start service: ", e); + } + } + + @Before + public void reset() throws Exception { + getClient().resetState(); + } + + @AfterClass + public static void stop() throws Exception { + txService.stopAndWait(); + storage.stopAndWait(); + zkClientService.stopAndWait(); + zkServer.stopAndWait(); + } + + public TransactionSystemClient getClient() throws Exception { + return injector.getInstance(TransactionSystemClient.class); + } + + @Test + public void testThriftServerStop() throws Exception { + int nThreads = NUM_CLIENTS; + ExecutorService executorService = Executors.newFixedThreadPool(nThreads); + for (int i = 0; i < nThreads; ++i) { + executorService.submit(new Runnable() { + @Override + public void run() { + try { + TransactionSystemClient txClient = getClient(); + CLIENTS_DONE_LATCH.countDown(); + txClient.startShort(); + } catch (Exception e) { + // Exception expected + } + } + }); + } + + // 
Wait till all clients finish sending reqeust to transaction manager + CLIENTS_DONE_LATCH.await(); + TimeUnit.SECONDS.sleep(1); + + // Expire zookeeper session, which causes Thrift server to stop. + expireZkSession(zkClientService); + waitForThriftTermination(); + + // Stop Zookeeper client so that it does not re-connect to Zookeeper and start Thrift sever again. + zkClientService.stopAndWait(); + STORAGE_WAIT_LATCH.countDown(); + TimeUnit.SECONDS.sleep(1); + + // Make sure Thrift server stopped. + Assert.assertEquals(Service.State.TERMINATED, txService.thriftRPCServerState()); + } + + private void expireZkSession(ZKClientService zkClientService) throws Exception { + ZooKeeper zooKeeper = zkClientService.getZooKeeperSupplier().get(); + final SettableFuture<?> connectFuture = SettableFuture.create(); + Watcher watcher = new Watcher() { + @Override + public void process(WatchedEvent event) { + if (event.getState() == Event.KeeperState.SyncConnected) { + connectFuture.set(null); + } + } + }; + + // Create another Zookeeper session with the same sessionId so that the original one expires. 
+ final ZooKeeper dupZookeeper = + new ZooKeeper(zkClientService.getConnectString(), zooKeeper.getSessionTimeout(), watcher, + zooKeeper.getSessionId(), zooKeeper.getSessionPasswd()); + connectFuture.get(30, TimeUnit.SECONDS); + Assert.assertEquals("Failed to re-create current session", dupZookeeper.getState(), ZooKeeper.States.CONNECTED); + dupZookeeper.close(); + } + + private void waitForThriftTermination() throws InterruptedException { + int count = 0; + while (txService.thriftRPCServerState() != Service.State.TERMINATED && count++ < 200) { + TimeUnit.MILLISECONDS.sleep(50); + } + } + + private static class SlowTransactionStorage extends InMemoryTransactionStateStorage { + @Override + public TransactionLog createLog(long timestamp) throws IOException { + return new SlowTransactionLog(timestamp); + } + } + + private static class SlowTransactionLog extends InMemoryTransactionStateStorage.InMemoryTransactionLog { + public SlowTransactionLog(long timestamp) { + super(timestamp); + } + + @Override + public void append(TransactionEdit edit) throws IOException { + try { + STORAGE_WAIT_LATCH.await(); + } catch (InterruptedException e) { + LOG.error("Got exception: ", e); + } + super.append(edit); + } + + @Override + public void append(List<TransactionEdit> edits) throws IOException { + try { + STORAGE_WAIT_LATCH.await(); + } catch (InterruptedException e) { + LOG.error("Got exception: ", e); + } + super.append(edits); + } + } +} http://git-wip-us.apache.org/repos/asf/incubator-tephra/blob/9c693743/tephra-core/src/test/java/org/apache/tephra/hbase/AbstractTransactionVisibilityFilterTest.java ---------------------------------------------------------------------- diff --git a/tephra-core/src/test/java/org/apache/tephra/hbase/AbstractTransactionVisibilityFilterTest.java b/tephra-core/src/test/java/org/apache/tephra/hbase/AbstractTransactionVisibilityFilterTest.java new file mode 100644 index 0000000..4828adf --- /dev/null +++ 
b/tephra-core/src/test/java/org/apache/tephra/hbase/AbstractTransactionVisibilityFilterTest.java @@ -0,0 +1,56 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.tephra.hbase; + +import com.google.common.collect.Lists; +import org.apache.hadoop.conf.Configuration; +import org.apache.tephra.TransactionManager; +import org.apache.tephra.TxConstants; +import org.apache.tephra.util.ConfigurationFactory; +import org.junit.After; +import org.junit.Before; + +import java.util.List; + +/** + * Common test class for TransactionVisibilityFilter implementations. 
+ */ +public abstract class AbstractTransactionVisibilityFilterTest { + + protected static final byte[] FAM = new byte[] {'f'}; + protected static final byte[] FAM2 = new byte[] {'f', '2'}; + protected static final byte[] FAM3 = new byte[] {'f', '3'}; + protected static final byte[] COL = new byte[] {'c'}; + protected static final List<byte[]> EMPTY_CHANGESET = Lists.newArrayListWithCapacity(0); + + protected TransactionManager txManager; + + @Before + public void setup() throws Exception { + Configuration conf = new ConfigurationFactory().get(); + conf.unset(TxConstants.Persist.CFG_TX_SNAPHOT_CODEC_CLASSES); + txManager = new TransactionManager(conf); + txManager.startAndWait(); + } + + @After + public void tearDown() throws Exception { + txManager.stopAndWait(); + } +}
