http://git-wip-us.apache.org/repos/asf/incubator-tephra/blob/9c693743/tephra-core/src/test/java/co/cask/tephra/TransactionContextTest.java ---------------------------------------------------------------------- diff --git a/tephra-core/src/test/java/co/cask/tephra/TransactionContextTest.java b/tephra-core/src/test/java/co/cask/tephra/TransactionContextTest.java deleted file mode 100644 index 419bffb..0000000 --- a/tephra-core/src/test/java/co/cask/tephra/TransactionContextTest.java +++ /dev/null @@ -1,676 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package co.cask.tephra; - -import co.cask.tephra.inmemory.InMemoryTxSystemClient; -import co.cask.tephra.runtime.ConfigModule; -import co.cask.tephra.runtime.DiscoveryModules; -import co.cask.tephra.runtime.TransactionModules; -import co.cask.tephra.snapshot.SnapshotCodecV4; -import com.google.common.collect.ImmutableList; -import com.google.common.collect.Lists; -import com.google.inject.AbstractModule; -import com.google.inject.Guice; -import com.google.inject.Inject; -import com.google.inject.Injector; -import com.google.inject.Singleton; -import com.google.inject.util.Modules; -import org.apache.hadoop.conf.Configuration; -import org.junit.Assert; -import org.junit.Before; -import org.junit.BeforeClass; -import org.junit.ClassRule; -import org.junit.Test; -import org.junit.rules.TemporaryFolder; - -import java.io.IOException; -import java.util.Collection; -import java.util.List; - -/** - * Tests the transaction executor. - */ -public class TransactionContextTest { - private static DummyTxClient txClient; - - @ClassRule - public static TemporaryFolder tmpFolder = new TemporaryFolder(); - - @BeforeClass - public static void setup() throws IOException { - final Configuration conf = new Configuration(); - conf.set(TxConstants.Persist.CFG_TX_SNAPHOT_CODEC_CLASSES, SnapshotCodecV4.class.getName()); - conf.set(TxConstants.Manager.CFG_TX_SNAPSHOT_DIR, tmpFolder.newFolder().getAbsolutePath()); - Injector injector = Guice.createInjector( - new ConfigModule(conf), - new DiscoveryModules().getInMemoryModules(), - Modules.override( - new TransactionModules().getInMemoryModules()).with(new AbstractModule() { - @Override - protected void configure() { - TransactionManager txManager = new TransactionManager(conf); - txManager.startAndWait(); - bind(TransactionManager.class).toInstance(txManager); - bind(TransactionSystemClient.class).to(DummyTxClient.class).in(Singleton.class); - } - })); - - txClient = (DummyTxClient) injector.getInstance(TransactionSystemClient.class); - } - - final DummyTxAware ds1 = new DummyTxAware(), ds2 = new DummyTxAware(); - - static final byte[] A = { 'a' }; - static final byte[] B = { 'b' }; - - private static TransactionContext newTransactionContext(TransactionAware... txAwares) { - return new TransactionContext(txClient, txAwares); - } - - @Before - public void resetTxAwares() { - ds1.reset(); - ds2.reset(); - } - - @Test - public void testSuccessful() throws TransactionFailureException, InterruptedException { - TransactionContext context = newTransactionContext(ds1, ds2); - // start transaction - context.start(); - // add a change to ds1 and ds2 - ds1.addChange(A); - ds2.addChange(B); - // commit transaction - context.finish(); - // verify both are committed and post-committed - Assert.assertTrue(ds1.started); - Assert.assertTrue(ds2.started); - Assert.assertTrue(ds1.checked); - Assert.assertTrue(ds2.checked); - Assert.assertTrue(ds1.committed); - Assert.assertTrue(ds2.committed); - Assert.assertTrue(ds1.postCommitted); - Assert.assertTrue(ds2.postCommitted); - Assert.assertFalse(ds1.rolledBack); - Assert.assertFalse(ds2.rolledBack); - Assert.assertEquals(txClient.state, DummyTxClient.CommitState.Committed); - } - - @Test - public void testPostCommitFailure() throws TransactionFailureException, InterruptedException { - ds1.failPostCommitTxOnce = InduceFailure.ThrowException; - TransactionContext context = newTransactionContext(ds1, ds2); - // start transaction - context.start(); - // add a change to ds1 and ds2 - ds1.addChange(A); - ds2.addChange(B); - // commit transaction should fail but without rollback as the failure happens post-commit - try { - context.finish(); - Assert.fail("post commit failed - exception should be thrown"); - } catch (TransactionFailureException e) { - Assert.assertEquals("post failure", e.getCause().getMessage()); - } - // verify both are committed and post-committed - Assert.assertTrue(ds1.started); - Assert.assertTrue(ds2.started); - Assert.assertTrue(ds1.checked); - Assert.assertTrue(ds2.checked); - Assert.assertTrue(ds1.committed); - Assert.assertTrue(ds2.committed); - Assert.assertTrue(ds1.postCommitted); - Assert.assertTrue(ds2.postCommitted); - Assert.assertFalse(ds1.rolledBack); - Assert.assertFalse(ds2.rolledBack); - Assert.assertEquals(txClient.state, DummyTxClient.CommitState.Committed); - } - - @Test - public void testPersistFailure() throws TransactionFailureException, InterruptedException { - ds1.failCommitTxOnce = InduceFailure.ThrowException; - TransactionContext context = newTransactionContext(ds1, ds2); - // start transaction - context.start(); - // add a change to ds1 and ds2 - ds1.addChange(A); - ds2.addChange(B); - // commit transaction should fail and cause rollback - try { - context.finish(); - Assert.fail("Persist should have failed - exception should be thrown"); - } catch (TransactionFailureException e) { - Assert.assertEquals("persist failure", e.getCause().getMessage()); - } - // verify both are rolled back and tx is aborted - Assert.assertTrue(ds1.started); - Assert.assertTrue(ds2.started); - Assert.assertTrue(ds1.checked); - Assert.assertTrue(ds2.checked); - Assert.assertTrue(ds1.committed); - Assert.assertFalse(ds2.committed); - Assert.assertFalse(ds1.postCommitted); - Assert.assertFalse(ds2.postCommitted); - Assert.assertTrue(ds1.rolledBack); - Assert.assertTrue(ds2.rolledBack); - Assert.assertEquals(txClient.state, DummyTxClient.CommitState.Aborted); - } - - @Test - public void testPersistFalse() throws TransactionFailureException, InterruptedException { - ds1.failCommitTxOnce = InduceFailure.ReturnFalse; - TransactionContext context = newTransactionContext(ds1, ds2); - // start transaction - context.start(); - // add a change to ds1 and ds2 - ds1.addChange(A); - ds2.addChange(B); - // commit transaction should fail and cause rollback - try { - context.finish(); - Assert.fail("Persist should have failed - exception should be thrown"); - } catch (TransactionFailureException e) { - Assert.assertNull(e.getCause()); // in this case, the ds simply returned false - } - // verify both are rolled back and tx is aborted - Assert.assertTrue(ds1.started); - Assert.assertTrue(ds2.started); - Assert.assertTrue(ds1.checked); - Assert.assertTrue(ds2.checked); - Assert.assertTrue(ds1.committed); - Assert.assertFalse(ds2.committed); - Assert.assertFalse(ds1.postCommitted); - Assert.assertFalse(ds2.postCommitted); - Assert.assertTrue(ds1.rolledBack); - Assert.assertTrue(ds2.rolledBack); - Assert.assertEquals(txClient.state, DummyTxClient.CommitState.Aborted); - } - - @Test - public void testPersistAndRollbackFailure() throws TransactionFailureException, InterruptedException { - ds1.failCommitTxOnce = InduceFailure.ThrowException; - ds1.failRollbackTxOnce = InduceFailure.ThrowException; - TransactionContext context = newTransactionContext(ds1, ds2); - // start transaction - context.start(); - // add a change to ds1 and ds2 - ds1.addChange(A); - ds2.addChange(B); - // commit transaction should fail and cause rollback - try { - context.finish(); - Assert.fail("Persist should have failed - exception should be thrown"); - } catch (TransactionFailureException e) { - Assert.assertEquals("persist failure", e.getCause().getMessage()); - } - // verify both are rolled back and tx is invalidated - Assert.assertTrue(ds1.started); - Assert.assertTrue(ds2.started); - Assert.assertTrue(ds1.checked); - Assert.assertTrue(ds2.checked); - Assert.assertTrue(ds1.committed); - Assert.assertFalse(ds2.committed); - Assert.assertFalse(ds1.postCommitted); - Assert.assertFalse(ds2.postCommitted); - Assert.assertTrue(ds1.rolledBack); - Assert.assertTrue(ds2.rolledBack); - Assert.assertEquals(txClient.state, DummyTxClient.CommitState.Invalidated); - } - - @Test - public void testPersistAndRollbackFalse() throws TransactionFailureException, InterruptedException { - ds1.failCommitTxOnce = InduceFailure.ReturnFalse; - ds1.failRollbackTxOnce = InduceFailure.ReturnFalse; - TransactionContext context = newTransactionContext(ds1, ds2); - // start transaction - context.start(); - // add a change to ds1 and ds2 - ds1.addChange(A); - ds2.addChange(B); - // commit transaction should fail and cause rollback - try { - context.finish(); - Assert.fail("Persist should have failed - exception should be thrown"); - } catch (TransactionFailureException e) { - Assert.assertNull(e.getCause()); // in this case, the ds simply returned false - } - // verify both are rolled back and tx is invalidated - Assert.assertTrue(ds1.started); - Assert.assertTrue(ds2.started); - Assert.assertTrue(ds1.checked); - Assert.assertTrue(ds2.checked); - Assert.assertTrue(ds1.committed); - Assert.assertFalse(ds2.committed); - Assert.assertFalse(ds1.postCommitted); - Assert.assertFalse(ds2.postCommitted); - Assert.assertTrue(ds1.rolledBack); - Assert.assertTrue(ds2.rolledBack); - Assert.assertEquals(txClient.state, DummyTxClient.CommitState.Invalidated); - } - - @Test - public void testCommitFalse() throws TransactionFailureException, InterruptedException { - txClient.failCommits = 1; - TransactionContext context = newTransactionContext(ds1, ds2); - // start transaction - context.start(); - // add a change to ds1 and ds2 - ds1.addChange(A); - ds2.addChange(B); - // commit transaction should fail and cause rollback - try { - context.finish(); - Assert.fail("commit failed - exception should be thrown"); - } catch (TransactionConflictException e) { - Assert.assertNull(e.getCause()); - } - // verify both are rolled back and tx is aborted - Assert.assertTrue(ds1.started); - Assert.assertTrue(ds2.started); - Assert.assertTrue(ds1.checked); - Assert.assertTrue(ds2.checked); - Assert.assertTrue(ds1.committed); - Assert.assertTrue(ds2.committed); - Assert.assertFalse(ds1.postCommitted); - Assert.assertFalse(ds2.postCommitted); - Assert.assertTrue(ds1.rolledBack); - Assert.assertTrue(ds2.rolledBack); - Assert.assertEquals(txClient.state, DummyTxClient.CommitState.Aborted); - } - - @Test - public void testCanCommitFalse() throws TransactionFailureException, InterruptedException { - txClient.failCanCommitOnce = true; - TransactionContext context = newTransactionContext(ds1, ds2); - // start transaction - context.start(); - // add a change to ds1 and ds2 - ds1.addChange(A); - ds2.addChange(B); - // commit transaction should fail and cause rollback - try { - context.finish(); - Assert.fail("commit failed - exception should be thrown"); - } catch (TransactionConflictException e) { - Assert.assertNull(e.getCause()); - } - // verify both are rolled back and tx is aborted - Assert.assertTrue(ds1.started); - Assert.assertTrue(ds2.started); - Assert.assertTrue(ds1.checked); - Assert.assertTrue(ds2.checked); - Assert.assertFalse(ds1.committed); - Assert.assertFalse(ds2.committed); - Assert.assertFalse(ds1.postCommitted); - Assert.assertFalse(ds2.postCommitted); - Assert.assertTrue(ds1.rolledBack); - Assert.assertTrue(ds2.rolledBack); - Assert.assertEquals(txClient.state, DummyTxClient.CommitState.Aborted); - } - - @Test - public void testChangesAndRollbackFailure() throws TransactionFailureException, InterruptedException { - ds1.failChangesTxOnce = InduceFailure.ThrowException; - ds1.failRollbackTxOnce = InduceFailure.ThrowException; - TransactionContext context = newTransactionContext(ds1, ds2); - // start transaction - context.start(); - // add a change to ds1 and ds2 - ds1.addChange(A); - ds2.addChange(B); - // commit transaction should fail and cause rollback - try { - context.finish(); - Assert.fail("get changes failed - exception should be thrown"); - } catch (TransactionFailureException e) { - Assert.assertEquals("changes failure", e.getCause().getMessage()); - } - // verify both are rolled back and tx is invalidated - Assert.assertTrue(ds1.started); - Assert.assertTrue(ds2.started); - Assert.assertTrue(ds1.checked); - Assert.assertFalse(ds2.checked); - Assert.assertFalse(ds1.committed); - Assert.assertFalse(ds2.committed); - Assert.assertFalse(ds1.postCommitted); - Assert.assertFalse(ds2.postCommitted); - Assert.assertTrue(ds1.rolledBack); - Assert.assertTrue(ds2.rolledBack); - Assert.assertEquals(txClient.state, DummyTxClient.CommitState.Invalidated); - } - - @Test - public void testStartAndRollbackFailure() throws TransactionFailureException, InterruptedException { - ds1.failStartTxOnce = InduceFailure.ThrowException; - TransactionContext context = newTransactionContext(ds1, ds2); - // start transaction - try { - context.start(); - Assert.fail("start failed - exception should be thrown"); - } catch (TransactionFailureException e) { - Assert.assertEquals("start failure", e.getCause().getMessage()); - } - // verify both are not rolled back and tx is aborted - Assert.assertTrue(ds1.started); - Assert.assertFalse(ds2.started); - Assert.assertFalse(ds1.checked); - Assert.assertFalse(ds2.checked); - Assert.assertFalse(ds1.committed); - Assert.assertFalse(ds2.committed); - Assert.assertFalse(ds1.postCommitted); - Assert.assertFalse(ds2.postCommitted); - Assert.assertFalse(ds1.rolledBack); - Assert.assertFalse(ds2.rolledBack); - Assert.assertEquals(txClient.state, DummyTxClient.CommitState.Aborted); - } - - @Test - public void testAddThenSuccess() throws TransactionFailureException, InterruptedException { - TransactionContext context = newTransactionContext(ds1); - // start transaction - context.start(); - // add a change to ds1 - ds1.addChange(A); - // add ds2 to the tx - context.addTransactionAware(ds2); - // add a change to ds2 - ds2.addChange(B); - // commit transaction - context.finish(); - // verify both are committed and post-committed - Assert.assertTrue(ds1.started); - Assert.assertTrue(ds2.started); - Assert.assertTrue(ds1.checked); - Assert.assertTrue(ds2.checked); - Assert.assertTrue(ds1.committed); - Assert.assertTrue(ds2.committed); - Assert.assertTrue(ds1.postCommitted); - Assert.assertTrue(ds2.postCommitted); - Assert.assertFalse(ds1.rolledBack); - Assert.assertFalse(ds2.rolledBack); - Assert.assertEquals(txClient.state, DummyTxClient.CommitState.Committed); - } - - @Test - public void testAddThenFailure() throws TransactionFailureException, InterruptedException { - ds2.failCommitTxOnce = InduceFailure.ThrowException; - - TransactionContext context = newTransactionContext(ds1); - // start transaction - context.start(); - // add a change to ds1 - ds1.addChange(A); - // add ds2 to the tx - context.addTransactionAware(ds2); - // add a change to ds2 - ds2.addChange(B); - // commit transaction should fail and cause rollback - try { - context.finish(); - Assert.fail("Persist should have failed - exception should be thrown"); - } catch (TransactionFailureException e) { - Assert.assertEquals("persist failure", e.getCause().getMessage()); - } - // verify both are rolled back and tx is aborted - Assert.assertTrue(ds1.started); - Assert.assertTrue(ds2.started); - Assert.assertTrue(ds1.checked); - Assert.assertTrue(ds2.checked); - Assert.assertTrue(ds1.committed); - Assert.assertTrue(ds2.committed); - Assert.assertFalse(ds1.postCommitted); - Assert.assertFalse(ds2.postCommitted); - Assert.assertTrue(ds1.rolledBack); - Assert.assertTrue(ds2.rolledBack); - Assert.assertEquals(txClient.state, DummyTxClient.CommitState.Aborted); - } - - @Test - public void testAddThenRemoveSuccess() throws TransactionFailureException { - TransactionContext context = newTransactionContext(); - - context.start(); - Assert.assertTrue(context.addTransactionAware(ds1)); - ds1.addChange(A); - - try { - context.removeTransactionAware(ds1); - Assert.fail("Removal of TransactionAware should fails when there is active transaction."); - } catch (IllegalStateException e) { - // Expected - } - - context.finish(); - - Assert.assertTrue(context.removeTransactionAware(ds1)); - // Removing a TransactionAware not added before should returns false - Assert.assertFalse(context.removeTransactionAware(ds2)); - - // Verify ds1 is committed and post-committed - Assert.assertTrue(ds1.started); - Assert.assertTrue(ds1.checked); - Assert.assertTrue(ds1.committed); - Assert.assertTrue(ds1.postCommitted); - Assert.assertFalse(ds1.rolledBack); - - // Verify nothing happen to ds2 - Assert.assertFalse(ds2.started); - Assert.assertFalse(ds2.checked); - Assert.assertFalse(ds2.committed); - Assert.assertFalse(ds2.postCommitted); - Assert.assertFalse(ds2.rolledBack); - - Assert.assertEquals(txClient.state, DummyTxClient.CommitState.Committed); - } - - @Test - public void testAndThenRemoveOnFailure() throws TransactionFailureException { - ds1.failCommitTxOnce = InduceFailure.ThrowException; - TransactionContext context = newTransactionContext(); - - context.start(); - Assert.assertTrue(context.addTransactionAware(ds1)); - ds1.addChange(A); - - try { - context.finish(); - Assert.fail("Persist should have failed - exception should be thrown"); - } catch (TransactionFailureException e) { - Assert.assertEquals("persist failure", e.getCause().getMessage()); - } - - Assert.assertTrue(context.removeTransactionAware(ds1)); - - // Verify ds1 is rolled back - Assert.assertTrue(ds1.started); - Assert.assertTrue(ds1.checked); - Assert.assertTrue(ds1.committed); - Assert.assertFalse(ds1.postCommitted); - Assert.assertTrue(ds1.rolledBack); - - Assert.assertEquals(txClient.state, DummyTxClient.CommitState.Aborted); - } - - enum InduceFailure { NoFailure, ReturnFalse, ThrowException } - - static class DummyTxAware implements TransactionAware { - - Transaction tx; - boolean started = false; - boolean committed = false; - boolean checked = false; - boolean rolledBack = false; - boolean postCommitted = false; - List<byte[]> changes = Lists.newArrayList(); - - InduceFailure failStartTxOnce = InduceFailure.NoFailure; - InduceFailure failChangesTxOnce = InduceFailure.NoFailure; - InduceFailure failCommitTxOnce = InduceFailure.NoFailure; - InduceFailure failPostCommitTxOnce = InduceFailure.NoFailure; - InduceFailure failRollbackTxOnce = InduceFailure.NoFailure; - - void addChange(byte[] key) { - changes.add(key); - } - - void reset() { - tx = null; - started = false; - checked = false; - committed = false; - rolledBack = false; - postCommitted = false; - changes.clear(); - } - - @Override - public void startTx(Transaction tx) { - reset(); - started = true; - this.tx = tx; - if (failStartTxOnce == InduceFailure.ThrowException) { - failStartTxOnce = InduceFailure.NoFailure; - throw new RuntimeException("start failure"); - } - } - - @Override - public void updateTx(Transaction tx) { - this.tx = tx; - } - - @Override - public Collection<byte[]> getTxChanges() { - checked = true; - if (failChangesTxOnce == InduceFailure.ThrowException) { - failChangesTxOnce = InduceFailure.NoFailure; - throw new RuntimeException("changes failure"); - } - return ImmutableList.copyOf(changes); - } - - @Override - public boolean commitTx() throws Exception { - committed = true; - if (failCommitTxOnce == InduceFailure.ThrowException) { - failCommitTxOnce = InduceFailure.NoFailure; - throw new RuntimeException("persist failure"); - } - if (failCommitTxOnce == InduceFailure.ReturnFalse) { - failCommitTxOnce = InduceFailure.NoFailure; - return false; - } - return true; - } - - @Override - public void postTxCommit() { - postCommitted = true; - if (failPostCommitTxOnce == InduceFailure.ThrowException) { - failPostCommitTxOnce = InduceFailure.NoFailure; - throw new RuntimeException("post failure"); - } - } - - @Override - public boolean rollbackTx() throws Exception { - rolledBack = true; - if (failRollbackTxOnce == InduceFailure.ThrowException) { - failRollbackTxOnce = InduceFailure.NoFailure; - throw new RuntimeException("rollback failure"); - } - if (failRollbackTxOnce == InduceFailure.ReturnFalse) { - failRollbackTxOnce = InduceFailure.NoFailure; - return false; - } - return true; - } - - @Override - public String getTransactionAwareName() { - return "dummy"; - } - } - - static class DummyTxClient extends InMemoryTxSystemClient { - - boolean failCanCommitOnce = false; - int failCommits = 0; - enum CommitState { - Started, Committed, Aborted, Invalidated - } - CommitState state = CommitState.Started; - - @Inject - DummyTxClient(TransactionManager txmgr) { - super(txmgr); - } - - @Override - public boolean canCommit(Transaction tx, Collection<byte[]> changeIds) throws TransactionNotInProgressException { - if (failCanCommitOnce) { - failCanCommitOnce = false; - return false; - } else { - return super.canCommit(tx, changeIds); - } - } - - @Override - public boolean commit(Transaction tx) throws TransactionNotInProgressException { - if (failCommits-- > 0) { - return false; - } else { - state = CommitState.Committed; - return super.commit(tx); - } - } - - @Override - public Transaction startLong() { - state = CommitState.Started; - return super.startLong(); - } - - @Override - public Transaction startShort() { - state = CommitState.Started; - return super.startShort(); - } - - @Override - public Transaction startShort(int timeout) { - state = CommitState.Started; - return super.startShort(timeout); - } - - @Override - public void abort(Transaction tx) { - state = CommitState.Aborted; - super.abort(tx); - } - - @Override - public boolean invalidate(long tx) { - state = CommitState.Invalidated; - return super.invalidate(tx); - } - } -}
http://git-wip-us.apache.org/repos/asf/incubator-tephra/blob/9c693743/tephra-core/src/test/java/co/cask/tephra/TransactionExecutorTest.java ---------------------------------------------------------------------- diff --git a/tephra-core/src/test/java/co/cask/tephra/TransactionExecutorTest.java b/tephra-core/src/test/java/co/cask/tephra/TransactionExecutorTest.java deleted file mode 100644 index 7e67f2c..0000000 --- a/tephra-core/src/test/java/co/cask/tephra/TransactionExecutorTest.java +++ /dev/null @@ -1,590 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package co.cask.tephra; - -import co.cask.tephra.inmemory.InMemoryTxSystemClient; -import co.cask.tephra.runtime.ConfigModule; -import co.cask.tephra.runtime.DiscoveryModules; -import co.cask.tephra.runtime.TransactionModules; -import co.cask.tephra.snapshot.DefaultSnapshotCodec; -import com.google.common.collect.ImmutableList; -import com.google.common.collect.Lists; -import com.google.inject.AbstractModule; -import com.google.inject.Guice; -import com.google.inject.Inject; -import com.google.inject.Injector; -import com.google.inject.Singleton; -import com.google.inject.util.Modules; -import org.apache.hadoop.conf.Configuration; -import org.junit.Assert; -import org.junit.Before; -import org.junit.BeforeClass; -import org.junit.ClassRule; -import org.junit.Test; -import org.junit.rules.TemporaryFolder; - -import java.io.IOException; -import java.util.Collection; -import java.util.List; -import javax.annotation.Nullable; - -/** - * Tests the transaction executor. - */ -public class TransactionExecutorTest { - private static DummyTxClient txClient; - private static TransactionExecutorFactory factory; - - @ClassRule - public static TemporaryFolder tmpFolder = new TemporaryFolder(); - - @BeforeClass - public static void setup() throws IOException { - final Configuration conf = new Configuration(); - conf.set(TxConstants.Persist.CFG_TX_SNAPHOT_CODEC_CLASSES, DefaultSnapshotCodec.class.getName()); - conf.set(TxConstants.Manager.CFG_TX_SNAPSHOT_DIR, tmpFolder.newFolder().getAbsolutePath()); - Injector injector = Guice.createInjector( - new ConfigModule(conf), - new DiscoveryModules().getInMemoryModules(), - Modules.override( - new TransactionModules().getInMemoryModules()).with(new AbstractModule() { - @Override - protected void configure() { - TransactionManager txManager = new TransactionManager(conf); - txManager.startAndWait(); - bind(TransactionManager.class).toInstance(txManager); - bind(TransactionSystemClient.class).to(DummyTxClient.class).in(Singleton.class); - } - })); - - txClient = (DummyTxClient) injector.getInstance(TransactionSystemClient.class); - factory = injector.getInstance(TransactionExecutorFactory.class); - } - - final DummyTxAware ds1 = new DummyTxAware(), ds2 = new DummyTxAware(); - final Collection<TransactionAware> txAwares = ImmutableList.<TransactionAware>of(ds1, ds2); - - private TransactionExecutor getExecutor() { - return factory.createExecutor(txAwares); - } - - private TransactionExecutor getExecutorWithNoRetry() { - return new DefaultTransactionExecutor(txClient, txAwares, RetryStrategies.noRetries()); - } - - static final byte[] A = { 'a' }; - static final byte[] B = { 'b' }; - - final TransactionExecutor.Function<Integer, Integer> testFunction = - new TransactionExecutor.Function<Integer, Integer>() { - @Override - public Integer apply(@Nullable Integer input) { - ds1.addChange(A); - ds2.addChange(B); - if (input == null) { - throw new RuntimeException("function failed"); - } - return input * input; - } - }; - - @Before - public void resetTxAwares() { - ds1.reset(); - ds2.reset(); - } - - @Test - public void testSuccessful() throws TransactionFailureException, InterruptedException { - // execute: add a change to ds1 and ds2 - Integer result = getExecutor().execute(testFunction, 10); - // verify both are committed and post-committed - Assert.assertTrue(ds1.started); - Assert.assertTrue(ds2.started); - Assert.assertTrue(ds1.checked); - Assert.assertTrue(ds2.checked); - Assert.assertTrue(ds1.committed); - Assert.assertTrue(ds2.committed); - Assert.assertTrue(ds1.postCommitted); - Assert.assertTrue(ds2.postCommitted); - Assert.assertFalse(ds1.rolledBack); - Assert.assertFalse(ds2.rolledBack); - Assert.assertTrue(100 == result); - Assert.assertEquals(txClient.state, DummyTxClient.CommitState.Committed); - } - - @Test - public void testPostCommitFailure() throws TransactionFailureException, InterruptedException { - ds1.failPostCommitTxOnce = InduceFailure.ThrowException; - // execute: add a change to ds1 and ds2 - try { - getExecutor().execute(testFunction, 10); - Assert.fail("post commit failed - exception should be thrown"); - } catch (TransactionFailureException e) { - Assert.assertEquals("post failure", e.getCause().getMessage()); - } - // verify both are committed and post-committed - Assert.assertTrue(ds1.started); - Assert.assertTrue(ds2.started); - Assert.assertTrue(ds1.checked); - Assert.assertTrue(ds2.checked); - Assert.assertTrue(ds1.committed); - Assert.assertTrue(ds2.committed); - Assert.assertTrue(ds1.postCommitted); - Assert.assertTrue(ds2.postCommitted); - Assert.assertFalse(ds1.rolledBack); - Assert.assertFalse(ds2.rolledBack); - Assert.assertEquals(txClient.state, DummyTxClient.CommitState.Committed); - } - - @Test - public void testPersistFailure() throws TransactionFailureException, InterruptedException { - ds1.failCommitTxOnce = InduceFailure.ThrowException; - // execute: add a change to ds1 and ds2 - try { - getExecutor().execute(testFunction, 10); - Assert.fail("persist failed - exception should be thrown"); - } catch (TransactionFailureException e) { - Assert.assertEquals("persist failure", e.getCause().getMessage()); - } - // verify both are rolled back and tx is aborted - Assert.assertTrue(ds1.started); - Assert.assertTrue(ds2.started); - Assert.assertTrue(ds1.checked); - Assert.assertTrue(ds2.checked); - Assert.assertTrue(ds1.committed); - Assert.assertFalse(ds2.committed); - Assert.assertFalse(ds1.postCommitted); - Assert.assertFalse(ds2.postCommitted); - Assert.assertTrue(ds1.rolledBack); - Assert.assertTrue(ds2.rolledBack); - Assert.assertEquals(txClient.state, DummyTxClient.CommitState.Aborted); - } - - @Test - public void testPersistFalse() throws TransactionFailureException, InterruptedException { - ds1.failCommitTxOnce = InduceFailure.ReturnFalse; - // execute: add a change to ds1 and ds2 - try { - getExecutor().execute(testFunction, 10); - Assert.fail("persist failed - exception should be thrown"); - } catch (TransactionFailureException e) { - Assert.assertNull(e.getCause()); // in this case, the ds simply returned false - } - // verify both are rolled back and tx is aborted - Assert.assertTrue(ds1.started); - Assert.assertTrue(ds2.started); - Assert.assertTrue(ds1.checked); - Assert.assertTrue(ds2.checked); - Assert.assertTrue(ds1.committed); - Assert.assertFalse(ds2.committed); - Assert.assertFalse(ds1.postCommitted); - Assert.assertFalse(ds2.postCommitted); - Assert.assertTrue(ds1.rolledBack); - Assert.assertTrue(ds2.rolledBack); - Assert.assertEquals(txClient.state, DummyTxClient.CommitState.Aborted); - } - - @Test - public void testPersistAndRollbackFailure() throws TransactionFailureException, InterruptedException { - ds1.failCommitTxOnce = InduceFailure.ThrowException; - ds1.failRollbackTxOnce = InduceFailure.ThrowException; - // execute: add a change to ds1 and ds2 - try { - getExecutor().execute(testFunction, 10); - Assert.fail("persist failed - exception should be thrown"); - } catch (TransactionFailureException e) { - Assert.assertEquals("persist failure", e.getCause().getMessage()); - } - // verify both are rolled back and tx is invalidated - Assert.assertTrue(ds1.started); - Assert.assertTrue(ds2.started); - Assert.assertTrue(ds1.checked); - Assert.assertTrue(ds2.checked); - Assert.assertTrue(ds1.committed); - Assert.assertFalse(ds2.committed); - Assert.assertFalse(ds1.postCommitted); - Assert.assertFalse(ds2.postCommitted); - Assert.assertTrue(ds1.rolledBack); - Assert.assertTrue(ds2.rolledBack); - Assert.assertEquals(txClient.state, DummyTxClient.CommitState.Invalidated); - } - - @Test - public void testPersistAndRollbackFalse() throws TransactionFailureException, InterruptedException { - ds1.failCommitTxOnce = InduceFailure.ReturnFalse; - ds1.failRollbackTxOnce = InduceFailure.ReturnFalse; - // execute: add a change to ds1 and ds2 - try { - getExecutor().execute(testFunction, 10); - Assert.fail("persist failed - exception should be thrown"); - } catch (TransactionFailureException e) { - Assert.assertNull(e.getCause()); // in this case, the ds simply returned false - } - // verify both are rolled back and tx is invalidated - Assert.assertTrue(ds1.started); - Assert.assertTrue(ds2.started); - Assert.assertTrue(ds1.checked); - Assert.assertTrue(ds2.checked); - Assert.assertTrue(ds1.committed); - Assert.assertFalse(ds2.committed); - Assert.assertFalse(ds1.postCommitted); - Assert.assertFalse(ds2.postCommitted); - Assert.assertTrue(ds1.rolledBack); - Assert.assertTrue(ds2.rolledBack); - Assert.assertEquals(txClient.state, DummyTxClient.CommitState.Invalidated); - } - - @Test - public void testNoIndefiniteRetryByDefault() throws TransactionFailureException, InterruptedException { - // we want retry by default, so that engineers don't miss it - txClient.failCommits = 1000; - try { - // execute: add a change to ds1 and ds2 - getExecutor().execute(testFunction, 10); - Assert.fail("commit failed too many times to retry - exception should be thrown"); - } catch (TransactionConflictException e) { - Assert.assertNull(e.getCause()); - } - - txClient.failCommits = 0; - - // verify both are rolled back and tx is aborted - Assert.assertTrue(ds1.started); - Assert.assertTrue(ds2.started); - Assert.assertTrue(ds1.checked); - Assert.assertTrue(ds2.checked); - Assert.assertTrue(ds1.committed); - Assert.assertTrue(ds2.committed); - Assert.assertFalse(ds1.postCommitted); - Assert.assertFalse(ds2.postCommitted); - Assert.assertTrue(ds1.rolledBack); - Assert.assertTrue(ds2.rolledBack); - Assert.assertEquals(txClient.state, DummyTxClient.CommitState.Aborted); - } - - @Test - public void testRetryByDefault() throws TransactionFailureException, InterruptedException { - // we want retry by default, so that engineers don't miss it - txClient.failCommits = 2; - // execute: add a change to ds1 and ds2 - getExecutor().execute(testFunction, 10); - // should not fail, but continue - - // verify both are committed - Assert.assertTrue(ds1.started); - Assert.assertTrue(ds2.started); - Assert.assertTrue(ds1.checked); - Assert.assertTrue(ds2.checked); - Assert.assertTrue(ds1.committed); - Assert.assertTrue(ds2.committed); - Assert.assertTrue(ds1.postCommitted); - Assert.assertTrue(ds2.postCommitted); - Assert.assertFalse(ds1.rolledBack); - Assert.assertFalse(ds2.rolledBack); - Assert.assertEquals(txClient.state, DummyTxClient.CommitState.Committed); - } - - @Test - public void testCommitFalse() throws TransactionFailureException, InterruptedException { - txClient.failCommits = 1; - // execute: add a change to ds1 and ds2 - try { - getExecutorWithNoRetry().execute(testFunction, 10); - Assert.fail("commit failed - exception should be thrown"); - } catch (TransactionConflictException e) { - Assert.assertNull(e.getCause()); - } - // verify both are rolled back and tx is aborted - Assert.assertTrue(ds1.started); - Assert.assertTrue(ds2.started); - Assert.assertTrue(ds1.checked); - Assert.assertTrue(ds2.checked); - Assert.assertTrue(ds1.committed); - Assert.assertTrue(ds2.committed); - Assert.assertFalse(ds1.postCommitted); - Assert.assertFalse(ds2.postCommitted); - Assert.assertTrue(ds1.rolledBack); - Assert.assertTrue(ds2.rolledBack); - Assert.assertEquals(txClient.state, DummyTxClient.CommitState.Aborted); - } - - @Test - public void testCanCommitFalse() throws TransactionFailureException, InterruptedException { - txClient.failCanCommitOnce = true; - // execute: add a change to ds1 and ds2 - try { - getExecutorWithNoRetry().execute(testFunction, 10); - Assert.fail("commit failed - exception should be thrown"); - } catch (TransactionConflictException e) { - Assert.assertNull(e.getCause()); - } - // verify both are rolled back and tx is aborted - Assert.assertTrue(ds1.started); - Assert.assertTrue(ds2.started); - Assert.assertTrue(ds1.checked); - Assert.assertTrue(ds2.checked); - Assert.assertFalse(ds1.committed); - Assert.assertFalse(ds2.committed); - Assert.assertFalse(ds1.postCommitted); - Assert.assertFalse(ds2.postCommitted); - Assert.assertTrue(ds1.rolledBack); - Assert.assertTrue(ds2.rolledBack); - Assert.assertEquals(txClient.state, DummyTxClient.CommitState.Aborted); - } - - @Test - public void testChangesAndRollbackFailure() throws TransactionFailureException, InterruptedException { - ds1.failChangesTxOnce = InduceFailure.ThrowException; - ds1.failRollbackTxOnce = InduceFailure.ThrowException; - // execute: add a change to ds1 and ds2 - try { - getExecutor().execute(testFunction, 10); - Assert.fail("get changes failed - exception should be thrown"); - } catch (TransactionFailureException e) { - Assert.assertEquals("changes failure", e.getCause().getMessage()); - } - // verify both are rolled back and tx is invalidated - Assert.assertTrue(ds1.started); - Assert.assertTrue(ds2.started); - Assert.assertTrue(ds1.checked); - Assert.assertFalse(ds2.checked); - Assert.assertFalse(ds1.committed); - Assert.assertFalse(ds2.committed); - Assert.assertFalse(ds1.postCommitted); - Assert.assertFalse(ds2.postCommitted); - Assert.assertTrue(ds1.rolledBack); - Assert.assertTrue(ds2.rolledBack); - Assert.assertEquals(txClient.state, DummyTxClient.CommitState.Invalidated); - } - - @Test - public void testFunctionAndRollbackFailure() throws TransactionFailureException, InterruptedException { - ds1.failRollbackTxOnce = InduceFailure.ReturnFalse; - // execute: add a change to ds1 and ds2 - try { - getExecutor().execute(testFunction, null); - Assert.fail("function failed - exception should be thrown"); - } catch (TransactionFailureException e) { - Assert.assertEquals("function failed", e.getCause().getMessage()); - } - // verify both are rolled back and tx is invalidated - Assert.assertTrue(ds1.started); - Assert.assertTrue(ds2.started); - Assert.assertFalse(ds1.checked); - Assert.assertFalse(ds2.checked); - Assert.assertFalse(ds1.committed); - Assert.assertFalse(ds2.committed); - Assert.assertFalse(ds1.postCommitted); - Assert.assertFalse(ds2.postCommitted); - Assert.assertTrue(ds1.rolledBack); - Assert.assertTrue(ds2.rolledBack); - Assert.assertEquals(txClient.state, DummyTxClient.CommitState.Invalidated); - } - - @Test - public void testStartAndRollbackFailure() throws TransactionFailureException, InterruptedException { - ds1.failStartTxOnce = InduceFailure.ThrowException; - // execute: add a change to ds1 and ds2 - try { - getExecutor().execute(testFunction, 10); - Assert.fail("start failed - exception should be thrown"); - } catch (TransactionFailureException e) { - Assert.assertEquals("start failure", e.getCause().getMessage()); - } - // verify both are not rolled back and tx is aborted - Assert.assertTrue(ds1.started); - Assert.assertFalse(ds2.started); - Assert.assertFalse(ds1.checked); - Assert.assertFalse(ds2.checked); - Assert.assertFalse(ds1.committed); - Assert.assertFalse(ds2.committed); - Assert.assertFalse(ds1.postCommitted); - Assert.assertFalse(ds2.postCommitted); - Assert.assertFalse(ds1.rolledBack); - Assert.assertFalse(ds2.rolledBack); - Assert.assertEquals(txClient.state, DummyTxClient.CommitState.Aborted); - } - - enum InduceFailure { NoFailure, ReturnFalse, ThrowException } - - static class DummyTxAware implements TransactionAware { - - Transaction tx; - boolean started = false; - boolean committed = false; - boolean checked = false; - boolean rolledBack = false; - boolean postCommitted = false; - List<byte[]> changes = Lists.newArrayList(); - - InduceFailure failStartTxOnce = InduceFailure.NoFailure; - InduceFailure failChangesTxOnce = InduceFailure.NoFailure; - InduceFailure failCommitTxOnce = InduceFailure.NoFailure; - InduceFailure failPostCommitTxOnce = InduceFailure.NoFailure; - InduceFailure failRollbackTxOnce = InduceFailure.NoFailure; - - void addChange(byte[] key) { - changes.add(key); - } - - void reset() { - tx = null; - started = false; - checked = false; - committed = false; - rolledBack = false; - postCommitted = false; - changes.clear(); - } - - @Override - public void startTx(Transaction tx) { - reset(); - started = true; - this.tx = tx; - if (failStartTxOnce == InduceFailure.ThrowException) { - failStartTxOnce = InduceFailure.NoFailure; - throw new RuntimeException("start failure"); - } - } - - @Override - public void updateTx(Transaction tx) { - this.tx = tx; - } - - @Override - public Collection<byte[]> getTxChanges() { - checked = true; - if (failChangesTxOnce == InduceFailure.ThrowException) { - failChangesTxOnce = InduceFailure.NoFailure; - throw new RuntimeException("changes failure"); - } - return ImmutableList.copyOf(changes); - } - - @Override - public boolean commitTx() throws Exception { - committed = true; - if (failCommitTxOnce == InduceFailure.ThrowException) { - failCommitTxOnce = InduceFailure.NoFailure; - throw new RuntimeException("persist failure"); - } - if (failCommitTxOnce == InduceFailure.ReturnFalse) { - failCommitTxOnce = InduceFailure.NoFailure; - return false; - } - return true; - } - - @Override - public void postTxCommit() { - postCommitted = true; - if (failPostCommitTxOnce == InduceFailure.ThrowException) { - failPostCommitTxOnce = InduceFailure.NoFailure; - throw new RuntimeException("post failure"); - } - } - - @Override - public boolean rollbackTx() throws Exception { - rolledBack = true; - if (failRollbackTxOnce == InduceFailure.ThrowException) { - failRollbackTxOnce = InduceFailure.NoFailure; - throw new RuntimeException("rollback failure"); - } - if (failRollbackTxOnce == InduceFailure.ReturnFalse) { - failRollbackTxOnce = InduceFailure.NoFailure; - return false; - } - return true; - } - - @Override - public String getTransactionAwareName() { - return "dummy"; - } - } - - static class DummyTxClient extends InMemoryTxSystemClient { - - boolean failCanCommitOnce = false; - int failCommits = 0; - enum CommitState { - Started, Committed, Aborted, Invalidated - } - CommitState state = CommitState.Started; - - @Inject - DummyTxClient(TransactionManager txmgr) { - super(txmgr); - } - - @Override - public boolean canCommit(Transaction tx, Collection<byte[]> changeIds) throws TransactionNotInProgressException { - if (failCanCommitOnce) { - failCanCommitOnce = false; - return false; - } else { - return super.canCommit(tx, changeIds); - } - } - - @Override - public boolean commit(Transaction tx) throws TransactionNotInProgressException { - if (failCommits-- > 0) { - return false; - } else { - state = CommitState.Committed; - return super.commit(tx); - } - } - - @Override - public Transaction startLong() { - state = CommitState.Started; - return super.startLong(); - } - - @Override - public Transaction startShort() { - state = CommitState.Started; - return super.startShort(); - } - - @Override - public Transaction startShort(int timeout) { - state = CommitState.Started; - return super.startShort(timeout); - } - - @Override - public void abort(Transaction tx) { - state = CommitState.Aborted; - super.abort(tx); - } - - @Override - public boolean invalidate(long tx) { - state = CommitState.Invalidated; - return super.invalidate(tx); - } - } -} http://git-wip-us.apache.org/repos/asf/incubator-tephra/blob/9c693743/tephra-core/src/test/java/co/cask/tephra/TransactionManagerTest.java ---------------------------------------------------------------------- diff --git a/tephra-core/src/test/java/co/cask/tephra/TransactionManagerTest.java b/tephra-core/src/test/java/co/cask/tephra/TransactionManagerTest.java deleted file mode 100644 index ddd32db..0000000 --- a/tephra-core/src/test/java/co/cask/tephra/TransactionManagerTest.java +++ /dev/null @@ -1,346 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package co.cask.tephra; - -import co.cask.tephra.inmemory.InMemoryTxSystemClient; -import co.cask.tephra.metrics.TxMetricsCollector; -import co.cask.tephra.persist.InMemoryTransactionStateStorage; -import co.cask.tephra.persist.TransactionStateStorage; -import com.google.common.collect.ImmutableList; -import com.google.common.collect.ImmutableSet; -import org.apache.hadoop.conf.Configuration; -import org.junit.After; -import org.junit.Assert; -import org.junit.Before; -import org.junit.Test; - -import java.util.Collections; -import java.util.concurrent.TimeUnit; - -/** - * - */ -public class TransactionManagerTest extends TransactionSystemTest { - - static Configuration conf = new Configuration(); - - TransactionManager txManager = null; - TransactionStateStorage txStateStorage = null; - - @Override - protected TransactionSystemClient getClient() { - return new InMemoryTxSystemClient(txManager); - } - - @Override - protected TransactionStateStorage getStateStorage() { - return txStateStorage; - } - - @Before - public void before() { - conf.setInt(TxConstants.Manager.CFG_TX_CLEANUP_INTERVAL, 0); // no cleanup thread - // todo should create two sets of tests, one with LocalFileTxStateStorage and one with InMemoryTxStateStorage - txStateStorage = new InMemoryTransactionStateStorage(); - txManager = new TransactionManager - (conf, txStateStorage, new TxMetricsCollector()); - txManager.startAndWait(); - } - - @After - public void after() { - txManager.stopAndWait(); - } - - @Test - public void testTransactionCleanup() throws Exception { - conf.setInt(TxConstants.Manager.CFG_TX_CLEANUP_INTERVAL, 3); - conf.setInt(TxConstants.Manager.CFG_TX_TIMEOUT, 2); - // using a new tx manager that cleans up - TransactionManager txm = new TransactionManager - (conf, new InMemoryTransactionStateStorage(), new TxMetricsCollector()); - txm.startAndWait(); - try { - Assert.assertEquals(0, txm.getInvalidSize()); - Assert.assertEquals(0, txm.getCommittedSize()); - // start a transaction and leave it open - Transaction tx1 = txm.startShort(); - // start a long running transaction and leave it open - Transaction tx2 = txm.startLong(); - Transaction tx3 = txm.startLong(); - // start and commit a bunch of transactions - for (int i = 0; i < 10; i++) { - Transaction tx = txm.startShort(); - Assert.assertTrue(txm.canCommit(tx, Collections.singleton(new byte[] { (byte) i }))); - Assert.assertTrue(txm.commit(tx)); - } - // all of these should still be in the committed set - Assert.assertEquals(0, txm.getInvalidSize()); - Assert.assertEquals(10, txm.getCommittedSize()); - // sleep longer than the cleanup interval - TimeUnit.SECONDS.sleep(5); - // transaction should now be invalid - Assert.assertEquals(1, txm.getInvalidSize()); - // run another transaction - Transaction txx = txm.startShort(); - // verify the exclude - Assert.assertFalse(txx.isVisible(tx1.getTransactionId())); - Assert.assertFalse(txx.isVisible(tx2.getTransactionId())); - Assert.assertFalse(txx.isVisible(tx3.getTransactionId())); - // try to commit the last transaction that was started - Assert.assertTrue(txm.canCommit(txx, Collections.singleton(new byte[] { 0x0a }))); - Assert.assertTrue(txm.commit(txx)); - - // now the committed change sets should be empty again - Assert.assertEquals(0, txm.getCommittedSize()); - // cannot commit transaction as it was timed out - try { - txm.canCommit(tx1, Collections.singleton(new byte[] { 0x11 })); - Assert.fail(); - } catch (TransactionNotInProgressException e) { - // expected - } - txm.abort(tx1); - // abort should have removed from invalid - Assert.assertEquals(0, txm.getInvalidSize()); - // run another bunch of transactions - for (int i = 0; i < 10; i++) { - Transaction tx = txm.startShort(); - Assert.assertTrue(txm.canCommit(tx, Collections.singleton(new byte[] { (byte) i }))); - Assert.assertTrue(txm.commit(tx)); - } - // none of these should still be in the committed set (tx2 is long-running). - Assert.assertEquals(0, txm.getInvalidSize()); - Assert.assertEquals(0, txm.getCommittedSize()); - // commit tx2, abort tx3 - Assert.assertTrue(txm.commit(tx2)); - txm.abort(tx3); - // none of these should still be in the committed set (tx2 is long-running). - // Only tx3 is invalid list as it was aborted and is long-running. tx1 is short one and it rolled back its changes - // so it should NOT be in invalid list - Assert.assertEquals(1, txm.getInvalidSize()); - Assert.assertEquals(tx3.getTransactionId(), (long) txm.getCurrentState().getInvalid().iterator().next()); - Assert.assertEquals(1, txm.getExcludedListSize()); - } finally { - txm.stopAndWait(); - } - } - - @Test - public void testLongTransactionCleanup() throws Exception { - conf.setInt(TxConstants.Manager.CFG_TX_CLEANUP_INTERVAL, 3); - conf.setInt(TxConstants.Manager.CFG_TX_LONG_TIMEOUT, 2); - // using a new tx manager that cleans up - TransactionManager txm = new TransactionManager - (conf, new InMemoryTransactionStateStorage(), new TxMetricsCollector()); - txm.startAndWait(); - try { - Assert.assertEquals(0, txm.getInvalidSize()); - Assert.assertEquals(0, txm.getCommittedSize()); - - // start a long running transaction - Transaction tx1 = txm.startLong(); - - Assert.assertEquals(0, txm.getInvalidSize()); - Assert.assertEquals(0, txm.getCommittedSize()); - - // sleep longer than the cleanup interval - TimeUnit.SECONDS.sleep(5); - - // transaction should now be invalid - Assert.assertEquals(1, txm.getInvalidSize()); - Assert.assertEquals(0, txm.getCommittedSize()); - - // cannot commit transaction as it was timed out - try { - txm.canCommit(tx1, Collections.singleton(new byte[] { 0x11 })); - Assert.fail(); - } catch (TransactionNotInProgressException e) { - // expected - } - - txm.abort(tx1); - // abort should not remove long running transaction from invalid list - Assert.assertEquals(1, txm.getInvalidSize()); - } finally { - txm.stopAndWait(); - } - } - - @Test - public void testTruncateInvalid() throws Exception { - InMemoryTransactionStateStorage storage = new InMemoryTransactionStateStorage(); - Configuration testConf = new Configuration(conf); - // No snapshots - testConf.setLong(TxConstants.Manager.CFG_TX_SNAPSHOT_INTERVAL, -1); - TransactionManager txm1 = new TransactionManager(testConf, storage, new TxMetricsCollector()); - txm1.startAndWait(); - - TransactionManager txm2 = null; - Transaction tx1; - Transaction tx2; - Transaction tx3; - Transaction tx4; - Transaction tx5; - Transaction tx6; - try { - Assert.assertEquals(0, txm1.getInvalidSize()); - - // start a few transactions - tx1 = txm1.startLong(); - tx2 = txm1.startShort(); - tx3 = txm1.startLong(); - tx4 = txm1.startShort(); - tx5 = txm1.startLong(); - tx6 = txm1.startShort(); - - // invalidate tx1, tx2, tx5 and tx6 - txm1.invalidate(tx1.getTransactionId()); - txm1.invalidate(tx2.getTransactionId()); - txm1.invalidate(tx5.getTransactionId()); - txm1.invalidate(tx6.getTransactionId()); - - // tx1, tx2, tx5 and tx6 should be in invalid list - Assert.assertEquals( - ImmutableList.of(tx1.getTransactionId(), tx2.getTransactionId(), tx5.getTransactionId(), - tx6.getTransactionId()), - txm1.getCurrentState().getInvalid() - ); - - // remove tx1 and tx6 from invalid list - Assert.assertTrue(txm1.truncateInvalidTx(ImmutableSet.of(tx1.getTransactionId(), tx6.getTransactionId()))); - - // only tx2 and tx5 should be in invalid list now - Assert.assertEquals(ImmutableList.of(tx2.getTransactionId(), tx5.getTransactionId()), - txm1.getCurrentState().getInvalid()); - - // removing in-progress transactions should not have any effect - Assert.assertEquals(ImmutableSet.of(tx3.getTransactionId(), tx4.getTransactionId()), - txm1.getCurrentState().getInProgress().keySet()); - Assert.assertFalse(txm1.truncateInvalidTx(ImmutableSet.of(tx3.getTransactionId(), tx4.getTransactionId()))); - // no change to in-progress - Assert.assertEquals(ImmutableSet.of(tx3.getTransactionId(), tx4.getTransactionId()), - txm1.getCurrentState().getInProgress().keySet()); - // no change to invalid list - Assert.assertEquals(ImmutableList.of(tx2.getTransactionId(), tx5.getTransactionId()), - txm1.getCurrentState().getInvalid()); - - // Test transaction edit logs replay - // Start another transaction manager without stopping txm1 so that snapshot does not get written, - // and all logs can be replayed. - txm2 = new TransactionManager(testConf, storage, new TxMetricsCollector()); - txm2.startAndWait(); - Assert.assertEquals(ImmutableList.of(tx2.getTransactionId(), tx5.getTransactionId()), - txm2.getCurrentState().getInvalid()); - Assert.assertEquals(ImmutableSet.of(tx3.getTransactionId(), tx4.getTransactionId()), - txm2.getCurrentState().getInProgress().keySet()); - } finally { - txm1.stopAndWait(); - if (txm2 != null) { - txm2.stopAndWait(); - } - } - } - - @Test - public void testTruncateInvalidBeforeTime() throws Exception { - InMemoryTransactionStateStorage storage = new InMemoryTransactionStateStorage(); - Configuration testConf = new Configuration(conf); - // No snapshots - testConf.setLong(TxConstants.Manager.CFG_TX_SNAPSHOT_INTERVAL, -1); - TransactionManager txm1 = new TransactionManager(testConf, storage, new TxMetricsCollector()); - txm1.startAndWait(); - - TransactionManager txm2 = null; - Transaction tx1; - Transaction tx2; - Transaction tx3; - Transaction tx4; - Transaction tx5; - Transaction tx6; - try { - Assert.assertEquals(0, txm1.getInvalidSize()); - - // start a few transactions - tx1 = txm1.startLong(); - tx2 = txm1.startShort(); - // Sleep so that transaction ids get generated a millisecond apart for assertion - // TEPHRA-63 should eliminate the need to sleep - TimeUnit.MILLISECONDS.sleep(1); - long timeBeforeTx3 = System.currentTimeMillis(); - tx3 = txm1.startLong(); - tx4 = txm1.startShort(); - TimeUnit.MILLISECONDS.sleep(1); - long timeBeforeTx5 = System.currentTimeMillis(); - tx5 = txm1.startLong(); - tx6 = txm1.startShort(); - - // invalidate tx1, tx2, tx5 and tx6 - txm1.invalidate(tx1.getTransactionId()); - txm1.invalidate(tx2.getTransactionId()); - txm1.invalidate(tx5.getTransactionId()); - txm1.invalidate(tx6.getTransactionId()); - - // tx1, tx2, tx5 and tx6 should be in invalid list - Assert.assertEquals( - ImmutableList.of(tx1.getTransactionId(), tx2.getTransactionId(), tx5.getTransactionId(), - tx6.getTransactionId()), - txm1.getCurrentState().getInvalid() - ); - - // remove transactions before tx3 from invalid list - Assert.assertTrue(txm1.truncateInvalidTxBefore(timeBeforeTx3)); - - // only tx5 and tx6 should be in invalid list now - Assert.assertEquals(ImmutableList.of(tx5.getTransactionId(), tx6.getTransactionId()), - txm1.getCurrentState().getInvalid()); - - // removing invalid transactions before tx5 should throw exception since tx3 and tx4 are in-progress - Assert.assertEquals(ImmutableSet.of(tx3.getTransactionId(), tx4.getTransactionId()), - txm1.getCurrentState().getInProgress().keySet()); - try { - txm1.truncateInvalidTxBefore(timeBeforeTx5); - Assert.fail("Expected InvalidTruncateTimeException exception"); - } catch (InvalidTruncateTimeException e) { - // Expected exception - } - // no change to in-progress - Assert.assertEquals(ImmutableSet.of(tx3.getTransactionId(), tx4.getTransactionId()), - txm1.getCurrentState().getInProgress().keySet()); - // no change to invalid list - Assert.assertEquals(ImmutableList.of(tx5.getTransactionId(), tx6.getTransactionId()), - txm1.getCurrentState().getInvalid()); - - // Test transaction edit logs replay - // Start another transaction manager without stopping txm1 so that snapshot does not get written, - // and all logs can be replayed. - txm2 = new TransactionManager(testConf, storage, new TxMetricsCollector()); - txm2.startAndWait(); - Assert.assertEquals(ImmutableList.of(tx5.getTransactionId(), tx6.getTransactionId()), - txm2.getCurrentState().getInvalid()); - Assert.assertEquals(ImmutableSet.of(tx3.getTransactionId(), tx4.getTransactionId()), - txm2.getCurrentState().getInProgress().keySet()); - } finally { - txm1.stopAndWait(); - if (txm2 != null) { - txm2.stopAndWait(); - } - } - } -} http://git-wip-us.apache.org/repos/asf/incubator-tephra/blob/9c693743/tephra-core/src/test/java/co/cask/tephra/TransactionServiceMainTest.java ---------------------------------------------------------------------- diff --git a/tephra-core/src/test/java/co/cask/tephra/TransactionServiceMainTest.java b/tephra-core/src/test/java/co/cask/tephra/TransactionServiceMainTest.java deleted file mode 100644 index e54956d..0000000 --- a/tephra-core/src/test/java/co/cask/tephra/TransactionServiceMainTest.java +++ /dev/null @@ -1,77 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package co.cask.tephra; - -import co.cask.tephra.distributed.TransactionServiceClient; -import com.google.common.base.Throwables; -import org.apache.hadoop.conf.Configuration; -import org.apache.twill.internal.zookeeper.InMemoryZKServer; -import org.junit.ClassRule; -import org.junit.Test; -import org.junit.rules.TemporaryFolder; - -import java.util.concurrent.CountDownLatch; - -/** - * Test for verifying TransactionServiceMain works correctly. - */ -public class TransactionServiceMainTest { - - @ClassRule - public static TemporaryFolder tmpFolder = new TemporaryFolder(); - - @Test - public void testClientServer() throws Exception { - // Simply start a transaction server and connect to it with the client. - InMemoryZKServer zkServer = InMemoryZKServer.builder().setDataDir(tmpFolder.newFolder()).build(); - zkServer.startAndWait(); - - try { - Configuration conf = new Configuration(); - conf.set(TxConstants.Service.CFG_DATA_TX_ZOOKEEPER_QUORUM, zkServer.getConnectionStr()); - conf.set(TxConstants.Manager.CFG_TX_SNAPSHOT_DIR, tmpFolder.newFolder().getAbsolutePath()); - - final TransactionServiceMain main = new TransactionServiceMain(conf); - final CountDownLatch latch = new CountDownLatch(1); - Thread t = new Thread() { - @Override - public void run() { - try { - main.start(); - latch.countDown(); - } catch (Exception e) { - throw Throwables.propagate(e); - } - } - }; - - try { - t.start(); - // Wait for service to startup - latch.await(); - TransactionServiceClient.doMain(true, conf); - } finally { - main.stop(); - t.join(); - } - } finally { - zkServer.stopAndWait(); - } - } -} http://git-wip-us.apache.org/repos/asf/incubator-tephra/blob/9c693743/tephra-core/src/test/java/co/cask/tephra/TransactionSystemTest.java ---------------------------------------------------------------------- diff --git a/tephra-core/src/test/java/co/cask/tephra/TransactionSystemTest.java b/tephra-core/src/test/java/co/cask/tephra/TransactionSystemTest.java deleted file mode 100644 index cca6f5a..0000000 --- a/tephra-core/src/test/java/co/cask/tephra/TransactionSystemTest.java +++ /dev/null @@ -1,320 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package co.cask.tephra; - -import co.cask.tephra.persist.TransactionSnapshot; -import co.cask.tephra.persist.TransactionStateStorage; -import com.google.common.collect.ImmutableSet; -import org.junit.Assert; -import org.junit.Test; - -import java.util.Arrays; -import java.util.Collection; -import java.util.concurrent.TimeUnit; - -/** - * - */ -public abstract class TransactionSystemTest { - - public static final byte[] C1 = new byte[] { 'c', '1' }; - public static final byte[] C2 = new byte[] { 'c', '2' }; - public static final byte[] C3 = new byte[] { 'c', '3' }; - public static final byte[] C4 = new byte[] { 'c', '4' }; - - protected abstract TransactionSystemClient getClient() throws Exception; - - protected abstract TransactionStateStorage getStateStorage() throws Exception; - - @Test - public void testCommitRaceHandling() throws Exception { - TransactionSystemClient client1 = getClient(); - TransactionSystemClient client2 = getClient(); - - Transaction tx1 = client1.startShort(); - Transaction tx2 = client2.startShort(); - - Assert.assertTrue(client1.canCommit(tx1, asList(C1, C2))); - // second one also can commit even thought there are conflicts with first since first one hasn't committed yet - Assert.assertTrue(client2.canCommit(tx2, asList(C2, C3))); - - Assert.assertTrue(client1.commit(tx1)); - - // now second one should not commit, since there are conflicts with tx1 that has been committed - Assert.assertFalse(client2.commit(tx2)); - } - - @Test - public void testMultipleCommitsAtSameTime() throws Exception { - // We want to check that if two txs finish at same time (wrt tx manager) they do not overwrite changesets of each - // other in tx manager used for conflicts detection (we had this bug) - // NOTE: you don't have to use multiple clients for that - TransactionSystemClient client1 = getClient(); - TransactionSystemClient client2 = getClient(); - TransactionSystemClient client3 = getClient(); - TransactionSystemClient client4 = getClient(); - TransactionSystemClient client5 = getClient(); - - Transaction tx1 = client1.startShort(); - Transaction tx2 = client2.startShort(); - Transaction tx3 = client3.startShort(); - Transaction tx4 = client4.startShort(); - Transaction tx5 = client5.startShort(); - - Assert.assertTrue(client1.canCommit(tx1, asList(C1))); - Assert.assertTrue(client1.commit(tx1)); - - Assert.assertTrue(client2.canCommit(tx2, asList(C2))); - Assert.assertTrue(client2.commit(tx2)); - - // verifying conflicts detection - Assert.assertFalse(client3.canCommit(tx3, asList(C1))); - Assert.assertFalse(client4.canCommit(tx4, asList(C2))); - Assert.assertTrue(client5.canCommit(tx5, asList(C3))); - } - - @Test - public void testCommitTwice() throws Exception { - TransactionSystemClient client = getClient(); - Transaction tx = client.startShort(); - - Assert.assertTrue(client.canCommit(tx, asList(C1, C2))); - Assert.assertTrue(client.commit(tx)); - // cannot commit twice same tx - try { - Assert.assertFalse(client.commit(tx)); - Assert.fail(); - } catch (TransactionNotInProgressException e) { - // expected - } - } - - @Test - public void testAbortTwice() throws Exception { - TransactionSystemClient client = getClient(); - Transaction tx = client.startShort(); - - Assert.assertTrue(client.canCommit(tx, asList(C1, C2))); - client.abort(tx); - // abort of not active tx has no affect - client.abort(tx); - } - - @Test - public void testReuseTx() throws Exception { - TransactionSystemClient client = getClient(); - Transaction tx = client.startShort(); - - Assert.assertTrue(client.canCommit(tx, asList(C1, C2))); - Assert.assertTrue(client.commit(tx)); - // can't re-use same tx again - try { - client.canCommit(tx, asList(C3, C4)); - Assert.fail(); - } catch (TransactionNotInProgressException e) { - // expected - } - try { - Assert.assertFalse(client.commit(tx)); - Assert.fail(); - } catch (TransactionNotInProgressException e) { - // expected - } - - // abort of not active tx has no affect - client.abort(tx); - } - - @Test - public void testUseNotStarted() throws Exception { - TransactionSystemClient client = getClient(); - Transaction tx1 = client.startShort(); - Assert.assertTrue(client.commit(tx1)); - - // we know this is one is older than current writePointer and was not used - Transaction txOld = new Transaction(tx1.getReadPointer(), tx1.getTransactionId() - 1, - new long[] {}, new long[] {}, Transaction.NO_TX_IN_PROGRESS, - TransactionType.SHORT); - try { - Assert.assertFalse(client.canCommit(txOld, asList(C3, C4))); - Assert.fail(); - } catch (TransactionNotInProgressException e) { - // expected - } - try { - Assert.assertFalse(client.commit(txOld)); - Assert.fail(); - } catch (TransactionNotInProgressException e) { - // expected - } - // abort of not active tx has no affect - client.abort(txOld); - - // we know this is one is newer than current readPointer and was not used - Transaction txNew = new Transaction(tx1.getReadPointer(), tx1.getTransactionId() + 1, - new long[] {}, new long[] {}, Transaction.NO_TX_IN_PROGRESS, - TransactionType.SHORT); - try { - Assert.assertFalse(client.canCommit(txNew, asList(C3, C4))); - Assert.fail(); - } catch (TransactionNotInProgressException e) { - // expected - } - try { - Assert.assertFalse(client.commit(txNew)); - Assert.fail(); - } catch (TransactionNotInProgressException e) { - // expected - } - // abort of not active tx has no affect - client.abort(txNew); - } - - @Test - public void testAbortAfterCommit() throws Exception { - TransactionSystemClient client = getClient(); - Transaction tx = client.startShort(); - - Assert.assertTrue(client.canCommit(tx, asList(C1, C2))); - Assert.assertTrue(client.commit(tx)); - // abort of not active tx has no affect - client.abort(tx); - } - - // todo add test invalidate method - @Test - public void testInvalidateTx() throws Exception { - TransactionSystemClient client = getClient(); - // Invalidate an in-progress tx - Transaction tx1 = client.startShort(); - client.canCommit(tx1, asList(C1, C2)); - Assert.assertTrue(client.invalidate(tx1.getTransactionId())); - // Cannot invalidate a committed tx - Transaction tx2 = client.startShort(); - client.canCommit(tx2, asList(C3, C4)); - client.commit(tx2); - Assert.assertFalse(client.invalidate(tx2.getTransactionId())); - } - - @Test - public void testResetState() throws Exception { - // have tx in progress, committing and committed then reset, - // get the last snapshot and see that it is empty - TransactionSystemClient client = getClient(); - TransactionStateStorage stateStorage = getStateStorage(); - - Transaction tx1 = client.startShort(); - Transaction tx2 = client.startShort(); - client.canCommit(tx1, asList(C1, C2)); - client.commit(tx1); - client.canCommit(tx2, asList(C3, C4)); - - Transaction txPreReset = client.startShort(); - long currentTs = System.currentTimeMillis(); - client.resetState(); - - TransactionSnapshot snapshot = stateStorage.getLatestSnapshot(); - Assert.assertTrue(snapshot.getTimestamp() >= currentTs); - Assert.assertEquals(0, snapshot.getInvalid().size()); - Assert.assertEquals(0, snapshot.getInProgress().size()); - Assert.assertEquals(0, snapshot.getCommittingChangeSets().size()); - Assert.assertEquals(0, snapshot.getCommittedChangeSets().size()); - - // confirm that transaction IDs are not reset - Transaction txPostReset = client.startShort(); - Assert.assertTrue("New tx ID should be greater than last ID before reset", - txPostReset.getTransactionId() > txPreReset.getTransactionId()); - } - - @Test - public void testTruncateInvalidTx() throws Exception { - // Start few transactions and invalidate all of them - TransactionSystemClient client = getClient(); - Transaction tx1 = client.startLong(); - Transaction tx2 = client.startShort(); - Transaction tx3 = client.startLong(); - - client.invalidate(tx1.getTransactionId()); - client.invalidate(tx2.getTransactionId()); - client.invalidate(tx3.getTransactionId()); - - // Remove tx2 and tx3 from invalid list - Assert.assertTrue(client.truncateInvalidTx(ImmutableSet.of(tx2.getTransactionId(), tx3.getTransactionId()))); - - Transaction tx = client.startShort(); - // Only tx1 should be in invalid list now - Assert.assertArrayEquals(new long[] {tx1.getTransactionId()}, tx.getInvalids()); - client.abort(tx); - } - - @Test - public void testTruncateInvalidTxBefore() throws Exception { - // Start few transactions - TransactionSystemClient client = getClient(); - Transaction tx1 = client.startLong(); - Transaction tx2 = client.startShort(); - // Sleep so that transaction ids get generated a millisecond apart for assertion - // TEPHRA-63 should eliminate the need to sleep - TimeUnit.MILLISECONDS.sleep(1); - long beforeTx3 = System.currentTimeMillis(); - Transaction tx3 = client.startLong(); - - try { - // throws exception since tx1 and tx2 are still in-progress - client.truncateInvalidTxBefore(beforeTx3); - Assert.fail("Expected InvalidTruncateTimeException exception"); - } catch (InvalidTruncateTimeException e) { - // Expected exception - } - - // Invalidate all of them - client.invalidate(tx1.getTransactionId()); - client.invalidate(tx2.getTransactionId()); - client.invalidate(tx3.getTransactionId()); - - // Remove transactions before time beforeTx3 - Assert.assertTrue(client.truncateInvalidTxBefore(beforeTx3)); - - Transaction tx = client.startShort(); - // Only tx3 should be in invalid list now - Assert.assertArrayEquals(new long[] {tx3.getTransactionId()}, tx.getInvalids()); - client.abort(tx); - } - - @Test - public void testGetInvalidSize() throws Exception { - // Start few transactions and invalidate all of them - TransactionSystemClient client = getClient(); - Transaction tx1 = client.startLong(); - Transaction tx2 = client.startShort(); - Transaction tx3 = client.startLong(); - - Assert.assertEquals(0, client.getInvalidSize()); - - client.invalidate(tx1.getTransactionId()); - client.invalidate(tx2.getTransactionId()); - client.invalidate(tx3.getTransactionId()); - - Assert.assertEquals(3, client.getInvalidSize()); - } - - private Collection<byte[]> asList(byte[]... val) { - return Arrays.asList(val); - } -} http://git-wip-us.apache.org/repos/asf/incubator-tephra/blob/9c693743/tephra-core/src/test/java/co/cask/tephra/TransactionTest.java ---------------------------------------------------------------------- diff --git a/tephra-core/src/test/java/co/cask/tephra/TransactionTest.java b/tephra-core/src/test/java/co/cask/tephra/TransactionTest.java deleted file mode 100644 index b1c7e43..0000000 --- a/tephra-core/src/test/java/co/cask/tephra/TransactionTest.java +++ /dev/null @@ -1,159 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package co.cask.tephra; - -import com.google.common.collect.ImmutableSet; -import com.google.common.collect.Iterables; -import com.google.common.primitives.Longs; -import org.junit.Assert; -import org.junit.Test; - -import java.util.Arrays; -import java.util.Set; - -/** - * - */ -public class TransactionTest { - // Current transaction id - private final long txId = 200L; - // Read pointer for current transaction - private final long readPointer = 100L; - // Transactions committed before current transaction was started. - private final Set<Long> priorCommitted = ImmutableSet.of(80L, 99L, 100L); - // Transactions committed after current transaction was started. - private final Set<Long> postCommitted = ImmutableSet.of(150L, 180L, 210L); - // Invalid transactions before current transaction was started. - private final Set<Long> priorInvalids = ImmutableSet.of(90L, 110L, 190L); - // Invalid transactions after current transaction was started. - private final Set<Long> postInvalids = ImmutableSet.of(201L, 221L, 231L); - // Transactions in progress before current transaction was started. - private final Set<Long> priorInProgress = ImmutableSet.of(95L, 120L, 150L); - // Transactions in progress after current transaction was started. - private final Set<Long> postInProgress = ImmutableSet.of(205L, 215L, 300L); - - @Test - public void testSnapshotVisibility() throws Exception { - Transaction.VisibilityLevel visibilityLevel = Transaction.VisibilityLevel.SNAPSHOT; - - Set<Long> checkPointers = ImmutableSet.of(220L, 250L); - Transaction tx = new Transaction(readPointer, txId, 250L, toSortedArray(priorInvalids), - toSortedArray(priorInProgress), 95L, - TransactionType.SHORT, toSortedArray(checkPointers), - visibilityLevel); - Set<Long> visibleCurrent = ImmutableSet.of(200L, 220L, 250L); - Set<Long> notVisibleCurrent = ImmutableSet.of(); - assertVisibility(priorCommitted, postCommitted, priorInvalids, postInvalids, priorInProgress, postInProgress, - visibleCurrent, notVisibleCurrent, tx); - - checkPointers = ImmutableSet.of(); - tx = new Transaction(readPointer, txId, txId, toSortedArray(priorInvalids), toSortedArray(priorInProgress), 95L, - TransactionType.SHORT, toSortedArray(checkPointers), - visibilityLevel); - visibleCurrent = ImmutableSet.of(txId); - notVisibleCurrent = ImmutableSet.of(); - assertVisibility(priorCommitted, postCommitted, priorInvalids, postInvalids, priorInProgress, postInProgress, - visibleCurrent, notVisibleCurrent, tx); - } - - @Test - public void testSnapshotExcludeVisibility() throws Exception { - Transaction.VisibilityLevel visibilityLevel = Transaction.VisibilityLevel.SNAPSHOT_EXCLUDE_CURRENT; - - Set<Long> checkPointers = ImmutableSet.of(220L, 250L); - Transaction tx = new Transaction(readPointer, txId, 250L, toSortedArray(priorInvalids), - toSortedArray(priorInProgress), 95L, - TransactionType.SHORT, toSortedArray(checkPointers), - visibilityLevel); - Set<Long> visibleCurrent = ImmutableSet.of(200L, 220L); - Set<Long> notVisibleCurrent = ImmutableSet.of(250L); - assertVisibility(priorCommitted, postCommitted, priorInvalids, postInvalids, priorInProgress, postInProgress, - visibleCurrent, notVisibleCurrent, tx); - - checkPointers = ImmutableSet.of(); - tx = new Transaction(readPointer, txId, txId, toSortedArray(priorInvalids), toSortedArray(priorInProgress), 95L, - TransactionType.SHORT, toSortedArray(checkPointers), - visibilityLevel); - visibleCurrent = ImmutableSet.of(); - notVisibleCurrent = ImmutableSet.of(txId); - assertVisibility(priorCommitted, postCommitted, priorInvalids, postInvalids, priorInProgress, postInProgress, - visibleCurrent, notVisibleCurrent, tx); - } - - @Test - public void testSnapshotAllVisibility() throws Exception { - Transaction.VisibilityLevel visibilityLevel = Transaction.VisibilityLevel.SNAPSHOT_ALL; - - Set<Long> checkPointers = ImmutableSet.of(220L, 250L); - Transaction tx = new Transaction(readPointer, txId, 250L, toSortedArray(priorInvalids), - toSortedArray(priorInProgress), 95L, - TransactionType.SHORT, toSortedArray(checkPointers), - visibilityLevel); - Set<Long> visibleCurrent = ImmutableSet.of(200L, 220L, 250L); - Set<Long> notVisibleCurrent = ImmutableSet.of(); - assertVisibility(priorCommitted, postCommitted, priorInvalids, postInvalids, priorInProgress, postInProgress, - visibleCurrent, notVisibleCurrent, tx); - - checkPointers = ImmutableSet.of(); - tx = new Transaction(readPointer, txId, txId, toSortedArray(priorInvalids), - toSortedArray(priorInProgress), 95L, - TransactionType.SHORT, toSortedArray(checkPointers), - visibilityLevel); - visibleCurrent = ImmutableSet.of(txId); - notVisibleCurrent = ImmutableSet.of(); - assertVisibility(priorCommitted, postCommitted, priorInvalids, postInvalids, priorInProgress, postInProgress, - visibleCurrent, notVisibleCurrent, tx); - } - - private void assertVisibility(Set<Long> priorCommitted, Set<Long> postCommitted, Set<Long> priorInvalids, - Set<Long> postInvalids, Set<Long> priorInProgress, Set<Long> postInProgress, - Set<Long> visibleCurrent, Set<Long> notVisibleCurrent, - Transaction tx) { - // Verify visible snapshots of tx are visible - for (long t : visibleCurrent) { - Assert.assertTrue("Assertion error for version = " + t, tx.isVisible(t)); - } - - // Verify not visible snapshots of tx are not visible - for (long t : notVisibleCurrent) { - Assert.assertFalse("Assertion error for version = " + t, tx.isVisible(t)); - } - - // Verify prior committed versions are visible - for (long t : priorCommitted) { - Assert.assertTrue("Assertion error for version = " + t, tx.isVisible(t)); - } - - // Verify versions committed after tx started, and not part of tx are not visible - for (long t : postCommitted) { - Assert.assertFalse("Assertion error for version = " + t, tx.isVisible(t)); - } - - // Verify invalid and in-progress versions are not visible - for (long t : Iterables.concat(priorInvalids, postInvalids, priorInProgress, postInProgress)) { - Assert.assertFalse("Assertion error for version = " + t, tx.isVisible(t)); - } - } - - private long[] toSortedArray(Set<Long> set) { - long[] array = Longs.toArray(set); - Arrays.sort(array); - return array; - } -}
