http://git-wip-us.apache.org/repos/asf/incubator-tephra/blob/174c3325/tephra-core/src/main/java/org/apache/tephra/inmemory/DetachedTxSystemClient.java ---------------------------------------------------------------------- diff --git a/tephra-core/src/main/java/org/apache/tephra/inmemory/DetachedTxSystemClient.java b/tephra-core/src/main/java/org/apache/tephra/inmemory/DetachedTxSystemClient.java index dd17431..e33cd2c 100644 --- a/tephra-core/src/main/java/org/apache/tephra/inmemory/DetachedTxSystemClient.java +++ b/tephra-core/src/main/java/org/apache/tephra/inmemory/DetachedTxSystemClient.java @@ -33,9 +33,7 @@ import java.util.concurrent.atomic.AtomicLong; /** * Implementation of the tx system client that doesn't talk to any global service and tries to do its best to meet the - * tx system requirements/expectations. In fact it implements enough logic to support running flows (when each flowlet - * uses its own detached tx system client, without talking to each other and sharing any state) with "process exactly - * once" guarantee if no failures happen. + * tx system requirements/expectations. * * NOTE: Will NOT detect conflicts. May leave inconsistent state when process crashes. Does NOT provide even read * isolation guarantees. @@ -43,7 +41,7 @@ import java.util.concurrent.atomic.AtomicLong; * Good for performance testing. For demoing high throughput. For use-cases with relaxed tx guarantees. */ public class DetachedTxSystemClient implements TransactionSystemClient { - // Dataset and queue logic relies on tx id to grow monotonically even after restart. Hence we need to start with + // client logic may rely on tx id to grow monotonically even after restart. Hence we need to start with // value that is for sure bigger than the last one used before restart. 
// NOTE: with code below we assume we don't do more than InMemoryTransactionManager.MAX_TX_PER_MS tx/ms // by single client @@ -88,8 +86,8 @@ public class DetachedTxSystemClient implements TransactionSystemClient { } @Override - public boolean canCommitOrThrow(Transaction tx, Collection<byte[]> changeIds) { - return true; + public void canCommitOrThrow(Transaction tx, Collection<byte[]> changeIds) { + // do nothing } @Override @@ -98,6 +96,11 @@ public class DetachedTxSystemClient implements TransactionSystemClient { } @Override + public void commitOrThrow(Transaction tx) { + // do nothing + } + + @Override public void abort(Transaction tx) { // do nothing }
http://git-wip-us.apache.org/repos/asf/incubator-tephra/blob/174c3325/tephra-core/src/main/java/org/apache/tephra/inmemory/InMemoryTxSystemClient.java ---------------------------------------------------------------------- diff --git a/tephra-core/src/main/java/org/apache/tephra/inmemory/InMemoryTxSystemClient.java b/tephra-core/src/main/java/org/apache/tephra/inmemory/InMemoryTxSystemClient.java index 9e57de8..54615fc 100644 --- a/tephra-core/src/main/java/org/apache/tephra/inmemory/InMemoryTxSystemClient.java +++ b/tephra-core/src/main/java/org/apache/tephra/inmemory/InMemoryTxSystemClient.java @@ -18,6 +18,7 @@ package org.apache.tephra.inmemory; +import com.google.common.annotations.VisibleForTesting; import com.google.inject.Inject; import org.apache.tephra.InvalidTruncateTimeException; import org.apache.tephra.Transaction; @@ -25,7 +26,6 @@ import org.apache.tephra.TransactionCouldNotTakeSnapshotException; import org.apache.tephra.TransactionFailureException; import org.apache.tephra.TransactionManager; import org.apache.tephra.TransactionNotInProgressException; -import org.apache.tephra.TransactionSizeException; import org.apache.tephra.TransactionSystemClient; import org.apache.tephra.TxConstants; import org.slf4j.Logger; @@ -45,6 +45,7 @@ public class InMemoryTxSystemClient implements TransactionSystemClient { private static final Logger LOG = LoggerFactory.getLogger(InMemoryTxSystemClient.class); + @VisibleForTesting TransactionManager txManager; @Inject @@ -70,20 +71,34 @@ public class InMemoryTxSystemClient implements TransactionSystemClient { @Override public boolean canCommit(Transaction tx, Collection<byte[]> changeIds) throws TransactionNotInProgressException { try { - return changeIds.isEmpty() || txManager.canCommit(tx, changeIds); - } catch (TransactionSizeException e) { + canCommitOrThrow(tx, changeIds); + return true; + } catch (TransactionFailureException e) { return false; } } @Override - public boolean canCommitOrThrow(Transaction tx, 
Collection<byte[]> changeIds) throws TransactionFailureException { - return changeIds.isEmpty() || txManager.canCommit(tx, changeIds); + public void canCommitOrThrow(Transaction tx, Collection<byte[]> changeIds) + throws TransactionFailureException { + if (!changeIds.isEmpty()) { + txManager.canCommit(tx.getTransactionId(), changeIds); + } } @Override public boolean commit(Transaction tx) throws TransactionNotInProgressException { - return txManager.commit(tx); + try { + commitOrThrow(tx); + return true; + } catch (TransactionFailureException e) { + return false; + } + } + + @Override + public void commitOrThrow(Transaction tx) throws TransactionFailureException { + txManager.commit(tx.getTransactionId(), tx.getWritePointer()); } @Override http://git-wip-us.apache.org/repos/asf/incubator-tephra/blob/174c3325/tephra-core/src/main/java/org/apache/tephra/inmemory/MinimalTxSystemClient.java ---------------------------------------------------------------------- diff --git a/tephra-core/src/main/java/org/apache/tephra/inmemory/MinimalTxSystemClient.java b/tephra-core/src/main/java/org/apache/tephra/inmemory/MinimalTxSystemClient.java index de46f27..558c1ea 100644 --- a/tephra-core/src/main/java/org/apache/tephra/inmemory/MinimalTxSystemClient.java +++ b/tephra-core/src/main/java/org/apache/tephra/inmemory/MinimalTxSystemClient.java @@ -61,8 +61,8 @@ public class MinimalTxSystemClient implements TransactionSystemClient { } @Override - public boolean canCommitOrThrow(Transaction tx, Collection<byte[]> changeIds) { - return true; + public void canCommitOrThrow(Transaction tx, Collection<byte[]> changeIds) { + // do nothing } @Override @@ -71,6 +71,11 @@ public class MinimalTxSystemClient implements TransactionSystemClient { } @Override + public void commitOrThrow(Transaction tx) { + // do nothing + } + + @Override public void abort(Transaction tx) { // do nothing } 
http://git-wip-us.apache.org/repos/asf/incubator-tephra/blob/174c3325/tephra-core/src/main/java/org/apache/tephra/persist/TransactionSnapshot.java ---------------------------------------------------------------------- diff --git a/tephra-core/src/main/java/org/apache/tephra/persist/TransactionSnapshot.java b/tephra-core/src/main/java/org/apache/tephra/persist/TransactionSnapshot.java index d76a98f..c31d156 100644 --- a/tephra-core/src/main/java/org/apache/tephra/persist/TransactionSnapshot.java +++ b/tephra-core/src/main/java/org/apache/tephra/persist/TransactionSnapshot.java @@ -197,8 +197,8 @@ public class TransactionSnapshot implements TransactionVisibilityState { public static TransactionSnapshot copyFrom(long snapshotTime, long readPointer, long writePointer, InvalidTxList invalidTxList, NavigableMap<Long, TransactionManager.InProgressTx> inProgress, - Map<Long, Set<ChangeId>> committing, - NavigableMap<Long, Set<ChangeId>> committed) { + Map<Long, TransactionManager.ChangeSet> committing, + NavigableMap<Long, TransactionManager.ChangeSet> committed) { // copy invalid IDs, after sorting Collection<Long> invalidCopy = new LongArrayList(invalidTxList.toSortedArray()); // copy in-progress IDs and expirations @@ -206,13 +206,13 @@ public class TransactionSnapshot implements TransactionVisibilityState { // for committing and committed maps, we need to copy each individual Set as well to prevent modification Map<Long, Set<ChangeId>> committingCopy = Maps.newHashMap(); - for (Map.Entry<Long, Set<ChangeId>> entry : committing.entrySet()) { - committingCopy.put(entry.getKey(), new HashSet<>(entry.getValue())); + for (Map.Entry<Long, TransactionManager.ChangeSet> entry : committing.entrySet()) { + committingCopy.put(entry.getKey(), new HashSet<>(entry.getValue().getChangeIds())); } NavigableMap<Long, Set<ChangeId>> committedCopy = new TreeMap<>(); - for (Map.Entry<Long, Set<ChangeId>> entry : committed.entrySet()) { - committedCopy.put(entry.getKey(), new 
HashSet<>(entry.getValue())); + for (Map.Entry<Long, TransactionManager.ChangeSet> entry : committed.entrySet()) { + committedCopy.put(entry.getKey(), new HashSet<>(entry.getValue().getChangeIds())); } return new TransactionSnapshot(snapshotTime, readPointer, writePointer, http://git-wip-us.apache.org/repos/asf/incubator-tephra/blob/174c3325/tephra-core/src/main/thrift/transaction.thrift ---------------------------------------------------------------------- diff --git a/tephra-core/src/main/thrift/transaction.thrift b/tephra-core/src/main/thrift/transaction.thrift index 729e035..6cbfdad 100644 --- a/tephra-core/src/main/thrift/transaction.thrift +++ b/tephra-core/src/main/thrift/transaction.thrift @@ -43,6 +43,12 @@ struct TTransaction { 9: TVisibilityLevel visibilityLevel } +exception TTransactionConflictException { + 1: i64 transactionId, + 2: string conflictingKey, + 3: string conflictingClient +} + exception TTransactionNotInProgressException { 1: string message } @@ -73,15 +79,21 @@ service TTransactionServer { // TODO remove this as it was replaced with startShortWithTimeout in 0.10 TTransaction startShortTimeout(1: i32 timeout), TTransaction startShortClientId(1: string clientId) throws (1: TGenericException e), - TTransaction startShortWithClientIdAndTimeOut(1: string clientId, 2: i32 timeout) throws (1:TGenericException e), - TTransaction startShortWithTimeout(1: i32 timeout) throws (1:TGenericException e), - TBoolean canCommitTx(1: TTransaction tx, 2: set<binary> changes) throws (1:TTransactionNotInProgressException e), - TBoolean canCommitOrThrow(1: TTransaction tx, 2: set<binary> changes) throws (1:TTransactionNotInProgressException e, - 2:TGenericException g,), + TTransaction startShortWithClientIdAndTimeOut(1: string clientId, 2: i32 timeout) throws (1: TGenericException e), + TTransaction startShortWithTimeout(1: i32 timeout) throws (1: TGenericException e), + // TODO remove this as it was replaced with canCommitOrThrow in 0.13 + TBoolean 
canCommitTx(1: TTransaction tx, 2: set<binary> changes) throws (1: TTransactionNotInProgressException e), + void canCommitOrThrow(1: i64 tx, 2: set<binary> changes) throws (1: TTransactionNotInProgressException e, + 2: TTransactionConflictException c, + 3: TGenericException g), + // TODO remove this as it was replaced with commitOrThrow in 0.13 TBoolean commitTx(1: TTransaction tx) throws (1:TTransactionNotInProgressException e), + void commitOrThrow(1: i64 txId, 2: i64 wp) throws (1: TTransactionNotInProgressException e, + 2: TTransactionConflictException c, + 3: TGenericException g), void abortTx(1: TTransaction tx), - bool invalidateTx(1: i64 tx), - binary getSnapshot() throws (1:TTransactionCouldNotTakeSnapshotException e), + bool invalidateTx(1: i64 txid), + binary getSnapshot() throws (1: TTransactionCouldNotTakeSnapshotException e), void resetState(), string status(), TBoolean truncateInvalidTx(1: set<i64> txns), http://git-wip-us.apache.org/repos/asf/incubator-tephra/blob/174c3325/tephra-core/src/test/java/org/apache/tephra/ClientIdTest.java ---------------------------------------------------------------------- diff --git a/tephra-core/src/test/java/org/apache/tephra/ClientIdTest.java b/tephra-core/src/test/java/org/apache/tephra/ClientIdTest.java new file mode 100644 index 0000000..1b34255 --- /dev/null +++ b/tephra-core/src/test/java/org/apache/tephra/ClientIdTest.java @@ -0,0 +1,114 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.tephra; + +import com.google.common.collect.ImmutableList; +import org.apache.hadoop.conf.Configuration; +import org.apache.tephra.metrics.TxMetricsCollector; +import org.apache.tephra.persist.InMemoryTransactionStateStorage; +import org.apache.tephra.persist.TransactionStateStorage; +import org.junit.Assert; +import org.junit.Test; + +/** + * Test the retention of client ids in Tx Manager + */ +@SuppressWarnings("WeakerAccess") +public class ClientIdTest { + + @Test + public void testClientIdRetention() throws TransactionFailureException { + testClientIdRetention(TransactionManager.ClientIdRetention.OFF, false, false); + testClientIdRetention(TransactionManager.ClientIdRetention.ACTIVE, true, false); + testClientIdRetention(TransactionManager.ClientIdRetention.COMMITTED, true, true); + } + + private void testClientIdRetention(TransactionManager.ClientIdRetention retention, + boolean expectClientIdInProgress, + boolean expectClientIdCommitted) throws TransactionFailureException { + Configuration conf = new Configuration(); + conf.set(TxConstants.Manager.CFG_TX_RETAIN_CLIENT_ID, retention.toString()); + TransactionStateStorage txStateStorage = new InMemoryTransactionStateStorage(); + TransactionManager txManager = new TransactionManager(conf, txStateStorage, new TxMetricsCollector()); + txManager.startAndWait(); + try { + testConflict(txManager, expectClientIdInProgress, expectClientIdCommitted); + } finally { + txManager.stopAndWait(); + } + } + + public void testConflict(TransactionManager txManager, + boolean 
expectClientIdInProgress, + boolean expectClientIdCommitted) throws TransactionFailureException { + testConflict(txManager, expectClientIdInProgress, expectClientIdCommitted, true); + testConflict(txManager, expectClientIdInProgress, expectClientIdCommitted, false); + } + + /** + * Tests two conflicting transactions. + * The resulting exception must carry the conflicting change key and client id. + * + * @param expectClientIdInProgress whether to expect client id in in-progress transactions + * @param expectClientIdCommitted whether to expect client id in committed change sets + * @param testCanCommit whether the conflict should be induced by canCommit() or by commit() + */ + public void testConflict(TransactionManager txManager, + boolean expectClientIdInProgress, + boolean expectClientIdCommitted, + boolean testCanCommit) throws TransactionFailureException { + // start two transactions, validate client id + Transaction tx1 = txManager.startShort("clientA"); + Transaction tx2 = txManager.startShort("clientB"); + TransactionManager.InProgressTx inProgressTx1 = txManager.getInProgress(tx1.getTransactionId()); + Assert.assertNotNull(inProgressTx1); + if (expectClientIdInProgress) { + Assert.assertEquals("clientA", inProgressTx1.getClientId()); + } else { + Assert.assertNull(inProgressTx1.getClientId()); + } + + // now commit the two transactions with overlapping change sets to create a conflict + final byte[] change1 = new byte[] { '1' }; + final byte[] change2 = new byte[] { '2' }; + final byte[] change3 = new byte[] { '3' }; + if (!testCanCommit) { + txManager.canCommit(tx2.getTransactionId(), ImmutableList.of(change2, change3)); + } + txManager.canCommit(tx1.getTransactionId(), ImmutableList.of(change1, change2)); + txManager.commit(tx1.getTransactionId(), tx1.getWritePointer()); + try { + if (testCanCommit) { + txManager.canCommit(tx2.getTransactionId(), ImmutableList.of(change2, change3)); + } else { + txManager.commit(tx2.getTransactionId(), 
tx2.getWritePointer()); + } + Assert.fail("canCommit() should have failed with conflict"); + } catch (TransactionConflictException e) { + Assert.assertNotNull(e.getTransactionId()); + Assert.assertEquals(tx2.getTransactionId(), e.getTransactionId().longValue()); + Assert.assertEquals("2", e.getConflictingKey()); + if (expectClientIdCommitted) { + Assert.assertEquals("clientA", e.getConflictingClient()); + } else { + Assert.assertNull(e.getConflictingClient()); + } + } + } +} http://git-wip-us.apache.org/repos/asf/incubator-tephra/blob/174c3325/tephra-core/src/test/java/org/apache/tephra/DummyTxAware.java ---------------------------------------------------------------------- diff --git a/tephra-core/src/test/java/org/apache/tephra/DummyTxAware.java b/tephra-core/src/test/java/org/apache/tephra/DummyTxAware.java new file mode 100644 index 0000000..54e8a8c --- /dev/null +++ b/tephra-core/src/test/java/org/apache/tephra/DummyTxAware.java @@ -0,0 +1,123 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.tephra; + +import com.google.common.collect.ImmutableList; +import com.google.common.collect.Lists; + +import java.util.Collection; +import java.util.List; + +class DummyTxAware implements TransactionAware { + + enum InduceFailure { NoFailure, ReturnFalse, ThrowException } + + boolean started = false; + boolean committed = false; + boolean checked = false; + boolean rolledBack = false; + boolean postCommitted = false; + private List<byte[]> changes = Lists.newArrayList(); + + InduceFailure failStartTxOnce = InduceFailure.NoFailure; + InduceFailure failChangesTxOnce = InduceFailure.NoFailure; + InduceFailure failCommitTxOnce = InduceFailure.NoFailure; + InduceFailure failPostCommitTxOnce = InduceFailure.NoFailure; + InduceFailure failRollbackTxOnce = InduceFailure.NoFailure; + + void addChange(byte[] key) { + changes.add(key); + } + + void reset() { + started = false; + checked = false; + committed = false; + rolledBack = false; + postCommitted = false; + changes.clear(); + } + + @Override + public void startTx(Transaction tx) { + reset(); + started = true; + if (failStartTxOnce == InduceFailure.ThrowException) { + failStartTxOnce = InduceFailure.NoFailure; + throw new RuntimeException("start failure"); + } + } + + @Override + public void updateTx(Transaction tx) { + // do nothing + } + + @Override + public Collection<byte[]> getTxChanges() { + checked = true; + if (failChangesTxOnce == InduceFailure.ThrowException) { + failChangesTxOnce = InduceFailure.NoFailure; + throw new RuntimeException("changes failure"); + } + return ImmutableList.copyOf(changes); + } + + @Override + public boolean commitTx() throws Exception { + committed = true; + if (failCommitTxOnce == InduceFailure.ThrowException) { + failCommitTxOnce = InduceFailure.NoFailure; + throw new RuntimeException("persist failure"); + } + if (failCommitTxOnce == InduceFailure.ReturnFalse) { + failCommitTxOnce = InduceFailure.NoFailure; + return false; + } + return true; + } + + 
@Override + public void postTxCommit() { + postCommitted = true; + if (failPostCommitTxOnce == InduceFailure.ThrowException) { + failPostCommitTxOnce = InduceFailure.NoFailure; + throw new RuntimeException("post failure"); + } + } + + @Override + public boolean rollbackTx() throws Exception { + rolledBack = true; + if (failRollbackTxOnce == InduceFailure.ThrowException) { + failRollbackTxOnce = InduceFailure.NoFailure; + throw new RuntimeException("rollback failure"); + } + if (failRollbackTxOnce == InduceFailure.ReturnFalse) { + failRollbackTxOnce = InduceFailure.NoFailure; + return false; + } + return true; + } + + @Override + public String getTransactionAwareName() { + return "dummy"; + } +} http://git-wip-us.apache.org/repos/asf/incubator-tephra/blob/174c3325/tephra-core/src/test/java/org/apache/tephra/DummyTxClient.java ---------------------------------------------------------------------- diff --git a/tephra-core/src/test/java/org/apache/tephra/DummyTxClient.java b/tephra-core/src/test/java/org/apache/tephra/DummyTxClient.java new file mode 100644 index 0000000..0321b17 --- /dev/null +++ b/tephra-core/src/test/java/org/apache/tephra/DummyTxClient.java @@ -0,0 +1,91 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+ * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.tephra; + +import com.google.inject.Inject; +import org.apache.tephra.inmemory.InMemoryTxSystemClient; + +import java.util.Collection; + +class DummyTxClient extends InMemoryTxSystemClient { + + boolean failCanCommitOnce = false; + int failCommits = 0; + enum CommitState { + Started, Committed, Aborted, Invalidated + } + CommitState state = CommitState.Started; + + @Inject + DummyTxClient(TransactionManager txmgr) { + super(txmgr); + } + + @Override + public void canCommitOrThrow(Transaction tx, Collection<byte[]> changeIds) + throws TransactionFailureException { + if (failCanCommitOnce) { + failCanCommitOnce = false; + throw new TransactionConflictException(tx.getTransactionId(), "<unknown>", null); + } else { + super.canCommitOrThrow(tx, changeIds); + } + } + + @Override + public void commitOrThrow(Transaction tx) + throws TransactionFailureException { + if (failCommits-- > 0) { + throw new TransactionConflictException(tx.getTransactionId(), "<unknown>", null); + } else { + state = CommitState.Committed; + super.commitOrThrow(tx); + } + } + + @Override + public Transaction startLong() { + state = CommitState.Started; + return super.startLong(); + } + + @Override + public Transaction startShort() { + state = CommitState.Started; + return super.startShort(); + } + + @Override + public Transaction startShort(int timeout) { + state = CommitState.Started; + return super.startShort(timeout); + } + + @Override + public void abort(Transaction tx) { + state = CommitState.Aborted; + super.abort(tx); + } + + @Override + public boolean invalidate(long tx) { + state = CommitState.Invalidated; + return super.invalidate(tx); + } +} http://git-wip-us.apache.org/repos/asf/incubator-tephra/blob/174c3325/tephra-core/src/test/java/org/apache/tephra/TransactionContextTest.java ---------------------------------------------------------------------- diff 
--git a/tephra-core/src/test/java/org/apache/tephra/TransactionContextTest.java b/tephra-core/src/test/java/org/apache/tephra/TransactionContextTest.java index 5f4675b..fcf793e 100644 --- a/tephra-core/src/test/java/org/apache/tephra/TransactionContextTest.java +++ b/tephra-core/src/test/java/org/apache/tephra/TransactionContextTest.java @@ -18,16 +18,12 @@ package org.apache.tephra; -import com.google.common.collect.ImmutableList; -import com.google.common.collect.Lists; import com.google.inject.AbstractModule; import com.google.inject.Guice; -import com.google.inject.Inject; import com.google.inject.Injector; import com.google.inject.Singleton; import com.google.inject.util.Modules; import org.apache.hadoop.conf.Configuration; -import org.apache.tephra.inmemory.InMemoryTxSystemClient; import org.apache.tephra.runtime.ConfigModule; import org.apache.tephra.runtime.DiscoveryModules; import org.apache.tephra.runtime.TransactionModules; @@ -40,8 +36,6 @@ import org.junit.Test; import org.junit.rules.TemporaryFolder; import java.io.IOException; -import java.util.Collection; -import java.util.List; /** * Tests the transaction executor. @@ -74,10 +68,10 @@ public class TransactionContextTest { txClient = (DummyTxClient) injector.getInstance(TransactionSystemClient.class); } - final DummyTxAware ds1 = new DummyTxAware(), ds2 = new DummyTxAware(); + private final DummyTxAware ds1 = new DummyTxAware(), ds2 = new DummyTxAware(); - static final byte[] A = { 'a' }; - static final byte[] B = { 'b' }; + private static final byte[] A = { 'a' }; + private static final byte[] B = { 'b' }; private static TransactionContext newTransactionContext(TransactionAware... 
txAwares) { return new TransactionContext(txClient, txAwares); @@ -115,7 +109,7 @@ public class TransactionContextTest { @Test public void testPostCommitFailure() throws TransactionFailureException, InterruptedException { - ds1.failPostCommitTxOnce = InduceFailure.ThrowException; + ds1.failPostCommitTxOnce = DummyTxAware.InduceFailure.ThrowException; TransactionContext context = newTransactionContext(ds1, ds2); // start transaction context.start(); @@ -145,7 +139,7 @@ public class TransactionContextTest { @Test public void testPersistFailure() throws TransactionFailureException, InterruptedException { - ds1.failCommitTxOnce = InduceFailure.ThrowException; + ds1.failCommitTxOnce = DummyTxAware.InduceFailure.ThrowException; TransactionContext context = newTransactionContext(ds1, ds2); // start transaction context.start(); @@ -175,7 +169,7 @@ public class TransactionContextTest { @Test public void testPersistFalse() throws TransactionFailureException, InterruptedException { - ds1.failCommitTxOnce = InduceFailure.ReturnFalse; + ds1.failCommitTxOnce = DummyTxAware.InduceFailure.ReturnFalse; TransactionContext context = newTransactionContext(ds1, ds2); // start transaction context.start(); @@ -205,8 +199,8 @@ public class TransactionContextTest { @Test public void testPersistAndRollbackFailure() throws TransactionFailureException, InterruptedException { - ds1.failCommitTxOnce = InduceFailure.ThrowException; - ds1.failRollbackTxOnce = InduceFailure.ThrowException; + ds1.failCommitTxOnce = DummyTxAware.InduceFailure.ThrowException; + ds1.failRollbackTxOnce = DummyTxAware.InduceFailure.ThrowException; TransactionContext context = newTransactionContext(ds1, ds2); // start transaction context.start(); @@ -236,8 +230,8 @@ public class TransactionContextTest { @Test public void testPersistAndRollbackFalse() throws TransactionFailureException, InterruptedException { - ds1.failCommitTxOnce = InduceFailure.ReturnFalse; - ds1.failRollbackTxOnce = InduceFailure.ReturnFalse; + 
ds1.failCommitTxOnce = DummyTxAware.InduceFailure.ReturnFalse; + ds1.failRollbackTxOnce = DummyTxAware.InduceFailure.ReturnFalse; TransactionContext context = newTransactionContext(ds1, ds2); // start transaction context.start(); @@ -327,8 +321,8 @@ public class TransactionContextTest { @Test public void testChangesAndRollbackFailure() throws TransactionFailureException, InterruptedException { - ds1.failChangesTxOnce = InduceFailure.ThrowException; - ds1.failRollbackTxOnce = InduceFailure.ThrowException; + ds1.failChangesTxOnce = DummyTxAware.InduceFailure.ThrowException; + ds1.failRollbackTxOnce = DummyTxAware.InduceFailure.ThrowException; TransactionContext context = newTransactionContext(ds1, ds2); // start transaction context.start(); @@ -358,7 +352,7 @@ public class TransactionContextTest { @Test public void testStartAndRollbackFailure() throws TransactionFailureException, InterruptedException { - ds1.failStartTxOnce = InduceFailure.ThrowException; + ds1.failStartTxOnce = DummyTxAware.InduceFailure.ThrowException; TransactionContext context = newTransactionContext(ds1, ds2); // start transaction try { @@ -410,7 +404,7 @@ public class TransactionContextTest { @Test public void testAddThenFailure() throws TransactionFailureException, InterruptedException { - ds2.failCommitTxOnce = InduceFailure.ThrowException; + ds2.failCommitTxOnce = DummyTxAware.InduceFailure.ThrowException; TransactionContext context = newTransactionContext(ds1); // start transaction @@ -482,7 +476,7 @@ public class TransactionContextTest { @Test public void testAndThenRemoveOnFailure() throws TransactionFailureException { - ds1.failCommitTxOnce = InduceFailure.ThrowException; + ds1.failCommitTxOnce = DummyTxAware.InduceFailure.ThrowException; TransactionContext context = newTransactionContext(); context.start(); @@ -507,175 +501,4 @@ public class TransactionContextTest { Assert.assertEquals(txClient.state, DummyTxClient.CommitState.Aborted); } - - enum InduceFailure { NoFailure, ReturnFalse, 
ThrowException } - - static class DummyTxAware implements TransactionAware { - - Transaction tx; - boolean started = false; - boolean committed = false; - boolean checked = false; - boolean rolledBack = false; - boolean postCommitted = false; - List<byte[]> changes = Lists.newArrayList(); - - InduceFailure failStartTxOnce = InduceFailure.NoFailure; - InduceFailure failChangesTxOnce = InduceFailure.NoFailure; - InduceFailure failCommitTxOnce = InduceFailure.NoFailure; - InduceFailure failPostCommitTxOnce = InduceFailure.NoFailure; - InduceFailure failRollbackTxOnce = InduceFailure.NoFailure; - - void addChange(byte[] key) { - changes.add(key); - } - - void reset() { - tx = null; - started = false; - checked = false; - committed = false; - rolledBack = false; - postCommitted = false; - changes.clear(); - } - - @Override - public void startTx(Transaction tx) { - reset(); - started = true; - this.tx = tx; - if (failStartTxOnce == InduceFailure.ThrowException) { - failStartTxOnce = InduceFailure.NoFailure; - throw new RuntimeException("start failure"); - } - } - - @Override - public void updateTx(Transaction tx) { - this.tx = tx; - } - - @Override - public Collection<byte[]> getTxChanges() { - checked = true; - if (failChangesTxOnce == InduceFailure.ThrowException) { - failChangesTxOnce = InduceFailure.NoFailure; - throw new RuntimeException("changes failure"); - } - return ImmutableList.copyOf(changes); - } - - @Override - public boolean commitTx() throws Exception { - committed = true; - if (failCommitTxOnce == InduceFailure.ThrowException) { - failCommitTxOnce = InduceFailure.NoFailure; - throw new RuntimeException("persist failure"); - } - if (failCommitTxOnce == InduceFailure.ReturnFalse) { - failCommitTxOnce = InduceFailure.NoFailure; - return false; - } - return true; - } - - @Override - public void postTxCommit() { - postCommitted = true; - if (failPostCommitTxOnce == InduceFailure.ThrowException) { - failPostCommitTxOnce = InduceFailure.NoFailure; - throw new 
RuntimeException("post failure"); - } - } - - @Override - public boolean rollbackTx() throws Exception { - rolledBack = true; - if (failRollbackTxOnce == InduceFailure.ThrowException) { - failRollbackTxOnce = InduceFailure.NoFailure; - throw new RuntimeException("rollback failure"); - } - if (failRollbackTxOnce == InduceFailure.ReturnFalse) { - failRollbackTxOnce = InduceFailure.NoFailure; - return false; - } - return true; - } - - @Override - public String getTransactionAwareName() { - return "dummy"; - } - } - - static class DummyTxClient extends InMemoryTxSystemClient { - - boolean failCanCommitOnce = false; - int failCommits = 0; - enum CommitState { - Started, Committed, Aborted, Invalidated - } - CommitState state = CommitState.Started; - - @Inject - DummyTxClient(TransactionManager txmgr) { - super(txmgr); - } - - @Override - public boolean canCommit(Transaction tx, Collection<byte[]> changeIds) throws TransactionNotInProgressException { - if (failCanCommitOnce) { - failCanCommitOnce = false; - return false; - } else { - return super.canCommit(tx, changeIds); - } - } - - @Override - public boolean canCommitOrThrow(Transaction tx, Collection<byte[]> changeIds) throws TransactionFailureException { - return canCommit(tx, changeIds); - } - - @Override - public boolean commit(Transaction tx) throws TransactionNotInProgressException { - if (failCommits-- > 0) { - return false; - } else { - state = CommitState.Committed; - return super.commit(tx); - } - } - - @Override - public Transaction startLong() { - state = CommitState.Started; - return super.startLong(); - } - - @Override - public Transaction startShort() { - state = CommitState.Started; - return super.startShort(); - } - - @Override - public Transaction startShort(int timeout) { - state = CommitState.Started; - return super.startShort(timeout); - } - - @Override - public void abort(Transaction tx) { - state = CommitState.Aborted; - super.abort(tx); - } - - @Override - public boolean invalidate(long tx) { - 
state = CommitState.Invalidated; - return super.invalidate(tx); - } - } } http://git-wip-us.apache.org/repos/asf/incubator-tephra/blob/174c3325/tephra-core/src/test/java/org/apache/tephra/TransactionExecutorTest.java ---------------------------------------------------------------------- diff --git a/tephra-core/src/test/java/org/apache/tephra/TransactionExecutorTest.java b/tephra-core/src/test/java/org/apache/tephra/TransactionExecutorTest.java index 676774c..506ffd9 100644 --- a/tephra-core/src/test/java/org/apache/tephra/TransactionExecutorTest.java +++ b/tephra-core/src/test/java/org/apache/tephra/TransactionExecutorTest.java @@ -19,19 +19,16 @@ package org.apache.tephra; import com.google.common.collect.ImmutableList; -import com.google.common.collect.Lists; import com.google.inject.AbstractModule; import com.google.inject.Guice; -import com.google.inject.Inject; import com.google.inject.Injector; import com.google.inject.Singleton; import com.google.inject.util.Modules; import org.apache.hadoop.conf.Configuration; -import org.apache.tephra.inmemory.InMemoryTxSystemClient; import org.apache.tephra.runtime.ConfigModule; import org.apache.tephra.runtime.DiscoveryModules; import org.apache.tephra.runtime.TransactionModules; -import org.apache.tephra.snapshot.DefaultSnapshotCodec; +import org.apache.tephra.snapshot.SnapshotCodecV4; import org.junit.Assert; import org.junit.Before; import org.junit.BeforeClass; @@ -41,7 +38,6 @@ import org.junit.rules.TemporaryFolder; import java.io.IOException; import java.util.Collection; -import java.util.List; import javax.annotation.Nullable; /** @@ -57,7 +53,7 @@ public class TransactionExecutorTest { @BeforeClass public static void setup() throws IOException { final Configuration conf = new Configuration(); - conf.set(TxConstants.Persist.CFG_TX_SNAPHOT_CODEC_CLASSES, DefaultSnapshotCodec.class.getName()); + conf.set(TxConstants.Persist.CFG_TX_SNAPHOT_CODEC_CLASSES, SnapshotCodecV4.class.getName()); 
conf.set(TxConstants.Manager.CFG_TX_SNAPSHOT_DIR, tmpFolder.newFolder().getAbsolutePath()); Injector injector = Guice.createInjector( new ConfigModule(conf), @@ -77,8 +73,8 @@ public class TransactionExecutorTest { factory = injector.getInstance(TransactionExecutorFactory.class); } - final DummyTxAware ds1 = new DummyTxAware(), ds2 = new DummyTxAware(); - final Collection<TransactionAware> txAwares = ImmutableList.<TransactionAware>of(ds1, ds2); + private final DummyTxAware ds1 = new DummyTxAware(), ds2 = new DummyTxAware(); + private final Collection<TransactionAware> txAwares = ImmutableList.<TransactionAware>of(ds1, ds2); private TransactionExecutor getExecutor() { return factory.createExecutor(txAwares); @@ -88,10 +84,10 @@ public class TransactionExecutorTest { return new DefaultTransactionExecutor(txClient, txAwares, RetryStrategies.noRetries()); } - static final byte[] A = { 'a' }; - static final byte[] B = { 'b' }; + private static final byte[] A = { 'a' }; + private static final byte[] B = { 'b' }; - final TransactionExecutor.Function<Integer, Integer> testFunction = + private final TransactionExecutor.Function<Integer, Integer> testFunction = new TransactionExecutor.Function<Integer, Integer>() { @Override public Integer apply(@Nullable Integer input) { @@ -131,7 +127,7 @@ public class TransactionExecutorTest { @Test public void testPostCommitFailure() throws TransactionFailureException, InterruptedException { - ds1.failPostCommitTxOnce = InduceFailure.ThrowException; + ds1.failPostCommitTxOnce = DummyTxAware.InduceFailure.ThrowException; // execute: add a change to ds1 and ds2 try { getExecutor().execute(testFunction, 10); @@ -155,7 +151,7 @@ public class TransactionExecutorTest { @Test public void testPersistFailure() throws TransactionFailureException, InterruptedException { - ds1.failCommitTxOnce = InduceFailure.ThrowException; + ds1.failCommitTxOnce = DummyTxAware.InduceFailure.ThrowException; // execute: add a change to ds1 and ds2 try { 
getExecutor().execute(testFunction, 10); @@ -179,7 +175,7 @@ public class TransactionExecutorTest { @Test public void testPersistFalse() throws TransactionFailureException, InterruptedException { - ds1.failCommitTxOnce = InduceFailure.ReturnFalse; + ds1.failCommitTxOnce = DummyTxAware.InduceFailure.ReturnFalse; // execute: add a change to ds1 and ds2 try { getExecutor().execute(testFunction, 10); @@ -203,8 +199,8 @@ public class TransactionExecutorTest { @Test public void testPersistAndRollbackFailure() throws TransactionFailureException, InterruptedException { - ds1.failCommitTxOnce = InduceFailure.ThrowException; - ds1.failRollbackTxOnce = InduceFailure.ThrowException; + ds1.failCommitTxOnce = DummyTxAware.InduceFailure.ThrowException; + ds1.failRollbackTxOnce = DummyTxAware.InduceFailure.ThrowException; // execute: add a change to ds1 and ds2 try { getExecutor().execute(testFunction, 10); @@ -228,8 +224,8 @@ public class TransactionExecutorTest { @Test public void testPersistAndRollbackFalse() throws TransactionFailureException, InterruptedException { - ds1.failCommitTxOnce = InduceFailure.ReturnFalse; - ds1.failRollbackTxOnce = InduceFailure.ReturnFalse; + ds1.failCommitTxOnce = DummyTxAware.InduceFailure.ReturnFalse; + ds1.failRollbackTxOnce = DummyTxAware.InduceFailure.ReturnFalse; // execute: add a change to ds1 and ds2 try { getExecutor().execute(testFunction, 10); @@ -351,8 +347,8 @@ public class TransactionExecutorTest { @Test public void testChangesAndRollbackFailure() throws TransactionFailureException, InterruptedException { - ds1.failChangesTxOnce = InduceFailure.ThrowException; - ds1.failRollbackTxOnce = InduceFailure.ThrowException; + ds1.failChangesTxOnce = DummyTxAware.InduceFailure.ThrowException; + ds1.failRollbackTxOnce = DummyTxAware.InduceFailure.ThrowException; // execute: add a change to ds1 and ds2 try { getExecutor().execute(testFunction, 10); @@ -376,7 +372,7 @@ public class TransactionExecutorTest { @Test public void 
testFunctionAndRollbackFailure() throws TransactionFailureException, InterruptedException { - ds1.failRollbackTxOnce = InduceFailure.ReturnFalse; + ds1.failRollbackTxOnce = DummyTxAware.InduceFailure.ReturnFalse; // execute: add a change to ds1 and ds2 try { getExecutor().execute(testFunction, null); @@ -400,7 +396,7 @@ public class TransactionExecutorTest { @Test public void testStartAndRollbackFailure() throws TransactionFailureException, InterruptedException { - ds1.failStartTxOnce = InduceFailure.ThrowException; + ds1.failStartTxOnce = DummyTxAware.InduceFailure.ThrowException; // execute: add a change to ds1 and ds2 try { getExecutor().execute(testFunction, 10); @@ -421,175 +417,4 @@ public class TransactionExecutorTest { Assert.assertFalse(ds2.rolledBack); Assert.assertEquals(txClient.state, DummyTxClient.CommitState.Aborted); } - - enum InduceFailure { NoFailure, ReturnFalse, ThrowException } - - static class DummyTxAware implements TransactionAware { - - Transaction tx; - boolean started = false; - boolean committed = false; - boolean checked = false; - boolean rolledBack = false; - boolean postCommitted = false; - List<byte[]> changes = Lists.newArrayList(); - - InduceFailure failStartTxOnce = InduceFailure.NoFailure; - InduceFailure failChangesTxOnce = InduceFailure.NoFailure; - InduceFailure failCommitTxOnce = InduceFailure.NoFailure; - InduceFailure failPostCommitTxOnce = InduceFailure.NoFailure; - InduceFailure failRollbackTxOnce = InduceFailure.NoFailure; - - void addChange(byte[] key) { - changes.add(key); - } - - void reset() { - tx = null; - started = false; - checked = false; - committed = false; - rolledBack = false; - postCommitted = false; - changes.clear(); - } - - @Override - public void startTx(Transaction tx) { - reset(); - started = true; - this.tx = tx; - if (failStartTxOnce == InduceFailure.ThrowException) { - failStartTxOnce = InduceFailure.NoFailure; - throw new RuntimeException("start failure"); - } - } - - @Override - public void 
updateTx(Transaction tx) { - this.tx = tx; - } - - @Override - public Collection<byte[]> getTxChanges() { - checked = true; - if (failChangesTxOnce == InduceFailure.ThrowException) { - failChangesTxOnce = InduceFailure.NoFailure; - throw new RuntimeException("changes failure"); - } - return ImmutableList.copyOf(changes); - } - - @Override - public boolean commitTx() throws Exception { - committed = true; - if (failCommitTxOnce == InduceFailure.ThrowException) { - failCommitTxOnce = InduceFailure.NoFailure; - throw new RuntimeException("persist failure"); - } - if (failCommitTxOnce == InduceFailure.ReturnFalse) { - failCommitTxOnce = InduceFailure.NoFailure; - return false; - } - return true; - } - - @Override - public void postTxCommit() { - postCommitted = true; - if (failPostCommitTxOnce == InduceFailure.ThrowException) { - failPostCommitTxOnce = InduceFailure.NoFailure; - throw new RuntimeException("post failure"); - } - } - - @Override - public boolean rollbackTx() throws Exception { - rolledBack = true; - if (failRollbackTxOnce == InduceFailure.ThrowException) { - failRollbackTxOnce = InduceFailure.NoFailure; - throw new RuntimeException("rollback failure"); - } - if (failRollbackTxOnce == InduceFailure.ReturnFalse) { - failRollbackTxOnce = InduceFailure.NoFailure; - return false; - } - return true; - } - - @Override - public String getTransactionAwareName() { - return "dummy"; - } - } - - static class DummyTxClient extends InMemoryTxSystemClient { - - boolean failCanCommitOnce = false; - int failCommits = 0; - enum CommitState { - Started, Committed, Aborted, Invalidated - } - CommitState state = CommitState.Started; - - @Inject - DummyTxClient(TransactionManager txmgr) { - super(txmgr); - } - - @Override - public boolean canCommit(Transaction tx, Collection<byte[]> changeIds) throws TransactionNotInProgressException { - if (failCanCommitOnce) { - failCanCommitOnce = false; - return false; - } else { - return super.canCommit(tx, changeIds); - } - } - - 
@Override - public boolean canCommitOrThrow(Transaction tx, Collection<byte[]> changeIds) throws TransactionFailureException { - return canCommit(tx, changeIds); - } - - @Override - public boolean commit(Transaction tx) throws TransactionNotInProgressException { - if (failCommits-- > 0) { - return false; - } else { - state = CommitState.Committed; - return super.commit(tx); - } - } - - @Override - public Transaction startLong() { - state = CommitState.Started; - return super.startLong(); - } - - @Override - public Transaction startShort() { - state = CommitState.Started; - return super.startShort(); - } - - @Override - public Transaction startShort(int timeout) { - state = CommitState.Started; - return super.startShort(timeout); - } - - @Override - public void abort(Transaction tx) { - state = CommitState.Aborted; - super.abort(tx); - } - - @Override - public boolean invalidate(long tx) { - state = CommitState.Invalidated; - return super.invalidate(tx); - } - } } http://git-wip-us.apache.org/repos/asf/incubator-tephra/blob/174c3325/tephra-core/src/test/java/org/apache/tephra/TransactionManagerTest.java ---------------------------------------------------------------------- diff --git a/tephra-core/src/test/java/org/apache/tephra/TransactionManagerTest.java b/tephra-core/src/test/java/org/apache/tephra/TransactionManagerTest.java index 819a981..b16d93d 100644 --- a/tephra-core/src/test/java/org/apache/tephra/TransactionManagerTest.java +++ b/tephra-core/src/test/java/org/apache/tephra/TransactionManagerTest.java @@ -31,12 +31,14 @@ import org.junit.Assert; import org.junit.BeforeClass; import org.junit.Test; +import java.util.Arrays; import java.util.Collections; import java.util.concurrent.TimeUnit; /** * */ +@SuppressWarnings("WeakerAccess") public class TransactionManagerTest extends TransactionSystemTest { private static Configuration conf; @@ -76,6 +78,92 @@ public class TransactionManagerTest extends TransactionSystemTest { } @Test + public void 
testCheckpointing() throws TransactionFailureException { + // create a few transactions + Transaction tx1 = txManager.startShort(); + Transaction tx2 = txManager.startShort(); + Transaction tx3 = txManager.startShort(); + + // start and commit a few + for (int i = 0; i < 5; i++) { + Transaction tx = txManager.startShort(); + txManager.canCommit(tx.getTransactionId(), Collections.singleton(new byte[] { (byte) i })); + txManager.commit(tx.getTransactionId(), tx.getWritePointer()); + } + + // checkpoint the transactions + Transaction tx3c = txManager.checkpoint(tx3); + Transaction tx2c = txManager.checkpoint(tx2); + Transaction tx1c = txManager.checkpoint(tx1); + + // start and commit a few (this moves the read pointer past all checkpoint write versions) + for (int i = 5; i < 10; i++) { + Transaction tx = txManager.startShort(); + txManager.canCommit(tx.getTransactionId(), Collections.singleton(new byte[] { (byte) i })); + txManager.commit(tx.getTransactionId(), tx.getWritePointer()); + } + + // start new tx and validate all write pointers are excluded + Transaction tx = txManager.startShort(); + validateSorted(tx.getInProgress()); + validateSorted(tx.getInvalids()); + Assert.assertFalse(tx.isVisible(tx1.getWritePointer())); + Assert.assertFalse(tx.isVisible(tx2.getWritePointer())); + Assert.assertFalse(tx.isVisible(tx3.getWritePointer())); + Assert.assertFalse(tx.isVisible(tx1c.getWritePointer())); + Assert.assertFalse(tx.isVisible(tx2c.getWritePointer())); + Assert.assertFalse(tx.isVisible(tx3c.getWritePointer())); + txManager.abort(tx); + + // abort one of the checkpoints + txManager.abort(tx1c); + + // start new tx and validate all write pointers are excluded + tx = txManager.startShort(); + validateSorted(tx.getInProgress()); + validateSorted(tx.getInvalids()); + Assert.assertFalse(tx.isVisible(tx2.getWritePointer())); + Assert.assertFalse(tx.isVisible(tx3.getWritePointer())); + Assert.assertFalse(tx.isVisible(tx2c.getWritePointer())); + 
Assert.assertFalse(tx.isVisible(tx3c.getWritePointer())); + txManager.abort(tx); + + // invalidate one of the checkpoints + txManager.invalidate(tx2c.getTransactionId()); + + // start new tx and validate all write pointers are excluded + tx = txManager.startShort(); + validateSorted(tx.getInProgress()); + validateSorted(tx.getInvalids()); + Assert.assertFalse(tx.isVisible(tx2.getWritePointer())); + Assert.assertFalse(tx.isVisible(tx3.getWritePointer())); + Assert.assertFalse(tx.isVisible(tx2c.getWritePointer())); + Assert.assertFalse(tx.isVisible(tx3c.getWritePointer())); + txManager.abort(tx); + + // commit the last checkpoint + txManager.canCommit(tx3.getTransactionId(), Collections.<byte[]>emptyList()); + txManager.commit(tx3c.getTransactionId(), tx3c.getWritePointer()); + + // start new tx and validate all write pointers are excluded + tx = txManager.startShort(); + validateSorted(tx.getInProgress()); + validateSorted(tx.getInvalids()); + Assert.assertFalse(tx.isVisible(tx2.getWritePointer())); + Assert.assertFalse(tx.isVisible(tx2c.getWritePointer())); + txManager.abort(tx); + } + + private void validateSorted(long[] array) { + Long lastSeen = null; + for (long value : array) { + Assert.assertTrue(String.format("%s is not sorted", Arrays.toString(array)), + lastSeen == null || lastSeen < value); + lastSeen = value; + } + } + + @Test public void testTransactionCleanup() throws Exception { Configuration config = new Configuration(conf); config.setInt(TxConstants.Manager.CFG_TX_CLEANUP_INTERVAL, 3); @@ -98,8 +186,8 @@ public class TransactionManagerTest extends TransactionSystemTest { // start and commit a bunch of transactions for (int i = 0; i < 10; i++) { Transaction tx = txm.startShort(); - Assert.assertTrue(txm.canCommit(tx, Collections.singleton(new byte[] { (byte) i }))); - Assert.assertTrue(txm.commit(tx)); + txm.canCommit(tx.getTransactionId(), Collections.singleton(new byte[] { (byte) i })); + txm.commit(tx.getTransactionId(), tx.getWritePointer()); } 
// all of these should still be in the committed set Assert.assertEquals(0, txm.getInvalidSize()); @@ -124,14 +212,14 @@ public class TransactionManagerTest extends TransactionSystemTest { tx2.getWritePointer(), tx2c.getWritePointer()}, txx.getInvalids()); // try to commit the last transaction that was started - Assert.assertTrue(txm.canCommit(txx, Collections.singleton(new byte[] { 0x0a }))); - Assert.assertTrue(txm.commit(txx)); + txm.canCommit(txx.getTransactionId(), Collections.singleton(new byte[] { 0x0a })); + txm.commit(txx.getTransactionId(), txx.getWritePointer()); // now the committed change sets should be empty again Assert.assertEquals(0, txm.getCommittedSize()); // cannot commit transaction as it was timed out try { - txm.canCommit(tx1, Collections.singleton(new byte[] { 0x11 })); + txm.canCommit(tx1.getTransactionId(), Collections.singleton(new byte[] { 0x11 })); Assert.fail(); } catch (TransactionNotInProgressException e) { // expected @@ -148,14 +236,14 @@ public class TransactionManagerTest extends TransactionSystemTest { // run another bunch of transactions for (int i = 0; i < 10; i++) { Transaction tx = txm.startShort(); - Assert.assertTrue(txm.canCommit(tx, Collections.singleton(new byte[] { (byte) i }))); - Assert.assertTrue(txm.commit(tx)); + txm.canCommit(tx.getTransactionId(), Collections.singleton(new byte[] { (byte) i })); + txm.commit(tx.getTransactionId(), tx.getWritePointer()); } // none of these should still be in the committed set (tx2 is long-running). Assert.assertEquals(0, txm.getInvalidSize()); Assert.assertEquals(0, txm.getCommittedSize()); // commit tx2, abort tx3 - Assert.assertTrue(txm.commit(ltx1)); + txm.commit(ltx1.getTransactionId(), ltx1.getWritePointer()); txm.abort(ltx2); // none of these should still be in the committed set (tx2 is long-running). // Only tx3 is invalid list as it was aborted and is long-running. 
tx1 is short one and it rolled back its changes @@ -196,7 +284,7 @@ public class TransactionManagerTest extends TransactionSystemTest { // cannot commit transaction as it was timed out try { - txm.canCommit(tx1, Collections.singleton(new byte[] { 0x11 })); + txm.canCommit(tx1.getTransactionId(), Collections.singleton(new byte[] { 0x11 })); Assert.fail(); } catch (TransactionNotInProgressException e) { // expected http://git-wip-us.apache.org/repos/asf/incubator-tephra/blob/174c3325/tephra-core/src/test/java/org/apache/tephra/TransactionSystemTest.java ---------------------------------------------------------------------- diff --git a/tephra-core/src/test/java/org/apache/tephra/TransactionSystemTest.java b/tephra-core/src/test/java/org/apache/tephra/TransactionSystemTest.java index 5448052..6cbda2f 100644 --- a/tephra-core/src/test/java/org/apache/tephra/TransactionSystemTest.java +++ b/tephra-core/src/test/java/org/apache/tephra/TransactionSystemTest.java @@ -90,7 +90,7 @@ public abstract class TransactionSystemTest { } Transaction tx = client.startShort(); client.canCommitOrThrow(tx, fiftyChanges); - client.commit(tx); + client.commitOrThrow(tx); // now try another transaction with 51 changes fiftyChanges.add(new byte[] { 50 }); @@ -112,7 +112,7 @@ public abstract class TransactionSystemTest { } tx = client.startShort(); client.canCommitOrThrow(tx, changes2k); - client.commit(tx); + client.commitOrThrow(tx); // now add another byte to the change set to exceed the limit changes2k.add(new byte[] { 0 }); @@ -134,14 +134,14 @@ public abstract class TransactionSystemTest { Transaction tx1 = client1.startShort(); Transaction tx2 = client2.startShort(); - Assert.assertTrue(client1.canCommitOrThrow(tx1, asList(C1, C2))); + client1.canCommitOrThrow(tx1, asList(C1, C2)); // second one also can commit even thought there are conflicts with first since first one hasn't committed yet - Assert.assertTrue(client2.canCommitOrThrow(tx2, asList(C2, C3))); + 
client2.canCommitOrThrow(tx2, asList(C2, C3)); - Assert.assertTrue(client1.commit(tx1)); + client1.commitOrThrow(tx1); // now second one should not commit, since there are conflicts with tx1 that has been committed - Assert.assertFalse(client2.commit(tx2)); + assertCommitConflicts(client2, tx2); } @Test @@ -161,16 +161,16 @@ public abstract class TransactionSystemTest { Transaction tx4 = client4.startShort(); Transaction tx5 = client5.startShort(); - Assert.assertTrue(client1.canCommitOrThrow(tx1, asList(C1))); - Assert.assertTrue(client1.commit(tx1)); + client1.canCommitOrThrow(tx1, asList(C1)); + client1.commitOrThrow(tx1); - Assert.assertTrue(client2.canCommitOrThrow(tx2, asList(C2))); - Assert.assertTrue(client2.commit(tx2)); + client2.canCommitOrThrow(tx2, asList(C2)); + client2.commitOrThrow(tx2); // verifying conflicts detection - Assert.assertFalse(client3.canCommitOrThrow(tx3, asList(C1))); - Assert.assertFalse(client4.canCommitOrThrow(tx4, asList(C2))); - Assert.assertTrue(client5.canCommitOrThrow(tx5, asList(C3))); + assertCanCommitConflicts(client3, tx3, asList(C1)); + assertCanCommitConflicts(client4, tx4, asList(C2)); + client5.canCommitOrThrow(tx5, asList(C3)); } @Test @@ -178,15 +178,10 @@ public abstract class TransactionSystemTest { TransactionSystemClient client = getClient(); Transaction tx = client.startShort(); - Assert.assertTrue(client.canCommitOrThrow(tx, asList(C1, C2))); - Assert.assertTrue(client.commit(tx)); + client.canCommitOrThrow(tx, asList(C1, C2)); + client.commitOrThrow(tx); // cannot commit twice same tx - try { - Assert.assertFalse(client.commit(tx)); - Assert.fail(); - } catch (TransactionNotInProgressException e) { - // expected - } + assertCommitNotInProgress(client, tx); } @Test @@ -194,7 +189,7 @@ public abstract class TransactionSystemTest { TransactionSystemClient client = getClient(); Transaction tx = client.startShort(); - Assert.assertTrue(client.canCommitOrThrow(tx, asList(C1, C2))); + client.canCommitOrThrow(tx, 
asList(C1, C2)); client.abort(tx); // abort of not active tx has no affect client.abort(tx); @@ -205,21 +200,12 @@ public abstract class TransactionSystemTest { TransactionSystemClient client = getClient(); Transaction tx = client.startShort(); - Assert.assertTrue(client.canCommitOrThrow(tx, asList(C1, C2))); - Assert.assertTrue(client.commit(tx)); + client.canCommitOrThrow(tx, asList(C1, C2)); + client.commitOrThrow(tx); + // can't re-use same tx again - try { - client.canCommitOrThrow(tx, asList(C3, C4)); - Assert.fail(); - } catch (TransactionNotInProgressException e) { - // expected - } - try { - Assert.assertFalse(client.commit(tx)); - Assert.fail(); - } catch (TransactionNotInProgressException e) { - // expected - } + assertCanCommitNotInProgress(client, tx, asList(C3, C4)); + assertCommitNotInProgress(client, tx); // abort of not active tx has no affect client.abort(tx); @@ -229,24 +215,15 @@ public abstract class TransactionSystemTest { public void testUseNotStarted() throws Exception { TransactionSystemClient client = getClient(); Transaction tx1 = client.startShort(); - Assert.assertTrue(client.commit(tx1)); + client.commitOrThrow(tx1); // we know this is one is older than current writePointer and was not used Transaction txOld = new Transaction(tx1.getReadPointer(), tx1.getTransactionId() - 1, new long[] {}, new long[] {}, Transaction.NO_TX_IN_PROGRESS, TransactionType.SHORT); - try { - Assert.assertFalse(client.canCommitOrThrow(txOld, asList(C3, C4))); - Assert.fail(); - } catch (TransactionNotInProgressException e) { - // expected - } - try { - Assert.assertFalse(client.commit(txOld)); - Assert.fail(); - } catch (TransactionNotInProgressException e) { - // expected - } + assertCanCommitNotInProgress(client, txOld, asList(C3, C4)); + assertCommitNotInProgress(client, txOld); + // abort of not active tx has no affect client.abort(txOld); @@ -254,18 +231,9 @@ public abstract class TransactionSystemTest { Transaction txNew = new 
Transaction(tx1.getReadPointer(), tx1.getTransactionId() + 1, new long[] {}, new long[] {}, Transaction.NO_TX_IN_PROGRESS, TransactionType.SHORT); - try { - Assert.assertFalse(client.canCommitOrThrow(txNew, asList(C3, C4))); - Assert.fail(); - } catch (TransactionNotInProgressException e) { - // expected - } - try { - Assert.assertFalse(client.commit(txNew)); - Assert.fail(); - } catch (TransactionNotInProgressException e) { - // expected - } + assertCanCommitNotInProgress(client, txNew, asList(C3, C4)); + assertCommitNotInProgress(client, txNew); + // abort of not active tx has no affect client.abort(txNew); } @@ -275,8 +243,9 @@ public abstract class TransactionSystemTest { TransactionSystemClient client = getClient(); Transaction tx = client.startShort(); - Assert.assertTrue(client.canCommitOrThrow(tx, asList(C1, C2))); - Assert.assertTrue(client.commit(tx)); + client.canCommitOrThrow(tx, asList(C1, C2)); + client.commitOrThrow(tx); + // abort of not active tx has no affect client.abort(tx); } @@ -292,7 +261,7 @@ public abstract class TransactionSystemTest { // Cannot invalidate a committed tx Transaction tx2 = client.startShort(); client.canCommitOrThrow(tx2, asList(C3, C4)); - client.commit(tx2); + client.commitOrThrow(tx2); Assert.assertFalse(client.invalidate(tx2.getTransactionId())); } @@ -306,7 +275,7 @@ public abstract class TransactionSystemTest { Transaction tx1 = client.startShort(); Transaction tx2 = client.startShort(); client.canCommitOrThrow(tx1, asList(C1, C2)); - client.commit(tx1); + client.commitOrThrow(tx1); client.canCommitOrThrow(tx2, asList(C3, C4)); Transaction txPreReset = client.startShort(); @@ -409,8 +378,8 @@ public abstract class TransactionSystemTest { // start and commit a few for (int i = 0; i < 5; i++) { Transaction tx = client.startShort(); - Assert.assertTrue(client.canCommit(tx, Collections.singleton(new byte[] { (byte) i }))); - Assert.assertTrue(client.commit(tx)); + client.canCommitOrThrow(tx, Collections.singleton(new 
byte[] { (byte) i })); + client.commitOrThrow(tx); } // checkpoint the transactions @@ -421,8 +390,8 @@ public abstract class TransactionSystemTest { // start and commit a few (this moves the read pointer past all checkpoint write versions) for (int i = 5; i < 10; i++) { Transaction tx = client.startShort(); - Assert.assertTrue(client.canCommit(tx, Collections.singleton(new byte[] { (byte) i }))); - Assert.assertTrue(client.commit(tx)); + client.canCommitOrThrow(tx, Collections.singleton(new byte[] { (byte) i })); + client.commitOrThrow(tx); } // start new tx and validate all write pointers are excluded @@ -464,8 +433,8 @@ public abstract class TransactionSystemTest { client.abort(tx); // commit the last checkpoint - Assert.assertTrue(client.canCommit(tx3, Collections.<byte[]>emptyList())); - Assert.assertTrue(client.commit(tx3c)); + client.canCommitOrThrow(tx3, Collections.<byte[]>emptyList()); + client.commitOrThrow(tx3c); // start new tx and validate all write pointers are excluded tx = client.startShort(); @@ -485,6 +454,46 @@ public abstract class TransactionSystemTest { } } + private void assertCommitConflicts(TransactionSystemClient client, Transaction tx) + throws TransactionFailureException { + try { + client.commitOrThrow(tx); + Assert.fail(); + } catch (TransactionConflictException e) { + //expected + } + } + + private void assertCanCommitConflicts(TransactionSystemClient client, Transaction tx, Collection<byte[]> changes) + throws TransactionFailureException { + try { + client.canCommitOrThrow(tx, changes); + Assert.fail(); + } catch (TransactionConflictException e) { + //expected + } + } + + private void assertCommitNotInProgress(TransactionSystemClient client, Transaction tx) + throws TransactionFailureException { + try { + client.commitOrThrow(tx); + Assert.fail(); + } catch (TransactionNotInProgressException e) { + //expected + } + } + + private void assertCanCommitNotInProgress(TransactionSystemClient client, Transaction tx, Collection<byte[]> 
changes) + throws TransactionFailureException { + try { + client.canCommitOrThrow(tx, changes); + Assert.fail(); + } catch (TransactionNotInProgressException e) { + //expected + } + } + private Collection<byte[]> asList(byte[]... val) { return Arrays.asList(val); } http://git-wip-us.apache.org/repos/asf/incubator-tephra/blob/174c3325/tephra-core/src/test/java/org/apache/tephra/distributed/ThriftTransactionServerTest.java ---------------------------------------------------------------------- diff --git a/tephra-core/src/test/java/org/apache/tephra/distributed/ThriftTransactionServerTest.java b/tephra-core/src/test/java/org/apache/tephra/distributed/ThriftTransactionServerTest.java index 6075452..bd48f7a 100644 --- a/tephra-core/src/test/java/org/apache/tephra/distributed/ThriftTransactionServerTest.java +++ b/tephra-core/src/test/java/org/apache/tephra/distributed/ThriftTransactionServerTest.java @@ -197,7 +197,7 @@ public class ThriftTransactionServerTest { // simply start + commit transaction TransactionSystemClient txClient = getClient(); Transaction tx = txClient.startShort(); - txClient.commit(tx); + txClient.commitOrThrow(tx); // Expire zookeeper session, which causes Thrift server to stop running. 
expireZkSession(zkClientService); @@ -215,7 +215,7 @@ public class ThriftTransactionServerTest { txClient = getClient(); // verify that we can start and commit a transaction after becoming leader again tx = txClient.startShort(); - txClient.commit(tx); + txClient.commitOrThrow(tx); } private void expireZkSession(ZKClientService zkClientService) throws Exception { http://git-wip-us.apache.org/repos/asf/incubator-tephra/blob/174c3325/tephra-core/src/test/java/org/apache/tephra/persist/AbstractTransactionStateStorageTest.java ---------------------------------------------------------------------- diff --git a/tephra-core/src/test/java/org/apache/tephra/persist/AbstractTransactionStateStorageTest.java b/tephra-core/src/test/java/org/apache/tephra/persist/AbstractTransactionStateStorageTest.java index 971c93c..961d368 100644 --- a/tephra-core/src/test/java/org/apache/tephra/persist/AbstractTransactionStateStorageTest.java +++ b/tephra-core/src/test/java/org/apache/tephra/persist/AbstractTransactionStateStorageTest.java @@ -28,6 +28,7 @@ import it.unimi.dsi.fastutil.longs.LongArrayList; import org.apache.hadoop.conf.Configuration; import org.apache.tephra.ChangeId; import org.apache.tephra.Transaction; +import org.apache.tephra.TransactionConflictException; import org.apache.tephra.TransactionManager; import org.apache.tephra.TransactionType; import org.apache.tephra.TxConstants; @@ -142,11 +143,11 @@ public abstract class AbstractTransactionStateStorageTest { txManager.invalidate(invalid.getTransactionId()); // start a tx1, add a change A and commit Transaction tx1 = txManager.startShort("client1"); - Assert.assertTrue(txManager.canCommit(tx1, Collections.singleton(a))); - Assert.assertTrue(txManager.commit(tx1)); + txManager.canCommit(tx1.getTransactionId(), Collections.singleton(a)); + txManager.commit(tx1.getTransactionId(), tx1.getWritePointer()); // start a tx2 and add a change B Transaction tx2 = txManager.startShort("client2"); - 
Assert.assertTrue(txManager.canCommit(tx2, Collections.singleton(b))); + txManager.canCommit(tx2.getTransactionId(), Collections.singleton(b)); // start a tx3 Transaction tx3 = txManager.startShort("client3"); // restart @@ -172,7 +173,7 @@ public abstract class AbstractTransactionStateStorageTest { txManager.abort(invalid); // commit tx2 - Assert.assertTrue(txManager.commit(tx2)); + txManager.commit(tx2.getTransactionId(), tx2.getWritePointer()); // start another transaction, must be greater than tx3 Transaction tx4 = txManager.startShort(); Assert.assertTrue(tx4.getTransactionId() > tx3.getTransactionId()); @@ -181,7 +182,12 @@ public abstract class AbstractTransactionStateStorageTest { Assert.assertFalse(tx2.isVisible(tx3.getTransactionId())); Assert.assertFalse(tx2.isVisible(tx4.getTransactionId())); // add same change for tx3 - Assert.assertFalse(txManager.canCommit(tx3, Collections.singleton(b))); + try { + txManager.canCommit(tx3.getTransactionId(), Collections.singleton(b)); + Assert.fail("canCommit() should have failed"); + } catch (TransactionConflictException e) { + // expected + } // check visibility with new xaction Transaction tx5 = txManager.startShort(); Assert.assertTrue(tx5.isVisible(tx1.getTransactionId())); @@ -252,11 +258,11 @@ public abstract class AbstractTransactionStateStorageTest { final byte[] b = { 'b' }; // start a tx1, add a change A and commit Transaction tx1 = txManager.startShort(); - Assert.assertTrue(txManager.canCommit(tx1, Collections.singleton(a))); - Assert.assertTrue(txManager.commit(tx1)); + txManager.canCommit(tx1.getTransactionId(), Collections.singleton(a)); + txManager.commit(tx1.getTransactionId(), tx1.getWritePointer()); // start a tx2 and add a change B Transaction tx2 = txManager.startShort(); - Assert.assertTrue(txManager.canCommit(tx2, Collections.singleton(b))); + txManager.canCommit(tx2.getTransactionId(), Collections.singleton(b)); // start a tx3 Transaction tx3 = txManager.startShort(); TransactionSnapshot 
origState = txManager.getCurrentState(); http://git-wip-us.apache.org/repos/asf/incubator-tephra/blob/174c3325/tephra-core/src/test/java/org/apache/tephra/snapshot/SnapshotCodecTest.java ---------------------------------------------------------------------- diff --git a/tephra-core/src/test/java/org/apache/tephra/snapshot/SnapshotCodecTest.java b/tephra-core/src/test/java/org/apache/tephra/snapshot/SnapshotCodecTest.java index 9c565ba..98d1148 100644 --- a/tephra-core/src/test/java/org/apache/tephra/snapshot/SnapshotCodecTest.java +++ b/tephra-core/src/test/java/org/apache/tephra/snapshot/SnapshotCodecTest.java @@ -364,8 +364,8 @@ public class SnapshotCodecTest { Assert.assertTrue(inProgressTx.getCheckpointWritePointers().isEmpty()); // Should be able to commit the transaction - Assert.assertTrue(txManager.canCommit(checkpointTx, Collections.<byte[]>emptyList())); - Assert.assertTrue(txManager.commit(checkpointTx)); + txManager.canCommit(checkpointTx.getTransactionId(), Collections.<byte[]>emptyList()); + txManager.commit(checkpointTx.getTransactionId(), checkpointTx.getWritePointer()); // save a new snapshot txManager.stopAndWait(); http://git-wip-us.apache.org/repos/asf/incubator-tephra/blob/174c3325/tephra-hbase-compat-0.96/src/test/java/org/apache/tephra/hbase/coprocessor/TransactionProcessorTest.java ---------------------------------------------------------------------- diff --git a/tephra-hbase-compat-0.96/src/test/java/org/apache/tephra/hbase/coprocessor/TransactionProcessorTest.java b/tephra-hbase-compat-0.96/src/test/java/org/apache/tephra/hbase/coprocessor/TransactionProcessorTest.java index 3c7d1e2..9615d8e 100644 --- a/tephra-hbase-compat-0.96/src/test/java/org/apache/tephra/hbase/coprocessor/TransactionProcessorTest.java +++ b/tephra-hbase-compat-0.96/src/test/java/org/apache/tephra/hbase/coprocessor/TransactionProcessorTest.java @@ -58,7 +58,6 @@ import org.apache.hadoop.hbase.util.Bytes; import org.apache.hadoop.hbase.util.FSUtils; import 
org.apache.hadoop.hbase.zookeeper.ZooKeeperWatcher; import org.apache.hadoop.hdfs.MiniDFSCluster; -import org.apache.tephra.ChangeId; import org.apache.tephra.Transaction; import org.apache.tephra.TransactionManager; import org.apache.tephra.TxConstants; @@ -89,7 +88,6 @@ import java.util.Collections; import java.util.HashMap; import java.util.List; import java.util.Map; -import java.util.Set; import java.util.TreeMap; import java.util.concurrent.ConcurrentMap; import java.util.concurrent.ConcurrentSkipListMap; @@ -151,7 +149,7 @@ public class TransactionProcessorTest { // this will set visibility upper bound to V[6] Maps.newTreeMap(ImmutableSortedMap.of(V[6], new TransactionManager.InProgressTx( V[6] - 1, Long.MAX_VALUE, TransactionManager.InProgressType.SHORT))), - new HashMap<Long, Set<ChangeId>>(), new TreeMap<Long, Set<ChangeId>>()); + new HashMap<Long, TransactionManager.ChangeSet>(), new TreeMap<Long, TransactionManager.ChangeSet>()); txVisibilityState = new TransactionSnapshot(txSnapshot.getTimestamp(), txSnapshot.getReadPointer(), txSnapshot.getWritePointer(), txSnapshot.getInvalid(), txSnapshot.getInProgress()); http://git-wip-us.apache.org/repos/asf/incubator-tephra/blob/174c3325/tephra-hbase-compat-0.96/src/test/java/org/apache/tephra/hbase/coprocessor/TransactionVisibilityFilterTest.java ---------------------------------------------------------------------- diff --git a/tephra-hbase-compat-0.96/src/test/java/org/apache/tephra/hbase/coprocessor/TransactionVisibilityFilterTest.java b/tephra-hbase-compat-0.96/src/test/java/org/apache/tephra/hbase/coprocessor/TransactionVisibilityFilterTest.java index d826bad..4d34ed9 100644 --- a/tephra-hbase-compat-0.96/src/test/java/org/apache/tephra/hbase/coprocessor/TransactionVisibilityFilterTest.java +++ b/tephra-hbase-compat-0.96/src/test/java/org/apache/tephra/hbase/coprocessor/TransactionVisibilityFilterTest.java @@ -37,7 +37,6 @@ import java.util.Map; import javax.annotation.Nullable; import static 
org.junit.Assert.assertEquals; -import static org.junit.Assert.assertTrue; /** * HBase 0.96 specific test for filtering logic applied when reading data transactionally. @@ -250,20 +249,20 @@ public class TransactionVisibilityFilterTest extends AbstractTransactionVisibili */ Transaction tx1 = txManager.startShort(); - assertTrue(txManager.canCommit(tx1, EMPTY_CHANGESET)); - assertTrue(txManager.commit(tx1)); + txManager.canCommit(tx1.getTransactionId(), EMPTY_CHANGESET); + txManager.commit(tx1.getTransactionId(), tx1.getWritePointer()); Transaction tx2 = txManager.startShort(); - assertTrue(txManager.canCommit(tx2, EMPTY_CHANGESET)); - assertTrue(txManager.commit(tx2)); + txManager.canCommit(tx2.getTransactionId(), EMPTY_CHANGESET); + txManager.commit(tx2.getTransactionId(), tx2.getWritePointer()); Transaction tx3 = txManager.startShort(); Transaction tx4 = txManager.startShort(); txManager.invalidate(tx4.getTransactionId()); Transaction tx5 = txManager.startShort(); - assertTrue(txManager.canCommit(tx5, EMPTY_CHANGESET)); - assertTrue(txManager.commit(tx5)); + txManager.canCommit(tx5.getTransactionId(), EMPTY_CHANGESET); + txManager.commit(tx5.getTransactionId(), tx5.getWritePointer()); Transaction tx6 = txManager.startShort(); http://git-wip-us.apache.org/repos/asf/incubator-tephra/blob/174c3325/tephra-hbase-compat-0.98/src/test/java/org/apache/tephra/hbase/coprocessor/TransactionProcessorTest.java ---------------------------------------------------------------------- diff --git a/tephra-hbase-compat-0.98/src/test/java/org/apache/tephra/hbase/coprocessor/TransactionProcessorTest.java b/tephra-hbase-compat-0.98/src/test/java/org/apache/tephra/hbase/coprocessor/TransactionProcessorTest.java index b8e051b..dcb8314 100644 --- a/tephra-hbase-compat-0.98/src/test/java/org/apache/tephra/hbase/coprocessor/TransactionProcessorTest.java +++ b/tephra-hbase-compat-0.98/src/test/java/org/apache/tephra/hbase/coprocessor/TransactionProcessorTest.java @@ -64,7 +64,6 @@ import 
org.apache.hadoop.hbase.util.Bytes; import org.apache.hadoop.hbase.util.FSUtils; import org.apache.hadoop.hbase.zookeeper.ZooKeeperWatcher; import org.apache.hadoop.hdfs.MiniDFSCluster; -import org.apache.tephra.ChangeId; import org.apache.tephra.Transaction; import org.apache.tephra.TransactionManager; import org.apache.tephra.TxConstants; @@ -95,7 +94,6 @@ import java.util.Collections; import java.util.HashMap; import java.util.List; import java.util.Map; -import java.util.Set; import java.util.TreeMap; import java.util.concurrent.ConcurrentMap; import java.util.concurrent.ConcurrentSkipListMap; @@ -156,7 +154,7 @@ public class TransactionProcessorTest { // this will set visibility upper bound to V[6] Maps.newTreeMap(ImmutableSortedMap.of(V[6], new TransactionManager.InProgressTx( V[6] - 1, Long.MAX_VALUE, TransactionManager.InProgressType.SHORT))), - new HashMap<Long, Set<ChangeId>>(), new TreeMap<Long, Set<ChangeId>>()); + new HashMap<Long, TransactionManager.ChangeSet>(), new TreeMap<Long, TransactionManager.ChangeSet>()); txVisibilityState = new TransactionSnapshot(txSnapshot.getTimestamp(), txSnapshot.getReadPointer(), txSnapshot.getWritePointer(), txSnapshot.getInvalid(), txSnapshot.getInProgress()); http://git-wip-us.apache.org/repos/asf/incubator-tephra/blob/174c3325/tephra-hbase-compat-0.98/src/test/java/org/apache/tephra/hbase/coprocessor/TransactionVisibilityFilterTest.java ---------------------------------------------------------------------- diff --git a/tephra-hbase-compat-0.98/src/test/java/org/apache/tephra/hbase/coprocessor/TransactionVisibilityFilterTest.java b/tephra-hbase-compat-0.98/src/test/java/org/apache/tephra/hbase/coprocessor/TransactionVisibilityFilterTest.java index 7a57aac..3352eef 100644 --- a/tephra-hbase-compat-0.98/src/test/java/org/apache/tephra/hbase/coprocessor/TransactionVisibilityFilterTest.java +++ b/tephra-hbase-compat-0.98/src/test/java/org/apache/tephra/hbase/coprocessor/TransactionVisibilityFilterTest.java @@ -37,7 
+37,6 @@ import java.util.Map; import javax.annotation.Nullable; import static org.junit.Assert.assertEquals; -import static org.junit.Assert.assertTrue; /** * HBase 0.98 specific test for filtering logic applied when reading data transactionally. @@ -249,20 +248,20 @@ public class TransactionVisibilityFilterTest extends AbstractTransactionVisibili */ Transaction tx1 = txManager.startShort(); - assertTrue(txManager.canCommit(tx1, EMPTY_CHANGESET)); - assertTrue(txManager.commit(tx1)); + txManager.canCommit(tx1.getTransactionId(), EMPTY_CHANGESET); + txManager.commit(tx1.getTransactionId(), tx1.getWritePointer()); Transaction tx2 = txManager.startShort(); - assertTrue(txManager.canCommit(tx2, EMPTY_CHANGESET)); - assertTrue(txManager.commit(tx2)); + txManager.canCommit(tx2.getTransactionId(), EMPTY_CHANGESET); + txManager.commit(tx2.getTransactionId(), tx2.getWritePointer()); Transaction tx3 = txManager.startShort(); Transaction tx4 = txManager.startShort(); txManager.invalidate(tx4.getTransactionId()); Transaction tx5 = txManager.startShort(); - assertTrue(txManager.canCommit(tx5, EMPTY_CHANGESET)); - assertTrue(txManager.commit(tx5)); + txManager.canCommit(tx5.getTransactionId(), EMPTY_CHANGESET); + txManager.commit(tx5.getTransactionId(), tx5.getWritePointer()); Transaction tx6 = txManager.startShort(); http://git-wip-us.apache.org/repos/asf/incubator-tephra/blob/174c3325/tephra-hbase-compat-1.0-cdh/src/test/java/org/apache/tephra/hbase/coprocessor/TransactionProcessorTest.java ---------------------------------------------------------------------- diff --git a/tephra-hbase-compat-1.0-cdh/src/test/java/org/apache/tephra/hbase/coprocessor/TransactionProcessorTest.java b/tephra-hbase-compat-1.0-cdh/src/test/java/org/apache/tephra/hbase/coprocessor/TransactionProcessorTest.java index 9ce30b5..016adbd 100644 --- a/tephra-hbase-compat-1.0-cdh/src/test/java/org/apache/tephra/hbase/coprocessor/TransactionProcessorTest.java +++ 
b/tephra-hbase-compat-1.0-cdh/src/test/java/org/apache/tephra/hbase/coprocessor/TransactionProcessorTest.java @@ -47,7 +47,6 @@ import org.apache.hadoop.hbase.util.FSUtils; import org.apache.hadoop.hbase.wal.WAL; import org.apache.hadoop.hbase.wal.WALFactory; import org.apache.hadoop.hdfs.MiniDFSCluster; -import org.apache.tephra.ChangeId; import org.apache.tephra.Transaction; import org.apache.tephra.TransactionManager; import org.apache.tephra.TxConstants; @@ -77,7 +76,6 @@ import java.util.Collections; import java.util.HashMap; import java.util.List; import java.util.Map; -import java.util.Set; import java.util.TreeMap; import java.util.concurrent.TimeUnit; @@ -136,7 +134,7 @@ public class TransactionProcessorTest { // this will set visibility upper bound to V[6] Maps.newTreeMap(ImmutableSortedMap.of(V[6], new TransactionManager.InProgressTx( V[6] - 1, Long.MAX_VALUE, TransactionManager.InProgressType.SHORT))), - new HashMap<Long, Set<ChangeId>>(), new TreeMap<Long, Set<ChangeId>>()); + new HashMap<Long, TransactionManager.ChangeSet>(), new TreeMap<Long, TransactionManager.ChangeSet>()); txVisibilityState = new TransactionSnapshot(txSnapshot.getTimestamp(), txSnapshot.getReadPointer(), txSnapshot.getWritePointer(), txSnapshot.getInvalid(), txSnapshot.getInProgress()); http://git-wip-us.apache.org/repos/asf/incubator-tephra/blob/174c3325/tephra-hbase-compat-1.0-cdh/src/test/java/org/apache/tephra/hbase/coprocessor/TransactionVisibilityFilterTest.java ---------------------------------------------------------------------- diff --git a/tephra-hbase-compat-1.0-cdh/src/test/java/org/apache/tephra/hbase/coprocessor/TransactionVisibilityFilterTest.java b/tephra-hbase-compat-1.0-cdh/src/test/java/org/apache/tephra/hbase/coprocessor/TransactionVisibilityFilterTest.java index 5b9802d..c27a10d 100644 --- a/tephra-hbase-compat-1.0-cdh/src/test/java/org/apache/tephra/hbase/coprocessor/TransactionVisibilityFilterTest.java +++ 
b/tephra-hbase-compat-1.0-cdh/src/test/java/org/apache/tephra/hbase/coprocessor/TransactionVisibilityFilterTest.java @@ -37,7 +37,6 @@ import java.util.Map; import javax.annotation.Nullable; import static org.junit.Assert.assertEquals; -import static org.junit.Assert.assertTrue; /** * HBase 1.0 (CDH) specific test for filtering logic applied when reading data transactionally. @@ -249,20 +248,20 @@ public class TransactionVisibilityFilterTest extends AbstractTransactionVisibili */ Transaction tx1 = txManager.startShort(); - assertTrue(txManager.canCommit(tx1, EMPTY_CHANGESET)); - assertTrue(txManager.commit(tx1)); + txManager.canCommit(tx1.getTransactionId(), EMPTY_CHANGESET); + txManager.commit(tx1.getTransactionId(), tx1.getWritePointer()); Transaction tx2 = txManager.startShort(); - assertTrue(txManager.canCommit(tx2, EMPTY_CHANGESET)); - assertTrue(txManager.commit(tx2)); + txManager.canCommit(tx2.getTransactionId(), EMPTY_CHANGESET); + txManager.commit(tx2.getTransactionId(), tx2.getWritePointer()); Transaction tx3 = txManager.startShort(); Transaction tx4 = txManager.startShort(); txManager.invalidate(tx4.getTransactionId()); Transaction tx5 = txManager.startShort(); - assertTrue(txManager.canCommit(tx5, EMPTY_CHANGESET)); - assertTrue(txManager.commit(tx5)); + txManager.canCommit(tx5.getTransactionId(), EMPTY_CHANGESET); + txManager.commit(tx5.getTransactionId(), tx5.getWritePointer()); Transaction tx6 = txManager.startShort(); http://git-wip-us.apache.org/repos/asf/incubator-tephra/blob/174c3325/tephra-hbase-compat-1.0/src/test/java/org/apache/tephra/hbase/coprocessor/TransactionProcessorTest.java ---------------------------------------------------------------------- diff --git a/tephra-hbase-compat-1.0/src/test/java/org/apache/tephra/hbase/coprocessor/TransactionProcessorTest.java b/tephra-hbase-compat-1.0/src/test/java/org/apache/tephra/hbase/coprocessor/TransactionProcessorTest.java index 0ec3b46..5328aef 100644 --- 
a/tephra-hbase-compat-1.0/src/test/java/org/apache/tephra/hbase/coprocessor/TransactionProcessorTest.java +++ b/tephra-hbase-compat-1.0/src/test/java/org/apache/tephra/hbase/coprocessor/TransactionProcessorTest.java @@ -47,7 +47,6 @@ import org.apache.hadoop.hbase.util.FSUtils; import org.apache.hadoop.hbase.wal.WAL; import org.apache.hadoop.hbase.wal.WALFactory; import org.apache.hadoop.hdfs.MiniDFSCluster; -import org.apache.tephra.ChangeId; import org.apache.tephra.Transaction; import org.apache.tephra.TransactionManager; import org.apache.tephra.TxConstants; @@ -77,7 +76,6 @@ import java.util.Collections; import java.util.HashMap; import java.util.List; import java.util.Map; -import java.util.Set; import java.util.TreeMap; import java.util.concurrent.TimeUnit; @@ -136,7 +134,7 @@ public class TransactionProcessorTest { // this will set visibility upper bound to V[6] Maps.newTreeMap(ImmutableSortedMap.of(V[6], new TransactionManager.InProgressTx( V[6] - 1, Long.MAX_VALUE, TransactionManager.InProgressType.SHORT))), - new HashMap<Long, Set<ChangeId>>(), new TreeMap<Long, Set<ChangeId>>()); + new HashMap<Long, TransactionManager.ChangeSet>(), new TreeMap<Long, TransactionManager.ChangeSet>()); txVisibilityState = new TransactionSnapshot(txSnapshot.getTimestamp(), txSnapshot.getReadPointer(), txSnapshot.getWritePointer(), txSnapshot.getInvalid(), txSnapshot.getInProgress());
