http://git-wip-us.apache.org/repos/asf/incubator-tephra/blob/9c693743/tephra-core/src/test/java/co/cask/tephra/persist/TransactionEditTest.java ---------------------------------------------------------------------- diff --git a/tephra-core/src/test/java/co/cask/tephra/persist/TransactionEditTest.java b/tephra-core/src/test/java/co/cask/tephra/persist/TransactionEditTest.java deleted file mode 100644 index 4a3af3c..0000000 --- a/tephra-core/src/test/java/co/cask/tephra/persist/TransactionEditTest.java +++ /dev/null @@ -1,124 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package co.cask.tephra.persist; - -import co.cask.tephra.ChangeId; -import co.cask.tephra.TransactionType; -import com.google.common.collect.Sets; -import com.google.common.io.ByteArrayDataOutput; -import com.google.common.io.ByteStreams; -import org.junit.Assert; -import org.junit.Test; - -import java.io.DataInput; -import java.io.IOException; - -/** - * test for {@link TransactionEdit} - */ -public class TransactionEditTest { - private static final byte[] COL = new byte[] {'c'}; - - @Test - public void testV1SerdeCompat() throws Exception { - TransactionEditCodecs.TransactionEditCodec olderCodec = new TransactionEditCodecs.TransactionEditCodecV1(); - // start tx edit and committed tx edit cover all fields of tx edit - // NOTE: set visibilityUpperBound to 0 and transaction type to null as this is expected default - // for decoding older versions that doesn't store it - verifyDecodingSupportsOlderVersion(TransactionEdit.createStarted(2L, 0L, 1000L, null), olderCodec); - verifyDecodingSupportsOlderVersion( - TransactionEdit.createCommitted(2L, Sets.newHashSet(new ChangeId(COL)), 3L, true), olderCodec); - } - - @Test - public void testV2SerdeCompat() throws Exception { - TransactionEditCodecs.TransactionEditCodec olderCodec = new TransactionEditCodecs.TransactionEditCodecV2(); - // start tx edit and committed tx edit cover all fields of tx edit - // NOTE: transaction type to null as this is expected default for decoding older versions that doesn't store it - verifyDecodingSupportsOlderVersion(TransactionEdit.createStarted(2L, 100L, 1000L, null), olderCodec); - verifyDecodingSupportsOlderVersion( - TransactionEdit.createCommitted(2L, Sets.newHashSet(new ChangeId(COL)), 3L, true), olderCodec); - } - - @SuppressWarnings("deprecation") - private void verifyDecodingSupportsOlderVersion(TransactionEdit edit, - TransactionEditCodecs.TransactionEditCodec olderCodec) - throws IOException { - // encoding with older version of codec - ByteArrayDataOutput out = ByteStreams.newDataOutput(); - TransactionEditCodecs.encode(edit, out, olderCodec); - - // decoding - TransactionEdit decodedEdit = new TransactionEdit(); - DataInput in = ByteStreams.newDataInput(out.toByteArray()); - decodedEdit.readFields(in); - - Assert.assertEquals(edit, decodedEdit); - } - - @Test - public void testSerialization() throws Exception { - assertSerializedEdit(TransactionEdit.createAborted(1L, TransactionType.SHORT, new long[0])); - assertSerializedEdit(TransactionEdit.createAborted(1L, TransactionType.SHORT, new long[]{ 2L, 3L })); - assertSerializedEdit(TransactionEdit.createAborted(1L, TransactionType.LONG, new long[0])); - assertSerializedEdit(TransactionEdit.createAborted(1L, TransactionType.LONG, new long[]{ 2L, 3L })); - - assertSerializedEdit(TransactionEdit.createCheckpoint(2L, 1L)); - - assertSerializedEdit(TransactionEdit.createCommitted(1L, Sets.<ChangeId>newHashSet(), 2L, false)); - assertSerializedEdit(TransactionEdit.createCommitted(1L, Sets.<ChangeId>newHashSet(), 2L, true)); - assertSerializedEdit(TransactionEdit.createCommitted(1L, - Sets.newHashSet(new ChangeId(new byte[]{'a', 'b', 'c'})), 2L, false)); - assertSerializedEdit(TransactionEdit.createCommitted(1L, - Sets.newHashSet(new ChangeId(new byte[]{ 'a', 'b', 'c' }), new ChangeId(new byte[]{ 'd', 'e', 'f' })), - 2L, true)); - - assertSerializedEdit(TransactionEdit.createCommitting(1L, Sets.<ChangeId>newHashSet())); - assertSerializedEdit(TransactionEdit.createCommitting(1L, - Sets.newHashSet(new ChangeId(new byte[]{'a', 'b', 'c'})))); - assertSerializedEdit(TransactionEdit.createCommitting(1L, - Sets.newHashSet(new ChangeId(new byte[]{'a', 'b', 'c'}), new ChangeId(new byte[]{'d', 'e', 'f'})))); - - assertSerializedEdit(TransactionEdit.createInvalid(1L)); - - assertSerializedEdit(TransactionEdit.createMoveWatermark(10L)); - - assertSerializedEdit(TransactionEdit.createStarted(2L, 1L, System.currentTimeMillis() + 1000, - TransactionType.SHORT)); - assertSerializedEdit(TransactionEdit.createStarted(2L, 1L, System.currentTimeMillis() + 10000, - TransactionType.LONG)); - - assertSerializedEdit(TransactionEdit.createTruncateInvalidTx(Sets.newHashSet(new Long(1)))); - assertSerializedEdit(TransactionEdit.createTruncateInvalidTx( - Sets.newHashSet(new Long(1), new Long(2), new Long(3)))); - - assertSerializedEdit(TransactionEdit.createTruncateInvalidTxBefore(System.currentTimeMillis())); - } - - private void assertSerializedEdit(TransactionEdit originalEdit) throws IOException { - ByteArrayDataOutput out = ByteStreams.newDataOutput(); - originalEdit.write(out); - - TransactionEdit decodedEdit = new TransactionEdit(); - DataInput in = ByteStreams.newDataInput(out.toByteArray()); - decodedEdit.readFields(in); - - Assert.assertEquals(originalEdit, decodedEdit); - } -}
http://git-wip-us.apache.org/repos/asf/incubator-tephra/blob/9c693743/tephra-core/src/test/java/co/cask/tephra/snapshot/SnapshotCodecTest.java ---------------------------------------------------------------------- diff --git a/tephra-core/src/test/java/co/cask/tephra/snapshot/SnapshotCodecTest.java b/tephra-core/src/test/java/co/cask/tephra/snapshot/SnapshotCodecTest.java deleted file mode 100644 index f3fe2e1..0000000 --- a/tephra-core/src/test/java/co/cask/tephra/snapshot/SnapshotCodecTest.java +++ /dev/null @@ -1,368 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package co.cask.tephra.snapshot; - -import co.cask.tephra.ChangeId; -import co.cask.tephra.Transaction; -import co.cask.tephra.TransactionManager; -import co.cask.tephra.TransactionNotInProgressException; -import co.cask.tephra.TransactionType; -import co.cask.tephra.TxConstants; -import co.cask.tephra.persist.TransactionSnapshot; -import co.cask.tephra.persist.TransactionStateStorage; -import co.cask.tephra.persist.TransactionVisibilityState; -import co.cask.tephra.runtime.ConfigModule; -import co.cask.tephra.runtime.DiscoveryModules; -import co.cask.tephra.runtime.TransactionModules; -import com.google.common.collect.ImmutableMap; -import com.google.common.collect.ImmutableSortedMap; -import com.google.common.collect.Lists; -import com.google.common.collect.Maps; -import com.google.common.collect.Sets; -import com.google.inject.Guice; -import com.google.inject.Injector; -import org.apache.hadoop.conf.Configuration; -import org.junit.Assert; -import org.junit.ClassRule; -import org.junit.Test; -import org.junit.rules.TemporaryFolder; - -import java.io.ByteArrayInputStream; -import java.io.ByteArrayOutputStream; -import java.io.File; -import java.io.IOException; -import java.util.Collections; -import java.util.Map; -import java.util.Set; -import java.util.TreeMap; - -import static org.junit.Assert.assertEquals; -import static org.junit.Assert.assertNotEquals; -import static org.junit.Assert.assertNotNull; -import static org.junit.Assert.assertNull; -import static org.junit.Assert.assertTrue; - -/** - * Tests related to {@link SnapshotCodec} implementations. - */ -public class SnapshotCodecTest { - @ClassRule - public static TemporaryFolder tmpDir = new TemporaryFolder(); - - @Test - public void testMinimalDeserilization() throws Exception { - long now = System.currentTimeMillis(); - long nowWritePointer = now * TxConstants.MAX_TX_PER_MS; - /* - * Snapshot consisting of transactions at: - */ - long tInvalid = nowWritePointer - 5; // t1 - invalid - long readPtr = nowWritePointer - 4; // t2 - here and earlier committed - long tLong = nowWritePointer - 3; // t3 - in-progress LONG - long tCommitted = nowWritePointer - 2; // t4 - committed, changeset (r1, r2) - long tShort = nowWritePointer - 1; // t5 - in-progress SHORT, canCommit called, changeset (r3, r4) - - TreeMap<Long, TransactionManager.InProgressTx> inProgress = Maps.newTreeMap(ImmutableSortedMap.of( - tLong, new TransactionManager.InProgressTx(readPtr, - TransactionManager.getTxExpirationFromWritePointer( - tLong, TxConstants.Manager.DEFAULT_TX_LONG_TIMEOUT), - TransactionType.LONG), - tShort, new TransactionManager.InProgressTx(readPtr, now + 1000, TransactionType.SHORT))); - - TransactionSnapshot snapshot = new TransactionSnapshot(now, readPtr, nowWritePointer, - Lists.newArrayList(tInvalid), // invalid - inProgress, ImmutableMap.<Long, Set<ChangeId>>of( - tShort, Sets.<ChangeId>newHashSet()), - ImmutableMap.<Long, Set<ChangeId>>of( - tCommitted, Sets.<ChangeId>newHashSet())); - - Configuration conf1 = new Configuration(); - conf1.set(TxConstants.Persist.CFG_TX_SNAPHOT_CODEC_CLASSES, SnapshotCodecV4.class.getName()); - SnapshotCodecProvider provider1 = new SnapshotCodecProvider(conf1); - - byte[] byteArray; - try (ByteArrayOutputStream out = new ByteArrayOutputStream()) { - provider1.encode(out, snapshot); - byteArray = out.toByteArray(); - } - - // TransactionSnapshot and TransactionVisibilityState decode should pass now - TransactionSnapshot txSnapshot = provider1.decode(new ByteArrayInputStream(byteArray)); - TransactionVisibilityState txVisibilityState = - provider1.decodeTransactionVisibilityState(new ByteArrayInputStream(byteArray)); - assertTransactionVisibilityStateEquals(txSnapshot, txVisibilityState); - - // Corrupt the serialization byte array so that full deserialization will fail - byteArray[byteArray.length - 1] = 'a'; - - // TransactionVisibilityState decoding should pass since it doesn't decode the committing and committed changesets. - TransactionVisibilityState txVisibilityState2 = provider1.decodeTransactionVisibilityState( - new ByteArrayInputStream(byteArray)); - Assert.assertNotNull(txVisibilityState2); - Assert.assertEquals(txVisibilityState, txVisibilityState2); - Assert.assertEquals(readPtr, txVisibilityState2.getReadPointer()); - try { - provider1.decode(new ByteArrayInputStream(byteArray)); - Assert.fail(); - } catch (RuntimeException e) { - // expected since we modified the serialization bytes - } - } - - /** - * In-progress LONG transactions written with DefaultSnapshotCodec will not have the type serialized as part of - * the data. Since these transactions also contain a non-negative expiration, we need to ensure we reset the type - * correctly when the snapshot is loaded. - */ - @Test - public void testDefaultToV3Compatibility() throws Exception { - long now = System.currentTimeMillis(); - long nowWritePointer = now * TxConstants.MAX_TX_PER_MS; - /* - * Snapshot consisting of transactions at: - */ - long tInvalid = nowWritePointer - 5; // t1 - invalid - long readPtr = nowWritePointer - 4; // t2 - here and earlier committed - long tLong = nowWritePointer - 3; // t3 - in-progress LONG - long tCommitted = nowWritePointer - 2; // t4 - committed, changeset (r1, r2) - long tShort = nowWritePointer - 1; // t5 - in-progress SHORT, canCommit called, changeset (r3, r4) - - TreeMap<Long, TransactionManager.InProgressTx> inProgress = Maps.newTreeMap(ImmutableSortedMap.of( - tLong, new TransactionManager.InProgressTx(readPtr, - TransactionManager.getTxExpirationFromWritePointer(tLong, TxConstants.Manager.DEFAULT_TX_LONG_TIMEOUT), - TransactionType.LONG), - tShort, new TransactionManager.InProgressTx(readPtr, now + 1000, TransactionType.SHORT))); - - TransactionSnapshot snapshot = new TransactionSnapshot(now, readPtr, nowWritePointer, - Lists.newArrayList(tInvalid), // invalid - inProgress, - ImmutableMap.<Long, Set<ChangeId>>of( - tShort, Sets.newHashSet(new ChangeId(new byte[]{'r', '3'}), new ChangeId(new byte[]{'r', '4'}))), - ImmutableMap.<Long, Set<ChangeId>>of( - tCommitted, Sets.newHashSet(new ChangeId(new byte[]{'r', '1'}), new ChangeId(new byte[]{'r', '2'})))); - - Configuration conf1 = new Configuration(); - conf1.set(TxConstants.Persist.CFG_TX_SNAPHOT_CODEC_CLASSES, DefaultSnapshotCodec.class.getName()); - SnapshotCodecProvider provider1 = new SnapshotCodecProvider(conf1); - - ByteArrayOutputStream out = new ByteArrayOutputStream(); - try { - provider1.encode(out, snapshot); - } finally { - out.close(); - } - - TransactionSnapshot snapshot2 = provider1.decode(new ByteArrayInputStream(out.toByteArray())); - TransactionVisibilityState minTxSnapshot = provider1.decodeTransactionVisibilityState( - new ByteArrayInputStream(out.toByteArray())); - assertTransactionVisibilityStateEquals(snapshot2, minTxSnapshot); - - assertEquals(snapshot.getReadPointer(), snapshot2.getReadPointer()); - assertEquals(snapshot.getWritePointer(), snapshot2.getWritePointer()); - assertEquals(snapshot.getInvalid(), snapshot2.getInvalid()); - // in-progress transactions will have missing types - assertNotEquals(snapshot.getInProgress(), snapshot2.getInProgress()); - assertEquals(snapshot.getCommittingChangeSets(), snapshot2.getCommittingChangeSets()); - assertEquals(snapshot.getCommittedChangeSets(), snapshot2.getCommittedChangeSets()); - - // after fixing in-progress, full snapshot should match - Map<Long, TransactionManager.InProgressTx> fixedInProgress = TransactionManager.txnBackwardsCompatCheck( - TxConstants.Manager.DEFAULT_TX_LONG_TIMEOUT, 10000L, snapshot2.getInProgress()); - assertEquals(snapshot.getInProgress(), fixedInProgress); - assertEquals(snapshot, snapshot2); - } - - /** - * Test full stack serialization for a TransactionManager migrating from DefaultSnapshotCodec to SnapshotCodecV3. - */ - @Test - public void testDefaultToV3Migration() throws Exception { - File testDir = tmpDir.newFolder("testDefaultToV3Migration"); - Configuration conf = new Configuration(); - conf.set(TxConstants.Persist.CFG_TX_SNAPHOT_CODEC_CLASSES, DefaultSnapshotCodec.class.getName()); - conf.set(TxConstants.Manager.CFG_TX_SNAPSHOT_LOCAL_DIR, testDir.getAbsolutePath()); - - Injector injector = Guice.createInjector(new ConfigModule(conf), - new DiscoveryModules().getSingleNodeModules(), new TransactionModules().getSingleNodeModules()); - - TransactionManager txManager = injector.getInstance(TransactionManager.class); - txManager.startAndWait(); - - txManager.startLong(); - - // shutdown to force a snapshot - txManager.stopAndWait(); - - TransactionStateStorage txStorage = injector.getInstance(TransactionStateStorage.class); - txStorage.startAndWait(); - - // confirm that the in-progress entry is missing a type - TransactionSnapshot snapshot = txStorage.getLatestSnapshot(); - TransactionVisibilityState txVisibilityState = txStorage.getLatestTransactionVisibilityState(); - assertTransactionVisibilityStateEquals(snapshot, txVisibilityState); - assertNotNull(snapshot); - assertEquals(1, snapshot.getInProgress().size()); - Map.Entry<Long, TransactionManager.InProgressTx> entry = - snapshot.getInProgress().entrySet().iterator().next(); - assertNull(entry.getValue().getType()); - txStorage.stopAndWait(); - - - // start a new Tx manager to test fixup - Configuration conf2 = new Configuration(); - conf2.set(TxConstants.Manager.CFG_TX_SNAPSHOT_LOCAL_DIR, testDir.getAbsolutePath()); - conf2.setStrings(TxConstants.Persist.CFG_TX_SNAPHOT_CODEC_CLASSES, - DefaultSnapshotCodec.class.getName(), SnapshotCodecV3.class.getName()); - Injector injector2 = Guice.createInjector(new ConfigModule(conf2), - new DiscoveryModules().getSingleNodeModules(), new TransactionModules().getSingleNodeModules()); - - TransactionManager txManager2 = injector2.getInstance(TransactionManager.class); - txManager2.startAndWait(); - - // state should be recovered - TransactionSnapshot snapshot2 = txManager2.getCurrentState(); - assertEquals(1, snapshot2.getInProgress().size()); - Map.Entry<Long, TransactionManager.InProgressTx> inProgressTx = - snapshot2.getInProgress().entrySet().iterator().next(); - assertEquals(TransactionType.LONG, inProgressTx.getValue().getType()); - - // save a new snapshot - txManager2.stopAndWait(); - - TransactionStateStorage txStorage2 = injector2.getInstance(TransactionStateStorage.class); - txStorage2.startAndWait(); - - TransactionSnapshot snapshot3 = txStorage2.getLatestSnapshot(); - // full snapshot should have deserialized correctly without any fixups - assertEquals(snapshot2.getInProgress(), snapshot3.getInProgress()); - assertEquals(snapshot2, snapshot3); - txStorage2.stopAndWait(); - } - - @Test - public void testSnapshotCodecProviderConfiguration() throws Exception { - Configuration conf = new Configuration(false); - StringBuilder buf = new StringBuilder(); - for (Class c : TxConstants.Persist.DEFAULT_TX_SNAPHOT_CODEC_CLASSES) { - if (buf.length() > 0) { - buf.append(",\n "); - } - buf.append(c.getName()); - } - conf.set(TxConstants.Persist.CFG_TX_SNAPHOT_CODEC_CLASSES, buf.toString()); - - SnapshotCodecProvider codecProvider = new SnapshotCodecProvider(conf); - SnapshotCodec v1codec = codecProvider.getCodecForVersion(new DefaultSnapshotCodec().getVersion()); - assertNotNull(v1codec); - assertTrue(v1codec instanceof DefaultSnapshotCodec); - - SnapshotCodec v2codec = codecProvider.getCodecForVersion(new SnapshotCodecV2().getVersion()); - assertNotNull(v2codec); - assertTrue(v2codec instanceof SnapshotCodecV2); - - SnapshotCodec v3codec = codecProvider.getCodecForVersion(new SnapshotCodecV3().getVersion()); - assertNotNull(v3codec); - assertTrue(v3codec instanceof SnapshotCodecV3); - - SnapshotCodec v4codec = codecProvider.getCodecForVersion(new SnapshotCodecV4().getVersion()); - assertNotNull(v4codec); - assertTrue(v4codec instanceof SnapshotCodecV4); - } - - @Test - public void testSnapshotCodecV4() throws IOException, TransactionNotInProgressException { - File testDir = tmpDir.newFolder("testSnapshotCodecV4"); - Configuration conf = new Configuration(); - conf.set(TxConstants.Persist.CFG_TX_SNAPHOT_CODEC_CLASSES, SnapshotCodecV4.class.getName()); - conf.set(TxConstants.Manager.CFG_TX_SNAPSHOT_LOCAL_DIR, testDir.getAbsolutePath()); - - Injector injector = Guice.createInjector(new ConfigModule(conf), - new DiscoveryModules().getSingleNodeModules(), - new TransactionModules().getSingleNodeModules()); - - TransactionManager txManager = injector.getInstance(TransactionManager.class); - txManager.startAndWait(); - - // Create a transaction and a checkpoint transaction - Transaction transaction = txManager.startLong(); - Transaction checkpointTx = txManager.checkpoint(transaction); - - // shutdown to force a snapshot - txManager.stopAndWait(); - - // Validate the snapshot on disk - TransactionStateStorage txStorage = injector.getInstance(TransactionStateStorage.class); - txStorage.startAndWait(); - - TransactionSnapshot snapshot = txStorage.getLatestSnapshot(); - TransactionVisibilityState txVisibilityState = txStorage.getLatestTransactionVisibilityState(); - assertTransactionVisibilityStateEquals(snapshot, txVisibilityState); - - Map<Long, TransactionManager.InProgressTx> inProgress = snapshot.getInProgress(); - Assert.assertEquals(1, inProgress.size()); - - TransactionManager.InProgressTx inProgressTx = inProgress.get(transaction.getTransactionId()); - Assert.assertNotNull(inProgressTx); - Assert.assertArrayEquals(checkpointTx.getCheckpointWritePointers(), - inProgressTx.getCheckpointWritePointers().toLongArray()); - - txStorage.stopAndWait(); - - // start a new Tx manager to see if the transaction is restored correctly. - Injector injector2 = Guice.createInjector(new ConfigModule(conf), - new DiscoveryModules().getSingleNodeModules(), - new TransactionModules().getSingleNodeModules()); - - txManager = injector2.getInstance(TransactionManager.class); - txManager.startAndWait(); - - // state should be recovered - snapshot = txManager.getCurrentState(); - inProgress = snapshot.getInProgress(); - Assert.assertEquals(1, inProgress.size()); - - inProgressTx = inProgress.get(transaction.getTransactionId()); - Assert.assertNotNull(inProgressTx); - Assert.assertArrayEquals(checkpointTx.getCheckpointWritePointers(), - inProgressTx.getCheckpointWritePointers().toLongArray()); - - // Should be able to commit the transaction - Assert.assertTrue(txManager.canCommit(checkpointTx, Collections.<byte[]>emptyList())); - Assert.assertTrue(txManager.commit(checkpointTx)); - - // save a new snapshot - txManager.stopAndWait(); - - TransactionStateStorage txStorage2 = injector2.getInstance(TransactionStateStorage.class); - txStorage2.startAndWait(); - - snapshot = txStorage2.getLatestSnapshot(); - Assert.assertTrue(snapshot.getInProgress().isEmpty()); - txStorage2.stopAndWait(); - } - - private void assertTransactionVisibilityStateEquals(TransactionVisibilityState expected, - TransactionVisibilityState input) { - Assert.assertEquals(expected.getTimestamp(), input.getTimestamp()); - Assert.assertEquals(expected.getReadPointer(), input.getReadPointer()); - Assert.assertEquals(expected.getWritePointer(), input.getWritePointer()); - Assert.assertEquals(expected.getInProgress(), input.getInProgress()); - Assert.assertEquals(expected.getInvalid(), input.getInvalid()); - } -} http://git-wip-us.apache.org/repos/asf/incubator-tephra/blob/9c693743/tephra-core/src/test/java/co/cask/tephra/util/AbstractConfigurationProviderTest.java ---------------------------------------------------------------------- diff --git a/tephra-core/src/test/java/co/cask/tephra/util/AbstractConfigurationProviderTest.java b/tephra-core/src/test/java/co/cask/tephra/util/AbstractConfigurationProviderTest.java deleted file mode 100644 index 0ae9ed4..0000000 --- a/tephra-core/src/test/java/co/cask/tephra/util/AbstractConfigurationProviderTest.java +++ /dev/null @@ -1,47 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package co.cask.tephra.util; - -import org.apache.hadoop.conf.Configuration; -import org.junit.Test; - -import static org.junit.Assert.assertEquals; -import static org.junit.Assert.assertNotNull; - -/** - * - */ -public abstract class AbstractConfigurationProviderTest { - @Test - public void testVersionFactory() { - HBaseVersion.Version foundVersion = HBaseVersion.get(); - assertEquals(getExpectedVersion(), foundVersion); - } - - protected abstract HBaseVersion.Version getExpectedVersion(); - - @Test - public void testConfigurationProvider() { - Configuration conf = new Configuration(); - conf.set("foo", "bar"); - Configuration newConf = new ConfigurationFactory().get(conf); - assertNotNull(newConf); - assertEquals("bar", newConf.get("foo")); - } -} http://git-wip-us.apache.org/repos/asf/incubator-tephra/blob/9c693743/tephra-core/src/test/java/co/cask/tephra/util/HBaseVersionTest.java ---------------------------------------------------------------------- diff --git a/tephra-core/src/test/java/co/cask/tephra/util/HBaseVersionTest.java b/tephra-core/src/test/java/co/cask/tephra/util/HBaseVersionTest.java deleted file mode 100644 index 1bb1fe2..0000000 --- a/tephra-core/src/test/java/co/cask/tephra/util/HBaseVersionTest.java +++ /dev/null @@ -1,110 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package co.cask.tephra.util; - -import org.junit.Test; - -import java.text.ParseException; - -import static org.junit.Assert.assertEquals; -import static org.junit.Assert.assertNull; -import static org.junit.Assert.fail; - -/** - * Tests for HBase version parsing. - */ -public class HBaseVersionTest { - @Test - public void testVersionNumber() throws Exception { - HBaseVersion.VersionNumber ver = HBaseVersion.VersionNumber.create("1"); - assertVersionNumber(ver, 1, null, null, null, false); - - ver = HBaseVersion.VersionNumber.create("1-SNAPSHOT"); - assertVersionNumber(ver, 1, null, null, null, true); - - ver = HBaseVersion.VersionNumber.create("1-foo"); - assertVersionNumber(ver, 1, null, null, "foo", false); - - ver = HBaseVersion.VersionNumber.create("1-foo-SNAPSHOT"); - assertVersionNumber(ver, 1, null, null, "foo", true); - - ver = HBaseVersion.VersionNumber.create("10.0"); - assertVersionNumber(ver, 10, 0, null, null, false); - - ver = HBaseVersion.VersionNumber.create("10.0-bar"); - assertVersionNumber(ver, 10, 0, null, "bar", false); - - ver = HBaseVersion.VersionNumber.create("3.2.1"); - assertVersionNumber(ver, 3, 2, 1, null, false); - - ver = HBaseVersion.VersionNumber.create("3.2.1-SNAPSHOT"); - assertVersionNumber(ver, 3, 2, 1, null, true); - - ver = HBaseVersion.VersionNumber.create("3.2.1-baz"); - assertVersionNumber(ver, 3, 2, 1, "baz", false); - - ver = HBaseVersion.VersionNumber.create("3.2.1-baz1.2.3"); - assertVersionNumber(ver, 3, 2, 1, "baz1.2.3", false); - - ver = HBaseVersion.VersionNumber.create("3.2.1-baz1.2.3-SNAPSHOT"); - assertVersionNumber(ver, 3, 2, 1, "baz1.2.3", true); - - try { - ver = HBaseVersion.VersionNumber.create("abc"); - fail("Invalid verison number 'abc' should have thrown a ParseException"); - } catch (ParseException pe) { - // expected - } - - try { - ver = HBaseVersion.VersionNumber.create("1.a.b"); - fail("Invalid verison number '1.a.b' should have thrown a ParseException"); - } catch (ParseException pe) { - // expected - } - - ver = HBaseVersion.VersionNumber.create("1.2.0-CDH5.7.0"); - assertVersionNumber(ver, 1, 2, 0, "CDH5.7.0", false); - } - - private void assertVersionNumber(HBaseVersion.VersionNumber version, Integer expectedMajor, Integer expectedMinor, - Integer expectedPatch, String expectedClassifier, boolean snapshot) { - if (expectedMajor == null) { - assertNull(version.getMajor()); - } else { - assertEquals(expectedMajor, version.getMajor()); - } - if (expectedMinor == null) { - assertNull(version.getMinor()); - } else { - assertEquals(expectedMinor, version.getMinor()); - } - if (expectedPatch == null) { - assertNull(version.getPatch()); - } else { - assertEquals(expectedPatch, version.getPatch()); - } - if (expectedClassifier == null) { - assertNull(version.getClassifier()); - } else { - assertEquals(expectedClassifier, version.getClassifier()); - } - assertEquals(snapshot, version.isSnapshot()); - } -} http://git-wip-us.apache.org/repos/asf/incubator-tephra/blob/9c693743/tephra-core/src/test/java/co/cask/tephra/util/TransactionEditUtil.java ---------------------------------------------------------------------- diff --git a/tephra-core/src/test/java/co/cask/tephra/util/TransactionEditUtil.java b/tephra-core/src/test/java/co/cask/tephra/util/TransactionEditUtil.java deleted file mode 100644 index c3269b0..0000000 --- a/tephra-core/src/test/java/co/cask/tephra/util/TransactionEditUtil.java +++ /dev/null @@ -1,86 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package co.cask.tephra.util; - -import co.cask.tephra.ChangeId; -import co.cask.tephra.TransactionType; -import co.cask.tephra.persist.TransactionEdit; -import com.google.common.collect.Lists; -import com.google.common.collect.Sets; - -import java.util.List; -import java.util.Random; -import java.util.Set; - -/** - * Util class for {@link TransactionEdit} related tests. - */ -public final class TransactionEditUtil { - private static Random random = new Random(); - - /** - * Generates a number of semi-random {@link TransactionEdit} instances. - * These are just randomly selected from the possible states, so would not necessarily reflect a real-world - * distribution. - * - * @param numEntries how many entries to generate in the returned list. - * @return a list of randomly generated transaction log edits. - */ - public static List<TransactionEdit> createRandomEdits(int numEntries) { - List<TransactionEdit> edits = Lists.newArrayListWithCapacity(numEntries); - for (int i = 0; i < numEntries; i++) { - TransactionEdit.State nextType = TransactionEdit.State.values()[random.nextInt(6)]; - long writePointer = Math.abs(random.nextLong()); - switch (nextType) { - case INPROGRESS: - edits.add( - TransactionEdit.createStarted(writePointer, writePointer - 1, - System.currentTimeMillis() + 300000L, TransactionType.SHORT)); - break; - case COMMITTING: - edits.add(TransactionEdit.createCommitting(writePointer, generateChangeSet(10))); - break; - case COMMITTED: - edits.add(TransactionEdit.createCommitted(writePointer, generateChangeSet(10), writePointer + 1, - random.nextBoolean())); - break; - case INVALID: - edits.add(TransactionEdit.createInvalid(writePointer)); - break; - case ABORTED: - edits.add(TransactionEdit.createAborted(writePointer, TransactionType.SHORT, null)); - break; - case MOVE_WATERMARK: - edits.add(TransactionEdit.createMoveWatermark(writePointer)); - break; - } - } - return edits; - } - - private static Set<ChangeId> generateChangeSet(int numEntries) { - Set<ChangeId> changes = Sets.newHashSet(); - for (int i = 0; i < numEntries; i++) { - byte[] bytes = new byte[8]; - random.nextBytes(bytes); - changes.add(new ChangeId(bytes)); - } - return changes; - } -} http://git-wip-us.apache.org/repos/asf/incubator-tephra/blob/9c693743/tephra-core/src/test/java/co/cask/tephra/util/TxUtilsTest.java ---------------------------------------------------------------------- diff --git a/tephra-core/src/test/java/co/cask/tephra/util/TxUtilsTest.java b/tephra-core/src/test/java/co/cask/tephra/util/TxUtilsTest.java deleted file mode 100644 index b8188e0..0000000 --- a/tephra-core/src/test/java/co/cask/tephra/util/TxUtilsTest.java +++ /dev/null @@ -1,35 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package co.cask.tephra.util; - -import co.cask.tephra.Transaction; -import org.junit.Test; - -import static org.junit.Assert.assertEquals; - -/** - * Test cases for {@link TxUtils} utility methods. - */ -public class TxUtilsTest { - @Test - public void testMaxVisibleTimestamp() { - // make sure we don't overflow with MAX_VALUE write pointer - assertEquals(Long.MAX_VALUE, TxUtils.getMaxVisibleTimestamp(Transaction.ALL_VISIBLE_LATEST)); - } -} http://git-wip-us.apache.org/repos/asf/incubator-tephra/blob/9c693743/tephra-core/src/test/java/co/cask/tephra/visibility/VisibilityFenceTest.java ---------------------------------------------------------------------- diff --git a/tephra-core/src/test/java/co/cask/tephra/visibility/VisibilityFenceTest.java b/tephra-core/src/test/java/co/cask/tephra/visibility/VisibilityFenceTest.java deleted file mode 100644 index 750fe28..0000000 --- a/tephra-core/src/test/java/co/cask/tephra/visibility/VisibilityFenceTest.java +++ /dev/null @@ -1,351 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package co.cask.tephra.visibility; - -import co.cask.tephra.Transaction; -import co.cask.tephra.TransactionAware; -import co.cask.tephra.TransactionConflictException; -import co.cask.tephra.TransactionContext; -import co.cask.tephra.TransactionFailureException; -import co.cask.tephra.TransactionManager; -import co.cask.tephra.TransactionSystemClient; -import co.cask.tephra.inmemory.InMemoryTxSystemClient; -import co.cask.tephra.metrics.TxMetricsCollector; -import co.cask.tephra.persist.InMemoryTransactionStateStorage; -import com.google.common.base.Charsets; -import com.google.common.base.Throwables; -import org.apache.hadoop.conf.Configuration; -import org.junit.AfterClass; -import org.junit.Assert; -import org.junit.BeforeClass; -import org.junit.Test; - -import java.util.concurrent.TimeUnit; -import java.util.concurrent.TimeoutException; -import java.util.concurrent.atomic.AtomicInteger; - -/** - * The following are all the possible cases when using {@link VisibilityFence}. - * - * In the below table, - * "Read Txn" refers to the transaction that contains the read fence - * "Before Write", "During Write" and "After Write" refer to the write transaction time - * "Before Write Fence", "During Write Fence", "After Write Fence" refer to the write fence transaction time - * - * Timeline is: Before Write < During Write < After Write < Before Write Fence < During Write Fence < - * After Write Fence - * - * +------+----------------------+----------------------+--------------------+--------------------+ - * | Case | Read Txn Start | Read Txn Commit | Conflict on Commit | Conflict on Commit | - * | | | | of Read Txn | of Write Fence | - * +------+----------------------+----------------------+--------------------+--------------------+ - * | 1 | Before Write | Before Write | No | No | - * | 2 | Before Write | During Write | No | No | - * | 3 | Before Write | After Write | No | No | - * | 4 | Before Write | Before Write Fence | No | No | - * | 5 | Before Write | During Write Fence | No | Yes | - * | 6 | Before Write | After Write Fence | Yes | No | - * | | | | | | - * | 7 | During Write | During Write | No | No | - * | 8 | During Write | After Write | No | No | - * | 9 | During Write | Before Write Fence | No | No | - * | 10 | During Write | During Write Fence | No | Yes | - * | 11 | During Write | After Write Fence | Yes | No | - * | | | | | | - * | 12 | After Write | After Write | No | No | - * | 13 | After Write | Before Write Fence | No | No | - * | 14 | After Write | During Write Fence | No | Yes # | - * | 15 | After Write | After Write Fence | Yes # | No | - * | | | | | | - * | 16 | Before Write Fence | Before Write Fence | No | No | - * | 17 | Before Write Fence | During Write Fence | No | Yes # | - * | 18 | Before Write Fence | After Write Fence | Yes # | No | - * | | | | | | - * | 19 | During Write Fence | During Write Fence | No | No | - * | 20 | During Write Fence | After Write Fence | No | No | - * | | | | | | - * | 21 | After Write Fence | After Write Fence | No | No | - * +------+----------------------+----------------------+--------------------+--------------------+ - * - * Note: Cases marked with '#' in conflict column should not conflict, however current implementation causes - * them to conflict. The remaining conflicts are a result of the fence. - * - * In the current implementation of VisibilityFence, read txns that start "Before Write", "During Write", - * and "After Write" can be represented by read txns that start "Before Write Fence". - * Verifying cases 16, 17, 18, 20 and 21 will effectively cover all other cases. - */ -public class VisibilityFenceTest { - private static Configuration conf = new Configuration(); - - private static TransactionManager txManager = null; - - @BeforeClass - public static void before() { - txManager = new TransactionManager(conf, new InMemoryTransactionStateStorage(), new TxMetricsCollector()); - txManager.startAndWait(); - } - - @AfterClass - public static void after() { - txManager.stopAndWait(); - } - - @Test - public void testFence1() throws Exception { - byte[] fenceId = "test_table".getBytes(Charsets.UTF_8); - - // Writer updates data here in a separate transaction (code not shown) - // start tx - // update - // commit tx - - // Readers use fence to indicate that they are interested in changes to specific data - TransactionAware readFenceCase16 = VisibilityFence.create(fenceId); - TransactionContext readTxContextCase16 = - new TransactionContext(new InMemoryTxSystemClient(txManager), readFenceCase16); - readTxContextCase16.start(); - readTxContextCase16.finish(); - - TransactionAware readFenceCase17 = VisibilityFence.create(fenceId); - TransactionContext readTxContextCase17 = - new TransactionContext(new InMemoryTxSystemClient(txManager), readFenceCase17); - readTxContextCase17.start(); - - TransactionAware readFenceCase18 = VisibilityFence.create(fenceId); - TransactionContext readTxContextCase18 = - new TransactionContext(new InMemoryTxSystemClient(txManager), readFenceCase18); - readTxContextCase18.start(); - - // Now writer needs to wait for in-progress readers to see the change, it uses write fence to do so - // Start write fence txn - TransactionAware writeFence = new WriteFence(fenceId); - TransactionContext writeTxContext = new TransactionContext(new InMemoryTxSystemClient(txManager), writeFence); - writeTxContext.start(); - - TransactionAware readFenceCase20 = VisibilityFence.create(fenceId); - TransactionContext readTxContextCase20 = - new TransactionContext(new InMemoryTxSystemClient(txManager), readFenceCase20); - readTxContextCase20.start(); - - readTxContextCase17.finish(); - - assertTxnConflict(writeTxContext); - writeTxContext.start(); - - // Commit write fence txn can commit without conflicts at this point - writeTxContext.finish(); - - TransactionAware readFenceCase21 = VisibilityFence.create(fenceId); - TransactionContext readTxContextCase21 = - new TransactionContext(new InMemoryTxSystemClient(txManager), readFenceCase21); - readTxContextCase21.start(); - - assertTxnConflict(readTxContextCase18); - readTxContextCase20.finish(); - readTxContextCase21.finish(); - } - - private void assertTxnConflict(TransactionContext txContext) throws Exception { - try { - txContext.finish(); - Assert.fail("Expected transaction to fail"); - } catch (TransactionConflictException e) { - // Expected - txContext.abort(); - } - } - - @Test - public void testFence2() throws Exception { - byte[] fenceId = "test_table".getBytes(Charsets.UTF_8); - - // Readers use fence to indicate that they are interested in changes to specific data - // Reader 1 - TransactionAware readFence1 = VisibilityFence.create(fenceId); - TransactionContext readTxContext1 = new TransactionContext(new InMemoryTxSystemClient(txManager), readFence1); - readTxContext1.start(); - - // Reader 2 - TransactionAware readFence2 = VisibilityFence.create(fenceId); - TransactionContext readTxContext2 = new TransactionContext(new InMemoryTxSystemClient(txManager), readFence2); - readTxContext2.start(); - - // Reader 3 - TransactionAware readFence3 = VisibilityFence.create(fenceId); - TransactionContext readTxContext3 = new TransactionContext(new InMemoryTxSystemClient(txManager), readFence3); - readTxContext3.start(); - - // Writer updates data here in a separate transaction (code not shown) - // start tx - // update - // commit tx - - // Now writer needs to wait for readers 1, 2, and 3 to see the change, it uses write fence to do so - TransactionAware writeFence = new WriteFence(fenceId); - TransactionContext writeTxContext = new TransactionContext(new InMemoryTxSystemClient(txManager), writeFence); - writeTxContext.start(); - - // Reader 1 commits before writeFence is committed - readTxContext1.finish(); - - try { - // writeFence will throw exception since Reader 1 committed without seeing changes - writeTxContext.finish(); - Assert.fail("Expected transaction to fail"); - } catch (TransactionConflictException e) { - // Expected - writeTxContext.abort(); - } - - // Start over writeFence again - writeTxContext.start(); - - // Now, Reader 3 commits before writeFence - // Note that Reader 3 does not conflict with Reader 1 - readTxContext3.finish(); - - try { - // writeFence will throw exception again since Reader 3 committed without seeing changes - writeTxContext.finish(); - Assert.fail("Expected transaction to fail"); - } catch (TransactionConflictException e) { - // Expected - writeTxContext.abort(); - } - - // Start over writeFence again - writeTxContext.start(); - // This time writeFence commits before the other readers - writeTxContext.finish(); - - // After this point all readers will see the change - - try { - // Reader 2 commits after writeFence, hence this commit with throw exception - readTxContext2.finish(); - Assert.fail("Expected transaction to fail"); - } catch (TransactionConflictException e) { - // Expected - readTxContext2.abort(); - } - - // Reader 2 has to abort and start over again. It will see the changes now. - readTxContext2 = new TransactionContext(new InMemoryTxSystemClient(txManager), readFence2); - readTxContext2.start(); - readTxContext2.finish(); - } - - @Test - public void testFenceAwait() throws Exception { - byte[] fenceId = "test_table".getBytes(Charsets.UTF_8); - - final TransactionContext fence1 = new TransactionContext(new InMemoryTxSystemClient(txManager), - VisibilityFence.create(fenceId)); - fence1.start(); - final TransactionContext fence2 = new TransactionContext(new InMemoryTxSystemClient(txManager), - VisibilityFence.create(fenceId)); - fence2.start(); - TransactionContext fence3 = new TransactionContext(new InMemoryTxSystemClient(txManager), - VisibilityFence.create(fenceId)); - fence3.start(); - - final AtomicInteger attempts = new AtomicInteger(); - TransactionSystemClient customTxClient = new InMemoryTxSystemClient(txManager) { - @Override - public Transaction startShort() { - Transaction transaction = super.startShort(); - try { - switch (attempts.getAndIncrement()) { - case 0: - fence1.finish(); - break; - case 1: - fence2.finish(); - break; - case 2: - break; - default: - throw new IllegalStateException("Unexpected state"); - } - } catch (TransactionFailureException e) { - Throwables.propagate(e); - } - return transaction; - } - }; - - FenceWait fenceWait = VisibilityFence.prepareWait(fenceId, customTxClient); - fenceWait.await(1000, TimeUnit.MILLISECONDS); - Assert.assertEquals(3, attempts.get()); - - try { - fence3.finish(); - Assert.fail("Expected transaction to fail"); - } catch (TransactionConflictException e) { - // Expected exception - fence3.abort(); - } - - fence3.start(); - fence3.finish(); - } - - @Test - public void testFenceTimeout() throws Exception { - byte[] fenceId = "test_table".getBytes(Charsets.UTF_8); - - final TransactionContext fence1 = new TransactionContext(new InMemoryTxSystemClient(txManager), - VisibilityFence.create(fenceId)); - fence1.start(); - - final long timeout = 100; - final TimeUnit timeUnit = TimeUnit.MILLISECONDS; - final AtomicInteger attempts = new AtomicInteger(); - TransactionSystemClient customTxClient = new InMemoryTxSystemClient(txManager) { - @Override - public Transaction startShort() { - Transaction transaction = super.startShort(); - try { - switch (attempts.getAndIncrement()) { - case 0: - fence1.finish(); - break; - } - timeUnit.sleep(timeout + 1); - } catch (InterruptedException | TransactionFailureException e) { - Throwables.propagate(e); - } - return transaction; - } - }; - - try { - FenceWait fenceWait = VisibilityFence.prepareWait(fenceId, customTxClient); - fenceWait.await(timeout, timeUnit); - Assert.fail("Expected await to fail"); - } catch (TimeoutException e) { - // Expected exception - } - Assert.assertEquals(1, attempts.get()); - - FenceWait fenceWait = VisibilityFence.prepareWait(fenceId, customTxClient); - fenceWait.await(timeout, timeUnit); - Assert.assertEquals(2, attempts.get()); - } -} http://git-wip-us.apache.org/repos/asf/incubator-tephra/blob/9c693743/tephra-core/src/test/java/org/apache/tephra/ThriftTransactionSystemTest.java ---------------------------------------------------------------------- diff --git a/tephra-core/src/test/java/org/apache/tephra/ThriftTransactionSystemTest.java b/tephra-core/src/test/java/org/apache/tephra/ThriftTransactionSystemTest.java new file mode 100644 index 0000000..28000ff --- /dev/null +++ b/tephra-core/src/test/java/org/apache/tephra/ThriftTransactionSystemTest.java @@ -0,0 +1,119 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.tephra; + +import com.google.inject.AbstractModule; +import com.google.inject.Guice; +import com.google.inject.Injector; +import com.google.inject.Scopes; +import com.google.inject.util.Modules; +import org.apache.hadoop.conf.Configuration; +import org.apache.tephra.distributed.TransactionService; +import org.apache.tephra.persist.InMemoryTransactionStateStorage; +import org.apache.tephra.persist.TransactionStateStorage; +import org.apache.tephra.runtime.ConfigModule; +import org.apache.tephra.runtime.DiscoveryModules; +import org.apache.tephra.runtime.TransactionClientModule; +import org.apache.tephra.runtime.TransactionModules; +import org.apache.tephra.runtime.ZKModule; +import org.apache.twill.internal.zookeeper.InMemoryZKServer; +import org.apache.twill.zookeeper.ZKClientService; +import org.junit.AfterClass; +import org.junit.Before; +import org.junit.BeforeClass; +import org.junit.ClassRule; +import org.junit.rules.TemporaryFolder; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +public class ThriftTransactionSystemTest extends TransactionSystemTest { + private static final Logger LOG = LoggerFactory.getLogger(ThriftTransactionSystemTest.class); + + private static InMemoryZKServer zkServer; + private static ZKClientService zkClientService; + private static TransactionService txService; + private static TransactionStateStorage storage; + private static TransactionSystemClient txClient; + + @ClassRule + public static TemporaryFolder tmpFolder = new TemporaryFolder(); + + @BeforeClass + public static void start() throws Exception { + zkServer = InMemoryZKServer.builder().setDataDir(tmpFolder.newFolder()).build(); + zkServer.startAndWait(); + + Configuration conf = new Configuration(); + conf.setBoolean(TxConstants.Manager.CFG_DO_PERSIST, false); + conf.set(TxConstants.Service.CFG_DATA_TX_ZOOKEEPER_QUORUM, zkServer.getConnectionStr()); + conf.set(TxConstants.Service.CFG_DATA_TX_CLIENT_RETRY_STRATEGY, "n-times"); + conf.setInt(TxConstants.Service.CFG_DATA_TX_CLIENT_ATTEMPTS, 1); + + Injector injector = Guice.createInjector( + new ConfigModule(conf), + new ZKModule(), + new DiscoveryModules().getDistributedModules(), + Modules.override(new TransactionModules().getDistributedModules()) + .with(new AbstractModule() { + @Override + protected void configure() { + bind(TransactionStateStorage.class).to(InMemoryTransactionStateStorage.class).in(Scopes.SINGLETON); + } + }), + new TransactionClientModule() + ); + + zkClientService = injector.getInstance(ZKClientService.class); + zkClientService.startAndWait(); + + // start a tx server + txService = injector.getInstance(TransactionService.class); + storage = injector.getInstance(TransactionStateStorage.class); + txClient = injector.getInstance(TransactionSystemClient.class); + try { + LOG.info("Starting transaction service"); + txService.startAndWait(); + } catch (Exception e) { + LOG.error("Failed to start service: ", e); + } + } + + @Before + public void reset() throws Exception { + getClient().resetState(); + } + + @AfterClass + public static void stop() throws Exception { + txService.stopAndWait(); + storage.stopAndWait(); + zkClientService.stopAndWait(); + zkServer.stopAndWait(); + } + + @Override + protected TransactionSystemClient getClient() throws Exception { + return txClient; + } + + @Override + protected TransactionStateStorage getStateStorage() throws Exception { + return storage; + } +} http://git-wip-us.apache.org/repos/asf/incubator-tephra/blob/9c693743/tephra-core/src/test/java/org/apache/tephra/TransactionAdminTest.java ---------------------------------------------------------------------- diff --git a/tephra-core/src/test/java/org/apache/tephra/TransactionAdminTest.java b/tephra-core/src/test/java/org/apache/tephra/TransactionAdminTest.java new file mode 100644 index 0000000..9305229 --- /dev/null +++ b/tephra-core/src/test/java/org/apache/tephra/TransactionAdminTest.java @@ -0,0 +1,179 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.tephra; + +import com.google.inject.AbstractModule; +import com.google.inject.Guice; +import com.google.inject.Injector; +import com.google.inject.Scopes; +import com.google.inject.util.Modules; +import org.apache.hadoop.conf.Configuration; +import org.apache.tephra.distributed.TransactionService; +import org.apache.tephra.persist.InMemoryTransactionStateStorage; +import org.apache.tephra.persist.TransactionStateStorage; +import org.apache.tephra.runtime.ConfigModule; +import org.apache.tephra.runtime.DiscoveryModules; +import org.apache.tephra.runtime.TransactionClientModule; +import org.apache.tephra.runtime.TransactionModules; +import org.apache.tephra.runtime.ZKModule; +import org.apache.twill.internal.zookeeper.InMemoryZKServer; +import org.apache.twill.zookeeper.ZKClientService; +import org.junit.AfterClass; +import org.junit.Assert; +import org.junit.Before; +import org.junit.BeforeClass; +import org.junit.ClassRule; +import org.junit.Test; +import org.junit.rules.TemporaryFolder; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.ByteArrayOutputStream; +import java.io.PrintStream; +import java.util.concurrent.TimeUnit; + +public class TransactionAdminTest { + private static final Logger LOG = LoggerFactory.getLogger(TransactionAdminTest.class); + + private static Configuration conf; + private static InMemoryZKServer zkServer; + private static ZKClientService zkClientService; + private static TransactionService txService; + private static TransactionSystemClient txClient; + + @ClassRule + public static TemporaryFolder tmpFolder = new TemporaryFolder(); + + @BeforeClass + public static void start() throws Exception { + zkServer = InMemoryZKServer.builder().setDataDir(tmpFolder.newFolder()).build(); + zkServer.startAndWait(); + + conf = new Configuration(); + conf.setBoolean(TxConstants.Manager.CFG_DO_PERSIST, false); + conf.set(TxConstants.Service.CFG_DATA_TX_ZOOKEEPER_QUORUM, zkServer.getConnectionStr()); + conf.set(TxConstants.Service.CFG_DATA_TX_CLIENT_RETRY_STRATEGY, "n-times"); + conf.setInt(TxConstants.Service.CFG_DATA_TX_CLIENT_ATTEMPTS, 1); + + Injector injector = Guice.createInjector( + new ConfigModule(conf), + new ZKModule(), + new DiscoveryModules().getDistributedModules(), + Modules.override(new TransactionModules().getDistributedModules()) + .with(new AbstractModule() { + @Override + protected void configure() { + bind(TransactionStateStorage.class).to(InMemoryTransactionStateStorage.class).in(Scopes.SINGLETON); + } + }), + new TransactionClientModule() + ); + + zkClientService = injector.getInstance(ZKClientService.class); + zkClientService.startAndWait(); + + // start a tx server + txService = injector.getInstance(TransactionService.class); + txClient = injector.getInstance(TransactionSystemClient.class); + try { + LOG.info("Starting transaction service"); + txService.startAndWait(); + } catch (Exception e) { + LOG.error("Failed to start service: ", e); + } + } + + @Before + public void reset() throws Exception { + txClient.resetState(); + } + + @AfterClass + public static void stop() throws Exception { + txService.stopAndWait(); + zkClientService.stopAndWait(); + zkServer.stopAndWait(); + } + + @Test + public void testPrintUsage() throws Exception { + ByteArrayOutputStream out = new ByteArrayOutputStream(); + ByteArrayOutputStream err = new ByteArrayOutputStream(); + TransactionAdmin txAdmin = new TransactionAdmin(new PrintStream(out), new PrintStream(err)); + int status = txAdmin.doMain(new String[0], conf); + Assert.assertEquals(1, status); + //noinspection ConstantConditions + Assert.assertTrue(err.toString("UTF-8").startsWith("Usage:")); + Assert.assertEquals(0, out.toByteArray().length); + } + + @Test + public void testTruncateInvalidTx() throws Exception { + Transaction tx1 = txClient.startLong(); + Transaction tx2 = txClient.startShort(); + txClient.invalidate(tx1.getTransactionId()); + txClient.invalidate(tx2.getTransactionId()); + Assert.assertEquals(2, txClient.getInvalidSize()); + + TransactionAdmin txAdmin = new TransactionAdmin(new PrintStream(System.out), new PrintStream(System.err)); + int status = txAdmin.doMain(new String[]{"--truncate-invalid-tx", String.valueOf(tx2.getTransactionId())}, conf); + Assert.assertEquals(0, status); + Assert.assertEquals(1, txClient.getInvalidSize()); + } + + @Test + public void testTruncateInvalidTxBefore() throws Exception { + Transaction tx1 = txClient.startLong(); + TimeUnit.MILLISECONDS.sleep(1); + long beforeTx2 = System.currentTimeMillis(); + Transaction tx2 = txClient.startLong(); + + // Try before invalidation + Assert.assertEquals(0, txClient.getInvalidSize()); + TransactionAdmin txAdmin = new TransactionAdmin(new PrintStream(System.out), new PrintStream(System.err)); + int status = txAdmin.doMain(new String[]{"--truncate-invalid-tx-before", String.valueOf(beforeTx2)}, conf); + // Assert command failed due to in-progress transactions + Assert.assertEquals(1, status); + // Assert no change to invalid size + Assert.assertEquals(0, txClient.getInvalidSize()); + + txClient.invalidate(tx1.getTransactionId()); + txClient.invalidate(tx2.getTransactionId()); + Assert.assertEquals(2, txClient.getInvalidSize()); + + status = txAdmin.doMain(new String[]{"--truncate-invalid-tx-before", String.valueOf(beforeTx2)}, conf); + Assert.assertEquals(0, status); + Assert.assertEquals(1, txClient.getInvalidSize()); + } + + @Test + public void testGetInvalidTxSize() throws Exception { + Transaction tx1 = txClient.startShort(); + txClient.startLong(); + txClient.invalidate(tx1.getTransactionId()); + + ByteArrayOutputStream out = new ByteArrayOutputStream(); + ByteArrayOutputStream err = new ByteArrayOutputStream(); + TransactionAdmin txAdmin = new TransactionAdmin(new PrintStream(out), new PrintStream(err)); + int status = txAdmin.doMain(new String[]{"--get-invalid-tx-size"}, conf); + Assert.assertEquals(0, status); + //noinspection ConstantConditions + Assert.assertTrue(out.toString("UTF-8").contains("Invalid list size: 1\n")); + } +} http://git-wip-us.apache.org/repos/asf/incubator-tephra/blob/9c693743/tephra-core/src/test/java/org/apache/tephra/TransactionContextTest.java ---------------------------------------------------------------------- diff --git a/tephra-core/src/test/java/org/apache/tephra/TransactionContextTest.java b/tephra-core/src/test/java/org/apache/tephra/TransactionContextTest.java new file mode 100644 index 0000000..20f6944 --- /dev/null +++ b/tephra-core/src/test/java/org/apache/tephra/TransactionContextTest.java @@ -0,0 +1,676 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.tephra; + +import com.google.common.collect.ImmutableList; +import com.google.common.collect.Lists; +import com.google.inject.AbstractModule; +import com.google.inject.Guice; +import com.google.inject.Inject; +import com.google.inject.Injector; +import com.google.inject.Singleton; +import com.google.inject.util.Modules; +import org.apache.hadoop.conf.Configuration; +import org.apache.tephra.inmemory.InMemoryTxSystemClient; +import org.apache.tephra.runtime.ConfigModule; +import org.apache.tephra.runtime.DiscoveryModules; +import org.apache.tephra.runtime.TransactionModules; +import org.apache.tephra.snapshot.SnapshotCodecV4; +import org.junit.Assert; +import org.junit.Before; +import org.junit.BeforeClass; +import org.junit.ClassRule; +import org.junit.Test; +import org.junit.rules.TemporaryFolder; + +import java.io.IOException; +import java.util.Collection; +import java.util.List; + +/** + * Tests the transaction executor. + */ +public class TransactionContextTest { + private static DummyTxClient txClient; + + @ClassRule + public static TemporaryFolder tmpFolder = new TemporaryFolder(); + + @BeforeClass + public static void setup() throws IOException { + final Configuration conf = new Configuration(); + conf.set(TxConstants.Persist.CFG_TX_SNAPHOT_CODEC_CLASSES, SnapshotCodecV4.class.getName()); + conf.set(TxConstants.Manager.CFG_TX_SNAPSHOT_DIR, tmpFolder.newFolder().getAbsolutePath()); + Injector injector = Guice.createInjector( + new ConfigModule(conf), + new DiscoveryModules().getInMemoryModules(), + Modules.override( + new TransactionModules().getInMemoryModules()).with(new AbstractModule() { + @Override + protected void configure() { + TransactionManager txManager = new TransactionManager(conf); + txManager.startAndWait(); + bind(TransactionManager.class).toInstance(txManager); + bind(TransactionSystemClient.class).to(DummyTxClient.class).in(Singleton.class); + } + })); + + txClient = (DummyTxClient) injector.getInstance(TransactionSystemClient.class); + } + + final DummyTxAware ds1 = new DummyTxAware(), ds2 = new DummyTxAware(); + + static final byte[] A = { 'a' }; + static final byte[] B = { 'b' }; + + private static TransactionContext newTransactionContext(TransactionAware... txAwares) { + return new TransactionContext(txClient, txAwares); + } + + @Before + public void resetTxAwares() { + ds1.reset(); + ds2.reset(); + } + + @Test + public void testSuccessful() throws TransactionFailureException, InterruptedException { + TransactionContext context = newTransactionContext(ds1, ds2); + // start transaction + context.start(); + // add a change to ds1 and ds2 + ds1.addChange(A); + ds2.addChange(B); + // commit transaction + context.finish(); + // verify both are committed and post-committed + Assert.assertTrue(ds1.started); + Assert.assertTrue(ds2.started); + Assert.assertTrue(ds1.checked); + Assert.assertTrue(ds2.checked); + Assert.assertTrue(ds1.committed); + Assert.assertTrue(ds2.committed); + Assert.assertTrue(ds1.postCommitted); + Assert.assertTrue(ds2.postCommitted); + Assert.assertFalse(ds1.rolledBack); + Assert.assertFalse(ds2.rolledBack); + Assert.assertEquals(txClient.state, DummyTxClient.CommitState.Committed); + } + + @Test + public void testPostCommitFailure() throws TransactionFailureException, InterruptedException { + ds1.failPostCommitTxOnce = InduceFailure.ThrowException; + TransactionContext context = newTransactionContext(ds1, ds2); + // start transaction + context.start(); + // add a change to ds1 and ds2 + ds1.addChange(A); + ds2.addChange(B); + // commit transaction should fail but without rollback as the failure happens post-commit + try { + context.finish(); + Assert.fail("post commit failed - exception should be thrown"); + } catch (TransactionFailureException e) { + Assert.assertEquals("post failure", e.getCause().getMessage()); + } + // verify both are committed and post-committed + Assert.assertTrue(ds1.started); + Assert.assertTrue(ds2.started); + Assert.assertTrue(ds1.checked); + Assert.assertTrue(ds2.checked); + Assert.assertTrue(ds1.committed); + Assert.assertTrue(ds2.committed); + Assert.assertTrue(ds1.postCommitted); + Assert.assertTrue(ds2.postCommitted); + Assert.assertFalse(ds1.rolledBack); + Assert.assertFalse(ds2.rolledBack); + Assert.assertEquals(txClient.state, DummyTxClient.CommitState.Committed); + } + + @Test + public void testPersistFailure() throws TransactionFailureException, InterruptedException { + ds1.failCommitTxOnce = InduceFailure.ThrowException; + TransactionContext context = newTransactionContext(ds1, ds2); + // start transaction + context.start(); + // add a change to ds1 and ds2 + ds1.addChange(A); + ds2.addChange(B); + // commit transaction should fail and cause rollback + try { + context.finish(); + Assert.fail("Persist should have failed - exception should be thrown"); + } catch (TransactionFailureException e) { + Assert.assertEquals("persist failure", e.getCause().getMessage()); + } + // verify both are rolled back and tx is aborted + Assert.assertTrue(ds1.started); + Assert.assertTrue(ds2.started); + Assert.assertTrue(ds1.checked); + Assert.assertTrue(ds2.checked); + Assert.assertTrue(ds1.committed); + Assert.assertFalse(ds2.committed); + Assert.assertFalse(ds1.postCommitted); + Assert.assertFalse(ds2.postCommitted); + Assert.assertTrue(ds1.rolledBack); + Assert.assertTrue(ds2.rolledBack); + Assert.assertEquals(txClient.state, DummyTxClient.CommitState.Aborted); + } + + @Test + public void testPersistFalse() throws TransactionFailureException, InterruptedException { + ds1.failCommitTxOnce = InduceFailure.ReturnFalse; + TransactionContext context = newTransactionContext(ds1, ds2); + // start transaction + context.start(); + // add a change to ds1 and ds2 + ds1.addChange(A); + ds2.addChange(B); + // commit transaction should fail and cause rollback + try { + context.finish(); + Assert.fail("Persist should have failed - exception should be thrown"); + } catch (TransactionFailureException e) { + Assert.assertNull(e.getCause()); // in this case, the ds simply returned false + } + // verify both are rolled back and tx is aborted + Assert.assertTrue(ds1.started); + Assert.assertTrue(ds2.started); + Assert.assertTrue(ds1.checked); + Assert.assertTrue(ds2.checked); + Assert.assertTrue(ds1.committed); + Assert.assertFalse(ds2.committed); + Assert.assertFalse(ds1.postCommitted); + Assert.assertFalse(ds2.postCommitted); + Assert.assertTrue(ds1.rolledBack); + Assert.assertTrue(ds2.rolledBack); + Assert.assertEquals(txClient.state, DummyTxClient.CommitState.Aborted); + } + + @Test + public void testPersistAndRollbackFailure() throws TransactionFailureException, InterruptedException { + ds1.failCommitTxOnce = InduceFailure.ThrowException; + ds1.failRollbackTxOnce = InduceFailure.ThrowException; + TransactionContext context = newTransactionContext(ds1, ds2); + // start transaction + context.start(); + // add a change to ds1 and ds2 + ds1.addChange(A); + ds2.addChange(B); + // commit transaction should fail and cause rollback + try { + context.finish(); + Assert.fail("Persist should have failed - exception should be thrown"); + } catch (TransactionFailureException e) { + Assert.assertEquals("persist failure", e.getCause().getMessage()); + } + // verify both are rolled back and tx is invalidated + Assert.assertTrue(ds1.started); + Assert.assertTrue(ds2.started); + Assert.assertTrue(ds1.checked); + Assert.assertTrue(ds2.checked); + Assert.assertTrue(ds1.committed); + Assert.assertFalse(ds2.committed); + Assert.assertFalse(ds1.postCommitted); + Assert.assertFalse(ds2.postCommitted); + Assert.assertTrue(ds1.rolledBack); + Assert.assertTrue(ds2.rolledBack); + Assert.assertEquals(txClient.state, DummyTxClient.CommitState.Invalidated); + } + + @Test + public void testPersistAndRollbackFalse() throws TransactionFailureException, InterruptedException { + ds1.failCommitTxOnce = InduceFailure.ReturnFalse; + ds1.failRollbackTxOnce = InduceFailure.ReturnFalse; + TransactionContext context = newTransactionContext(ds1, ds2); + // start transaction + context.start(); + // add a change to ds1 and ds2 + ds1.addChange(A); + ds2.addChange(B); + // commit transaction should fail and cause rollback + try { + context.finish(); + Assert.fail("Persist should have failed - exception should be thrown"); + } catch (TransactionFailureException e) { + Assert.assertNull(e.getCause()); // in this case, the ds simply returned false + } + // verify both are rolled back and tx is invalidated + Assert.assertTrue(ds1.started); + Assert.assertTrue(ds2.started); + Assert.assertTrue(ds1.checked); + Assert.assertTrue(ds2.checked); + Assert.assertTrue(ds1.committed); + Assert.assertFalse(ds2.committed); + Assert.assertFalse(ds1.postCommitted); + Assert.assertFalse(ds2.postCommitted); + Assert.assertTrue(ds1.rolledBack); + Assert.assertTrue(ds2.rolledBack); + Assert.assertEquals(txClient.state, DummyTxClient.CommitState.Invalidated); + } + + @Test + public void testCommitFalse() throws TransactionFailureException, InterruptedException { + txClient.failCommits = 1; + TransactionContext context = newTransactionContext(ds1, ds2); + // start transaction + context.start(); + // add a change to ds1 and ds2 + ds1.addChange(A); + ds2.addChange(B); + // commit transaction should fail and cause rollback + try { + context.finish(); + Assert.fail("commit failed - exception should be thrown"); + } catch (TransactionConflictException e) { + Assert.assertNull(e.getCause()); + } + // verify both are rolled back and tx is aborted + Assert.assertTrue(ds1.started); + Assert.assertTrue(ds2.started); + Assert.assertTrue(ds1.checked); + Assert.assertTrue(ds2.checked); + Assert.assertTrue(ds1.committed); + Assert.assertTrue(ds2.committed); + Assert.assertFalse(ds1.postCommitted); + Assert.assertFalse(ds2.postCommitted); + Assert.assertTrue(ds1.rolledBack); + Assert.assertTrue(ds2.rolledBack); + Assert.assertEquals(txClient.state, DummyTxClient.CommitState.Aborted); + } + + @Test + public void testCanCommitFalse() throws TransactionFailureException, InterruptedException { + txClient.failCanCommitOnce = true; + TransactionContext context = newTransactionContext(ds1, ds2); + // start transaction + context.start(); + // add a change to ds1 and ds2 + ds1.addChange(A); + ds2.addChange(B); + // commit transaction should fail and cause rollback + try { + context.finish(); + Assert.fail("commit failed - exception should be thrown"); + } catch (TransactionConflictException e) { + Assert.assertNull(e.getCause()); + } + // verify both are rolled back and tx is aborted + Assert.assertTrue(ds1.started); + Assert.assertTrue(ds2.started); + Assert.assertTrue(ds1.checked); + Assert.assertTrue(ds2.checked); + Assert.assertFalse(ds1.committed); + Assert.assertFalse(ds2.committed); + Assert.assertFalse(ds1.postCommitted); + Assert.assertFalse(ds2.postCommitted); + Assert.assertTrue(ds1.rolledBack); + Assert.assertTrue(ds2.rolledBack); + Assert.assertEquals(txClient.state, DummyTxClient.CommitState.Aborted); + } + + @Test + public void testChangesAndRollbackFailure() throws TransactionFailureException, InterruptedException { + ds1.failChangesTxOnce = InduceFailure.ThrowException; + ds1.failRollbackTxOnce = InduceFailure.ThrowException; + TransactionContext context = newTransactionContext(ds1, ds2); + // start transaction + context.start(); + // add a change to ds1 and ds2 + ds1.addChange(A); + ds2.addChange(B); + // commit transaction should fail and cause rollback + try { + context.finish(); + Assert.fail("get changes failed - exception should be thrown"); + } catch (TransactionFailureException e) { + Assert.assertEquals("changes failure", e.getCause().getMessage()); + } + // verify both are rolled back and tx is invalidated + Assert.assertTrue(ds1.started); + Assert.assertTrue(ds2.started); + Assert.assertTrue(ds1.checked); + Assert.assertFalse(ds2.checked); + Assert.assertFalse(ds1.committed); + Assert.assertFalse(ds2.committed); + Assert.assertFalse(ds1.postCommitted); + Assert.assertFalse(ds2.postCommitted); + Assert.assertTrue(ds1.rolledBack); + Assert.assertTrue(ds2.rolledBack); + Assert.assertEquals(txClient.state, DummyTxClient.CommitState.Invalidated); + } + + @Test + public void testStartAndRollbackFailure() throws TransactionFailureException, InterruptedException { + ds1.failStartTxOnce = InduceFailure.ThrowException; + TransactionContext context = newTransactionContext(ds1, ds2); + // start transaction + try { + context.start(); + Assert.fail("start failed - exception should be thrown"); + } catch (TransactionFailureException e) { + Assert.assertEquals("start failure", e.getCause().getMessage()); + } + // verify both are not rolled back and tx is aborted + Assert.assertTrue(ds1.started); + Assert.assertFalse(ds2.started); + Assert.assertFalse(ds1.checked); + Assert.assertFalse(ds2.checked); + Assert.assertFalse(ds1.committed); + Assert.assertFalse(ds2.committed); + Assert.assertFalse(ds1.postCommitted); + Assert.assertFalse(ds2.postCommitted); + Assert.assertFalse(ds1.rolledBack); + Assert.assertFalse(ds2.rolledBack); + Assert.assertEquals(txClient.state, DummyTxClient.CommitState.Aborted); + } + + @Test + public void testAddThenSuccess() throws TransactionFailureException, InterruptedException { + TransactionContext context = newTransactionContext(ds1); + // start transaction + context.start(); + // add a change to ds1 + ds1.addChange(A); + // add ds2 to the tx + context.addTransactionAware(ds2); + // add a change to ds2 + ds2.addChange(B); + // commit transaction + context.finish(); + // verify both are committed and post-committed + Assert.assertTrue(ds1.started); + Assert.assertTrue(ds2.started); + Assert.assertTrue(ds1.checked); + Assert.assertTrue(ds2.checked); + Assert.assertTrue(ds1.committed); + Assert.assertTrue(ds2.committed); + Assert.assertTrue(ds1.postCommitted); + Assert.assertTrue(ds2.postCommitted); + Assert.assertFalse(ds1.rolledBack); + Assert.assertFalse(ds2.rolledBack); + Assert.assertEquals(txClient.state, DummyTxClient.CommitState.Committed); + } + + @Test + public void testAddThenFailure() throws TransactionFailureException, InterruptedException { + ds2.failCommitTxOnce = InduceFailure.ThrowException; + + TransactionContext context = newTransactionContext(ds1); + // start transaction + context.start(); + // add a change to ds1 + ds1.addChange(A); + // add ds2 to the tx + context.addTransactionAware(ds2); + // add a change to ds2 + ds2.addChange(B); + // commit transaction should fail and cause rollback + try { + context.finish(); + Assert.fail("Persist should have failed - exception should be thrown"); + } catch (TransactionFailureException e) { + Assert.assertEquals("persist failure", e.getCause().getMessage()); + } + // verify both are rolled back and tx is aborted + Assert.assertTrue(ds1.started); + Assert.assertTrue(ds2.started); + Assert.assertTrue(ds1.checked); + Assert.assertTrue(ds2.checked); + Assert.assertTrue(ds1.committed); + Assert.assertTrue(ds2.committed); + Assert.assertFalse(ds1.postCommitted); + Assert.assertFalse(ds2.postCommitted); + Assert.assertTrue(ds1.rolledBack); + Assert.assertTrue(ds2.rolledBack); + Assert.assertEquals(txClient.state, DummyTxClient.CommitState.Aborted); + } + + @Test + public void testAddThenRemoveSuccess() throws TransactionFailureException { + TransactionContext context = newTransactionContext(); + + context.start(); + Assert.assertTrue(context.addTransactionAware(ds1)); + ds1.addChange(A); + + try { + context.removeTransactionAware(ds1); + Assert.fail("Removal of TransactionAware should fails when there is active transaction."); + } catch (IllegalStateException e) { + // Expected + } + + context.finish(); + + Assert.assertTrue(context.removeTransactionAware(ds1)); + // Removing a TransactionAware not added before should returns false + Assert.assertFalse(context.removeTransactionAware(ds2)); + + // Verify ds1 is committed and post-committed + Assert.assertTrue(ds1.started); + Assert.assertTrue(ds1.checked); + Assert.assertTrue(ds1.committed); + Assert.assertTrue(ds1.postCommitted); + Assert.assertFalse(ds1.rolledBack); + + // Verify nothing happen to ds2 + Assert.assertFalse(ds2.started); + Assert.assertFalse(ds2.checked); + Assert.assertFalse(ds2.committed); + Assert.assertFalse(ds2.postCommitted); + Assert.assertFalse(ds2.rolledBack); + + Assert.assertEquals(txClient.state, DummyTxClient.CommitState.Committed); + } + + @Test + public void testAndThenRemoveOnFailure() throws TransactionFailureException { + ds1.failCommitTxOnce = InduceFailure.ThrowException; + TransactionContext context = newTransactionContext(); + + context.start(); + Assert.assertTrue(context.addTransactionAware(ds1)); + ds1.addChange(A); + + try { + context.finish(); + Assert.fail("Persist should have failed - exception should be thrown"); + } catch (TransactionFailureException e) { + Assert.assertEquals("persist failure", e.getCause().getMessage()); + } + + Assert.assertTrue(context.removeTransactionAware(ds1)); + + // Verify ds1 is rolled back + Assert.assertTrue(ds1.started); + Assert.assertTrue(ds1.checked); + Assert.assertTrue(ds1.committed); + Assert.assertFalse(ds1.postCommitted); + Assert.assertTrue(ds1.rolledBack); + + Assert.assertEquals(txClient.state, DummyTxClient.CommitState.Aborted); + } + + enum InduceFailure { NoFailure, ReturnFalse, ThrowException } + + static class DummyTxAware implements TransactionAware { + + Transaction tx; + boolean started = false; + boolean committed = false; + boolean checked = false; + boolean rolledBack = false; + boolean postCommitted = false; + List<byte[]> changes = Lists.newArrayList(); + + InduceFailure failStartTxOnce = InduceFailure.NoFailure; + InduceFailure failChangesTxOnce = InduceFailure.NoFailure; + InduceFailure failCommitTxOnce = InduceFailure.NoFailure; + InduceFailure failPostCommitTxOnce = InduceFailure.NoFailure; + InduceFailure failRollbackTxOnce = InduceFailure.NoFailure; + + void addChange(byte[] key) { + changes.add(key); + } + + void reset() { + tx = null; + started = false; + checked = false; + committed = false; + rolledBack = false; + postCommitted = false; + changes.clear(); + } + + @Override + public void startTx(Transaction tx) { + reset(); + started = true; + this.tx = tx; + if (failStartTxOnce == InduceFailure.ThrowException) { + failStartTxOnce = InduceFailure.NoFailure; + throw new RuntimeException("start failure"); + } + } + + @Override + public void updateTx(Transaction tx) { + this.tx = tx; + } + + @Override + public Collection<byte[]> getTxChanges() { + checked = true; + if (failChangesTxOnce == InduceFailure.ThrowException) { + failChangesTxOnce = InduceFailure.NoFailure; + throw new RuntimeException("changes failure"); + } + return ImmutableList.copyOf(changes); + } + + @Override + public boolean commitTx() throws Exception { + committed = true; + if (failCommitTxOnce == InduceFailure.ThrowException) { + failCommitTxOnce = InduceFailure.NoFailure; + throw new RuntimeException("persist failure"); + } + if (failCommitTxOnce == InduceFailure.ReturnFalse) { + failCommitTxOnce = InduceFailure.NoFailure; + return false; + } + return true; + } + + @Override + public void postTxCommit() { + postCommitted = true; + if (failPostCommitTxOnce == InduceFailure.ThrowException) { + failPostCommitTxOnce = InduceFailure.NoFailure; + throw new RuntimeException("post failure"); + } + } + + @Override + public boolean rollbackTx() throws Exception { + rolledBack = true; + if (failRollbackTxOnce == InduceFailure.ThrowException) { + failRollbackTxOnce = InduceFailure.NoFailure; + throw new RuntimeException("rollback failure"); + } + if (failRollbackTxOnce == InduceFailure.ReturnFalse) { + failRollbackTxOnce = InduceFailure.NoFailure; + return false; + } + return true; + } + + @Override + public String getTransactionAwareName() { + return "dummy"; + } + } + + static class DummyTxClient extends InMemoryTxSystemClient { + + boolean failCanCommitOnce = false; + int failCommits = 0; + enum CommitState { + Started, Committed, Aborted, Invalidated + } + CommitState state = CommitState.Started; + + @Inject + DummyTxClient(TransactionManager txmgr) { + super(txmgr); + } + + @Override + public boolean canCommit(Transaction tx, Collection<byte[]> changeIds) throws TransactionNotInProgressException { + if (failCanCommitOnce) { + failCanCommitOnce = false; + return false; + } else { + return super.canCommit(tx, changeIds); + } + } + + @Override + public boolean commit(Transaction tx) throws TransactionNotInProgressException { + if (failCommits-- > 0) { + return false; + } else { + state = CommitState.Committed; + return super.commit(tx); + } + } + + @Override + public Transaction startLong() { + state = CommitState.Started; + return super.startLong(); + } + + @Override + public Transaction startShort() { + state = CommitState.Started; + return super.startShort(); + } + + @Override + public Transaction startShort(int timeout) { + state = CommitState.Started; + return super.startShort(timeout); + } + + @Override + public void abort(Transaction tx) { + state = CommitState.Aborted; + super.abort(tx); + } + + @Override + public boolean invalidate(long tx) { + state = CommitState.Invalidated; + return super.invalidate(tx); + } + } +}
