(TEPHRA-240) Include conflicting key and client id in TransactionConflictException
This closes #47 from GitHub. Signed-off-by: anew <[email protected]> Project: http://git-wip-us.apache.org/repos/asf/incubator-tephra/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-tephra/commit/174c3325 Tree: http://git-wip-us.apache.org/repos/asf/incubator-tephra/tree/174c3325 Diff: http://git-wip-us.apache.org/repos/asf/incubator-tephra/diff/174c3325 Branch: refs/heads/master Commit: 174c3325366e33ea7979f2ecfa621b68d77b8a35 Parents: 810c9dd Author: anew <[email protected]> Authored: Sun Sep 10 21:33:53 2017 -0700 Committer: anew <[email protected]> Committed: Tue Sep 12 16:22:58 2017 -0700 ---------------------------------------------------------------------- .../tephra/TransactionConflictException.java | 39 + .../org/apache/tephra/TransactionContext.java | 32 +- .../org/apache/tephra/TransactionManager.java | 187 +- .../apache/tephra/TransactionSystemClient.java | 31 +- .../java/org/apache/tephra/TxConstants.java | 11 + .../distributed/TransactionServiceClient.java | 74 +- .../TransactionServiceThriftClient.java | 42 +- .../TransactionServiceThriftHandler.java | 39 +- .../thrift/TTransactionConflictException.java | 602 ++++++ .../distributed/thrift/TTransactionServer.java | 1818 ++++++++++++++---- .../tephra/inmemory/DetachedTxSystemClient.java | 15 +- .../tephra/inmemory/InMemoryTxSystemClient.java | 27 +- .../tephra/inmemory/MinimalTxSystemClient.java | 9 +- .../tephra/persist/TransactionSnapshot.java | 12 +- tephra-core/src/main/thrift/transaction.thrift | 26 +- .../java/org/apache/tephra/ClientIdTest.java | 114 ++ .../java/org/apache/tephra/DummyTxAware.java | 123 ++ .../java/org/apache/tephra/DummyTxClient.java | 91 + .../apache/tephra/TransactionContextTest.java | 207 +- .../apache/tephra/TransactionExecutorTest.java | 211 +- .../apache/tephra/TransactionManagerTest.java | 106 +- .../apache/tephra/TransactionSystemTest.java | 151 +- .../ThriftTransactionServerTest.java | 4 +- .../AbstractTransactionStateStorageTest.java | 22 +- 
.../tephra/snapshot/SnapshotCodecTest.java | 4 +- .../coprocessor/TransactionProcessorTest.java | 4 +- .../TransactionVisibilityFilterTest.java | 13 +- .../coprocessor/TransactionProcessorTest.java | 4 +- .../TransactionVisibilityFilterTest.java | 13 +- .../coprocessor/TransactionProcessorTest.java | 4 +- .../TransactionVisibilityFilterTest.java | 13 +- .../coprocessor/TransactionProcessorTest.java | 4 +- .../TransactionVisibilityFilterTest.java | 13 +- .../coprocessor/TransactionProcessorTest.java | 10 +- .../TransactionVisibilityFilterTest.java | 13 +- .../coprocessor/TransactionProcessorTest.java | 4 +- .../TransactionVisibilityFilterTest.java | 13 +- 37 files changed, 2997 insertions(+), 1108 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-tephra/blob/174c3325/tephra-api/src/main/java/org/apache/tephra/TransactionConflictException.java ---------------------------------------------------------------------- diff --git a/tephra-api/src/main/java/org/apache/tephra/TransactionConflictException.java b/tephra-api/src/main/java/org/apache/tephra/TransactionConflictException.java index d07ed04..d3bd180 100644 --- a/tephra-api/src/main/java/org/apache/tephra/TransactionConflictException.java +++ b/tephra-api/src/main/java/org/apache/tephra/TransactionConflictException.java @@ -22,11 +22,50 @@ package org.apache.tephra; * Thrown to indicate transaction conflict occurred when trying to commit a transaction. */ public class TransactionConflictException extends TransactionFailureException { + + private final Long transactionId; + private final String conflictingKey; + private final String conflictingClient; + + /** + * @deprecated since 0.13-incubating. Use {@link #TransactionConflictException(long, String, String)} instead. 
+ */ + @Deprecated public TransactionConflictException(String message) { super(message); + transactionId = null; + conflictingKey = null; + conflictingClient = null; } + /** + * @deprecated since 0.13-incubating. Use {@link #TransactionConflictException(long, String, String)} instead. + */ + @Deprecated public TransactionConflictException(String message, Throwable cause) { super(message, cause); + transactionId = null; + conflictingKey = null; + conflictingClient = null; + } + + public TransactionConflictException(long transactionId, String conflictingKey, String conflictingClient) { + super(String.format("Transaction %d conflicts with %s on change key '%s'", transactionId, + conflictingClient == null ? "unknown client" : conflictingClient, conflictingKey)); + this.transactionId = transactionId; + this.conflictingKey = conflictingKey; + this.conflictingClient = conflictingClient; + } + + public Long getTransactionId() { + return transactionId; + } + + public String getConflictingKey() { + return conflictingKey; + } + + public String getConflictingClient() { + return conflictingClient; } } http://git-wip-us.apache.org/repos/asf/incubator-tephra/blob/174c3325/tephra-core/src/main/java/org/apache/tephra/TransactionContext.java ---------------------------------------------------------------------- diff --git a/tephra-core/src/main/java/org/apache/tephra/TransactionContext.java b/tephra-core/src/main/java/org/apache/tephra/TransactionContext.java index 8b4e4fd..3c11e96 100644 --- a/tephra-core/src/main/java/org/apache/tephra/TransactionContext.java +++ b/tephra-core/src/main/java/org/apache/tephra/TransactionContext.java @@ -267,23 +267,16 @@ public class TransactionContext { // abort will throw that exception } } - - boolean canCommit = false; try { - canCommit = txClient.canCommitOrThrow(currentTx, changes); - } catch (TransactionNotInProgressException | TransactionSizeException e) { - throw e; - // abort will throw that exception + 
txClient.canCommitOrThrow(currentTx, changes); + } catch (TransactionFailureException e) { + abort(e); + // abort will rethrow this exception } catch (Throwable e) { String message = String.format("Exception from canCommit for transaction %d.", currentTx.getTransactionId()); abort(new TransactionFailureException(message, e)); // abort will throw that exception } - if (!canCommit) { - String message = String.format("Conflict detected for transaction %d.", currentTx.getTransactionId()); - abort(new TransactionConflictException(message)); - // abort will throw - } } private void persist() throws TransactionFailureException { @@ -311,25 +304,16 @@ public class TransactionContext { } private void commit() throws TransactionFailureException { - boolean commitSuccess = false; try { - commitSuccess = txClient.commit(currentTx); - } catch (TransactionNotInProgressException e) { - String message = String.format("Transaction %d is not in progress.", currentTx.getTransactionId()); - LOG.warn(message, e); - abort(new TransactionFailureException(message, e)); - // abort will throw that exception + txClient.commitOrThrow(currentTx); + } catch (TransactionFailureException e) { + abort(e); + // abort will rethrow this exception } catch (Throwable e) { String message = String.format("Exception from commit for transaction %d.", currentTx.getTransactionId()); - LOG.warn(message, e); abort(new TransactionFailureException(message, e)); // abort will throw that exception } - if (!commitSuccess) { - String message = String.format("Conflict detected for transaction %d.", currentTx.getTransactionId()); - abort(new TransactionConflictException(message)); - // abort will throw - } } private void postCommit() throws TransactionFailureException { http://git-wip-us.apache.org/repos/asf/incubator-tephra/blob/174c3325/tephra-core/src/main/java/org/apache/tephra/TransactionManager.java ---------------------------------------------------------------------- diff --git 
a/tephra-core/src/main/java/org/apache/tephra/TransactionManager.java b/tephra-core/src/main/java/org/apache/tephra/TransactionManager.java index 4479812..68450c9 100644 --- a/tephra-core/src/main/java/org/apache/tephra/TransactionManager.java +++ b/tephra-core/src/main/java/org/apache/tephra/TransactionManager.java @@ -130,9 +130,9 @@ public class TransactionManager extends AbstractService { // todo: use moving array instead (use Long2ObjectMap<byte[]> in fastutil) // todo: should this be consolidated with inProgress? // commit time next writePointer -> changes made by this tx - private final NavigableMap<Long, Set<ChangeId>> committedChangeSets = new ConcurrentSkipListMap<>(); + private final NavigableMap<Long, ChangeSet> committedChangeSets = new ConcurrentSkipListMap<>(); // not committed yet - private final Map<Long, Set<ChangeId>> committingChangeSets = Maps.newConcurrentMap(); + private final Map<Long, ChangeSet> committingChangeSets = Maps.newConcurrentMap(); private long readPointer; private long lastWritePointer; @@ -157,6 +157,10 @@ public class TransactionManager extends AbstractService { private DaemonThreadExecutor snapshotThread; private DaemonThreadExecutor metricsThread; + // retention of client id for transactions - this affects memory footprint + private final boolean retainClientId; + private final boolean retainClientIdPastCommit; + // lock guarding change of the current transaction log private final ReentrantReadWriteLock logLock = new ReentrantReadWriteLock(); private final Lock logReadLock = logLock.readLock(); @@ -206,12 +210,19 @@ public class TransactionManager extends AbstractService { // TODO: REMOVE WITH txnBackwardsCompatCheck() longTimeoutTolerance = conf.getLong("data.tx.long.timeout.tolerance", 10000); - // + ClientIdRetention retention = ClientIdRetention.valueOf( + conf.get(TxConstants.Manager.CFG_TX_RETAIN_CLIENT_ID, + TxConstants.Manager.DEFAULT_TX_RETAIN_CLIENT_ID).toUpperCase()); + this.retainClientId = retention != 
ClientIdRetention.OFF; + this.retainClientIdPastCommit = retention == ClientIdRetention.COMMITTED; + this.txMetricsCollector = txMetricsCollector; this.txMetricsCollector.configure(conf); clear(); } + enum ClientIdRetention { OFF, ACTIVE, COMMITTED } + private void clear() { invalidTxList.clear(); inProgress.clear(); @@ -400,7 +411,7 @@ public class TransactionManager extends AbstractService { } public synchronized TransactionSnapshot getSnapshot() throws IOException { - TransactionSnapshot snapshot = null; + TransactionSnapshot snapshot; if (!isRunning() && !isStopping()) { return null; } @@ -436,8 +447,8 @@ public class TransactionManager extends AbstractService { private void doSnapshot(boolean closing) throws IOException { long snapshotTime = 0L; - TransactionSnapshot snapshot = null; - TransactionLog oldLog = null; + TransactionSnapshot snapshot; + TransactionLog oldLog; try { this.logWriteLock.lock(); try { @@ -521,8 +532,12 @@ public class TransactionManager extends AbstractService { lastWritePointer = snapshot.getWritePointer(); invalidTxList.addAll(snapshot.getInvalid()); inProgress.putAll(txnBackwardsCompatCheck(defaultLongTimeout, longTimeoutTolerance, snapshot.getInProgress())); - committingChangeSets.putAll(snapshot.getCommittingChangeSets()); - committedChangeSets.putAll(snapshot.getCommittedChangeSets()); + for (Map.Entry<Long, Set<ChangeId>> entry : snapshot.getCommittingChangeSets().entrySet()) { + committingChangeSets.put(entry.getKey(), new ChangeSet(null, entry.getValue())); + } + for (Map.Entry<Long, Set<ChangeId>> entry : snapshot.getCommittedChangeSets().entrySet()) { + committedChangeSets.put(entry.getKey(), new ChangeSet(null, entry.getValue())); + } } /** @@ -593,7 +608,7 @@ public class TransactionManager extends AbstractService { if (reader == null) { continue; } - TransactionEdit edit = null; + TransactionEdit edit; while ((edit = reader.next()) != null) { editCnt++; switch (edit.getState()) { @@ -613,7 +628,7 @@ public class 
TransactionManager extends AbstractService { addInProgressAndAdvance(edit.getWritePointer(), edit.getVisibilityUpperBound(), expiration, type, null); break; case COMMITTING: - addCommittingChangeSet(edit.getWritePointer(), edit.getChanges()); + addCommittingChangeSet(edit.getWritePointer(), null, edit.getChanges()); break; case COMMITTED: // TODO: need to reconcile usage of transaction id v/s write pointer TEPHRA-140 @@ -621,7 +636,7 @@ public class TransactionManager extends AbstractService { long[] checkpointPointers = edit.getCheckpointPointers(); long writePointer = checkpointPointers == null || checkpointPointers.length == 0 ? transactionId : checkpointPointers[checkpointPointers.length - 1]; - doCommit(transactionId, writePointer, edit.getChanges(), + doCommit(transactionId, writePointer, new ChangeSet(null, edit.getChanges()), edit.getCommitPointer(), edit.getCanCommit()); break; case INVALID: @@ -670,9 +685,7 @@ public class TransactionManager extends AbstractService { throw new IllegalArgumentException("Invalid state for WAL entry: " + edit.getState()); } } - } catch (IOException ioe) { - throw Throwables.propagate(ioe); - } catch (InvalidTruncateTimeException e) { + } catch (IOException | InvalidTruncateTimeException e) { throw Throwables.propagate(e); } LOG.info("Read " + editCnt + " edits from log " + log.getName()); @@ -815,7 +828,7 @@ public class TransactionManager extends AbstractService { } private Transaction startTx(long expiration, TransactionType type, @Nullable String clientId) { - Transaction tx = null; + Transaction tx; long txid; // guard against changes to the transaction log while processing this.logReadLock.lock(); @@ -824,7 +837,8 @@ public class TransactionManager extends AbstractService { ensureAvailable(); txid = getNextWritePointer(); tx = createTransaction(txid, type); - addInProgressAndAdvance(tx.getTransactionId(), tx.getVisibilityUpperBound(), expiration, type, clientId); + addInProgressAndAdvance(tx.getTransactionId(), 
tx.getVisibilityUpperBound(), expiration, type, + retainClientId ? clientId : null); } // appending to WAL out of global lock for concurrent performance // we should still be able to arrive at the same state even if log entries are out of order @@ -853,46 +867,42 @@ public class TransactionManager extends AbstractService { } } - public boolean canCommit(Transaction tx, Collection<byte[]> changeIds) - throws TransactionNotInProgressException, TransactionSizeException { + public void canCommit(long txId, Collection<byte[]> changeIds) + throws TransactionNotInProgressException, TransactionSizeException, TransactionConflictException { txMetricsCollector.rate("canCommit"); Stopwatch timer = new Stopwatch().start(); - InProgressTx inProgressTx = inProgress.get(tx.getTransactionId()); + InProgressTx inProgressTx = inProgress.get(txId); if (inProgressTx == null) { synchronized (this) { // invalid transaction, either this has timed out and moved to invalid, or something else is wrong. - if (invalidTxList.contains(tx.getTransactionId())) { + if (invalidTxList.contains(txId)) { throw new TransactionNotInProgressException( String.format( - "canCommit() is called for transaction %d that is not in progress (it is known to be invalid)", - tx.getTransactionId())); + "canCommit() is called for transaction %d that is not in progress (it is known to be invalid)", txId)); } else { throw new TransactionNotInProgressException( - String.format("canCommit() is called for transaction %d that is not in progress", tx.getTransactionId())); + String.format("canCommit() is called for transaction %d that is not in progress", txId)); } } } Set<ChangeId> set = - validateChangeSet(tx, changeIds, inProgressTx.clientId != null ? inProgressTx.clientId : DEFAULT_CLIENTID); + validateChangeSet(txId, changeIds, inProgressTx.clientId != null ? 
inProgressTx.clientId : DEFAULT_CLIENTID); + checkForConflicts(txId, set); - if (hasConflicts(tx, set)) { - return false; - } // guard against changes to the transaction log while processing this.logReadLock.lock(); try { synchronized (this) { ensureAvailable(); - addCommittingChangeSet(tx.getTransactionId(), set); + addCommittingChangeSet(txId, inProgressTx.getClientId(), set); } - appendToLog(TransactionEdit.createCommitting(tx.getTransactionId(), set)); + appendToLog(TransactionEdit.createCommitting(txId, set)); } finally { this.logReadLock.unlock(); } txMetricsCollector.histogram("canCommit.latency", (int) timer.elapsedMillis()); - return true; } /** @@ -905,19 +915,19 @@ public class TransactionManager extends AbstractService { * @return the same set of changes, transformed into a set of {@link ChangeId}s. * @throws TransactionSizeException if the number or total size of the changes exceed the limit. */ - private Set<ChangeId> validateChangeSet(Transaction tx, Collection<byte[]> changeIds, + private Set<ChangeId> validateChangeSet(long txId, Collection<byte[]> changeIds, String clientId) throws TransactionSizeException { if (changeIds.size() > changeSetCountLimit) { LOG.warn("Change set for transaction {} belonging to client '{}' has {} entries and exceeds " + "the allowed size of {}. Limit the number of changes, or use a long-running transaction. ", - tx.getTransactionId(), clientId, changeIds.size(), changeSetCountLimit); + txId, clientId, changeIds.size(), changeSetCountLimit); throw new TransactionSizeException(String.format( "Change set for transaction %d has %d entries and exceeds the limit of %d", - tx.getTransactionId(), changeIds.size(), changeSetCountLimit)); + txId, changeIds.size(), changeSetCountLimit)); } else if (changeIds.size() > changeSetCountThreshold) { LOG.warn("Change set for transaction {} belonging to client '{}' has {} entries. " + "It is recommended to limit the number of changes to {}, or to use a long-running transaction. 
", - tx.getTransactionId(), clientId, changeIds.size(), changeSetCountThreshold); + txId, clientId, changeIds.size(), changeSetCountThreshold); } long byteCount = 0L; Set<ChangeId> set = Sets.newHashSetWithExpectedSize(changeIds.size()); @@ -928,26 +938,28 @@ public class TransactionManager extends AbstractService { if (byteCount > changeSetSizeLimit) { LOG.warn("Change set for transaction {} belonging to client '{}' has total size of {} bytes and exceeds " + "the allowed size of {} bytes. Limit the total size of changes, or use a long-running transaction. ", - tx.getTransactionId(), clientId, byteCount, changeSetSizeLimit); + txId, clientId, byteCount, changeSetSizeLimit); throw new TransactionSizeException(String.format( "Change set for transaction %d has total size of %d bytes and exceeds the limit of %d bytes", - tx.getTransactionId(), byteCount, changeSetSizeLimit)); + txId, byteCount, changeSetSizeLimit)); } else if (byteCount > changeSetSizeThreshold) { LOG.warn("Change set for transaction {} belonging to client '{}' has total size of {} bytes. " + "It is recommended to limit the total size to {} bytes, or to use a long-running transaction. ", - tx.getTransactionId(), clientId, byteCount, changeSetSizeThreshold); + txId, clientId, byteCount, changeSetSizeThreshold); } return set; } - private void addCommittingChangeSet(long writePointer, Set<ChangeId> changes) { - committingChangeSets.put(writePointer, changes); + private void addCommittingChangeSet(long writePointer, String clientId, Set<ChangeId> changes) { + committingChangeSets.put(writePointer, new ChangeSet(retainClientIdPastCommit ? 
clientId : null, changes)); } - public boolean commit(Transaction tx) throws TransactionNotInProgressException { + public void commit(long txId, long writePointer) + throws TransactionNotInProgressException, TransactionConflictException { + txMetricsCollector.rate("commit"); Stopwatch timer = new Stopwatch().start(); - Set<ChangeId> changeSet = null; + ChangeSet changeSet; boolean addToCommitted = true; long commitPointer; // guard against changes to the transaction log while processing @@ -958,57 +970,55 @@ public class TransactionManager extends AbstractService { // we record commits at the first not-yet assigned transaction id to simplify clearing out change sets that // are no longer visible by any in-progress transactions commitPointer = lastWritePointer + 1; - if (inProgress.get(tx.getTransactionId()) == null) { + if (inProgress.get(txId) == null) { // invalid transaction, either this has timed out and moved to invalid, or something else is wrong. - if (invalidTxList.contains(tx.getTransactionId())) { + if (invalidTxList.contains(txId)) { throw new TransactionNotInProgressException( String.format("canCommit() is called for transaction %d that is not in progress " + - "(it is known to be invalid)", tx.getTransactionId())); + "(it is known to be invalid)", txId)); } else { throw new TransactionNotInProgressException( - String.format("canCommit() is called for transaction %d that is not in progress", tx.getTransactionId())); + String.format("canCommit() is called for transaction %d that is not in progress", txId)); } } // these should be atomic // NOTE: whether we succeed or not we don't need to keep changes in committing state: same tx cannot // be attempted to commit twice - changeSet = committingChangeSets.remove(tx.getTransactionId()); + changeSet = committingChangeSets.remove(txId); if (changeSet != null) { // double-checking if there are conflicts: someone may have committed since canCommit check - if (hasConflicts(tx, changeSet)) { - return false; - } + 
checkForConflicts(txId, changeSet.getChangeIds()); } else { // no changes addToCommitted = false; } - doCommit(tx.getTransactionId(), tx.getWritePointer(), changeSet, commitPointer, addToCommitted); + doCommit(txId, writePointer, changeSet, commitPointer, addToCommitted); } - appendToLog(TransactionEdit.createCommitted(tx.getTransactionId(), changeSet, commitPointer, addToCommitted)); + appendToLog(TransactionEdit.createCommitted(txId, changeSet == null ? null : changeSet.getChangeIds(), + commitPointer, addToCommitted)); } finally { this.logReadLock.unlock(); } txMetricsCollector.histogram("commit.latency", (int) timer.elapsedMillis()); - return true; } - private void doCommit(long transactionId, long writePointer, Set<ChangeId> changes, long commitPointer, + private void doCommit(long transactionId, long writePointer, ChangeSet changes, long commitPointer, boolean addToCommitted) { // In case this method is called when loading a previous WAL, we need to remove the tx from these sets committingChangeSets.remove(transactionId); - if (addToCommitted && !changes.isEmpty()) { + if (addToCommitted && !changes.getChangeIds().isEmpty()) { // No need to add empty changes to the committed change sets, they will never trigger any conflict // Record the committed change set with the next writePointer as the commit time. // NOTE: we use current next writePointer as key for the map, hence we may have multiple txs changesets to be // stored under one key - Set<ChangeId> changeIds = committedChangeSets.get(commitPointer); - if (changeIds != null) { + ChangeSet committed = committedChangeSets.get(commitPointer); + if (committed != null) { // NOTE: we modify the new set to prevent concurrent modification exception, as other threads (e.g. 
in // canCommit) use it unguarded - changes.addAll(changeIds); + changes.getChangeIds().addAll(committed.getChangeIds()); } committedChangeSets.put(commitPointer, changes); } @@ -1112,7 +1122,7 @@ public class TransactionManager extends AbstractService { } private boolean doInvalidate(long writePointer) { - Set<ChangeId> previousChangeSet = committingChangeSets.remove(writePointer); + ChangeSet previousChangeSet = committingChangeSets.remove(writePointer); // remove from in-progress set, so that it does not get excluded in the future InProgressTx previous = inProgress.remove(writePointer); @@ -1224,9 +1234,9 @@ public class TransactionManager extends AbstractService { txMetricsCollector.rate("checkpoint"); Stopwatch timer = new Stopwatch().start(); - Transaction checkpointedTx = null; + Transaction checkpointedTx; long txId = originalTx.getTransactionId(); - long newWritePointer = 0; + long newWritePointer; // guard against changes to the transaction log while processing this.logReadLock.lock(); try { @@ -1284,39 +1294,49 @@ public class TransactionManager extends AbstractService { return this.committedChangeSets.size(); } - private boolean hasConflicts(Transaction tx, Set<ChangeId> changeIds) { + @Nullable + @VisibleForTesting + InProgressTx getInProgress(long transactionId) { + return inProgress.get(transactionId); + } + + private void checkForConflicts(long txId, Set<ChangeId> changeIds) throws TransactionConflictException { if (changeIds.isEmpty()) { - return false; + return; } - for (Map.Entry<Long, Set<ChangeId>> changeSet : committedChangeSets.entrySet()) { + for (Map.Entry<Long, ChangeSet> committed : committedChangeSets.entrySet()) { // If commit time is greater than tx read-pointer, // basically not visible but committed means "tx committed after given tx was started" - if (changeSet.getKey() > tx.getTransactionId()) { - if (overlap(changeSet.getValue(), changeIds)) { - return true; + if (committed.getKey() > txId) { + ChangeId change = 
overlap(committed.getValue().getChangeIds(), changeIds); + if (change != null) { + throw new TransactionConflictException(txId, change.toString(), committed.getValue().getClientId()); } } } - return false; } - private boolean overlap(Set<ChangeId> a, Set<ChangeId> b) { + /** + * Checks for overlap in two change sets, returns the first common change it finds, or null if no overlap. + */ + @Nullable + private ChangeId overlap(Set<ChangeId> a, Set<ChangeId> b) { // iterate over the smaller set, and check for every element in the other set if (a.size() > b.size()) { for (ChangeId change : b) { if (a.contains(change)) { - return true; + return change; } } } else { for (ChangeId change : a) { if (b.contains(change)) { - return true; + return change; } } } - return false; + return null; } private void moveReadPointerIfNeeded(long committedWritePointer) { @@ -1385,9 +1405,9 @@ public class TransactionManager extends AbstractService { } private abstract static class DaemonThreadExecutor extends Thread { - private AtomicBoolean stopped = new AtomicBoolean(false); + private final AtomicBoolean stopped = new AtomicBoolean(false); - public DaemonThreadExecutor(String name) { + DaemonThreadExecutor(String name) { super(name); setDaemon(true); } @@ -1577,4 +1597,25 @@ public class TransactionManager extends AbstractService { } } + /** + * Represents a set of changes from a client. 
+ */ + public static class ChangeSet { + final String clientId; + final Set<ChangeId> changeIds; + + ChangeSet(@Nullable String clientId, Set<ChangeId> changeIds) { + this.clientId = clientId; + this.changeIds = changeIds; + } + + @Nullable + public String getClientId() { + return clientId; + } + + public Set<ChangeId> getChangeIds() { + return changeIds; + } + } } http://git-wip-us.apache.org/repos/asf/incubator-tephra/blob/174c3325/tephra-core/src/main/java/org/apache/tephra/TransactionSystemClient.java ---------------------------------------------------------------------- diff --git a/tephra-core/src/main/java/org/apache/tephra/TransactionSystemClient.java b/tephra-core/src/main/java/org/apache/tephra/TransactionSystemClient.java index a44f131..29fbe62 100644 --- a/tephra-core/src/main/java/org/apache/tephra/TransactionSystemClient.java +++ b/tephra-core/src/main/java/org/apache/tephra/TransactionSystemClient.java @@ -60,7 +60,7 @@ public interface TransactionSystemClient { /** * Checks if transaction with the set of changes can be committed. E.g. it can check conflicts with other changes and * refuse commit if there are conflicts. It is assumed that no other changes will be done in between this method call - * and {@link #commit(Transaction)} which may check conflicts again to avoid races. + * and {@link #commitOrThrow(Transaction)} which will check conflicts again to avoid races. * <p/> * Since we do conflict detection at commit time as well, this may seem redundant. The idea is to check for conflicts * before we persist changes to avoid rollback in case of conflicts as much as possible. @@ -80,7 +80,7 @@ public interface TransactionSystemClient { /** * Checks if transaction with the set of changes can be committed. E.g. it can check conflicts with other changes and * refuse commit if there are conflicts. 
It is assumed that no other changes will be done in between this method call - * and {@link #commit(Transaction)} which may check conflicts again to avoid races. + * and {@link #commitOrThrow(Transaction)} which will check conflicts again to avoid races. * <p/> * Since we do conflict detection at commit time as well, this may seem redundant. The idea is to check for conflicts * before we persist changes to avoid rollback in case of conflicts as much as possible. @@ -89,21 +89,38 @@ public interface TransactionSystemClient { * * @param tx transaction to verify * @param changeIds ids of changes made by transaction - * @return true if transaction can be committed otherwise false - * @throws TransactionSizeException if the size of the chgange set exceeds the allowed limit - * @throws TransactionNotInProgressException if the transaction is not in progress; most likely it has timed out. + * + * @throws TransactionSizeException if the size of the change set exceeds the allowed limit + * @throws TransactionConflictException if the change set has a conflict with an overlapping transaction + * @throws TransactionNotInProgressException if the transaction is not in progress; most likely it has timed out */ - boolean canCommitOrThrow(Transaction tx, Collection<byte[]> changeIds) throws TransactionFailureException; + void canCommitOrThrow(Transaction tx, Collection<byte[]> changeIds) + throws TransactionFailureException; /** * Makes transaction visible. It will again check conflicts of changes submitted previously with - * {@link #canCommit(Transaction, java.util.Collection)} + * {@link #canCommitOrThrow(Transaction, java.util.Collection)} + * * @param tx transaction to make visible. * @return true if transaction can be committed otherwise false + * + * @deprecated as of 0.13-incubating. Use {@link #commitOrThrow(Transaction)} instead. */ + @Deprecated boolean commit(Transaction tx) throws TransactionNotInProgressException; /** + * Makes transaction visible. 
It will again check conflicts of changes submitted previously with + * {@link #canCommitOrThrow(Transaction, java.util.Collection)} + * + * @param tx transaction to make visible. + * + * @throws TransactionConflictException if the transaction has a conflict with an overlapping transaction + * @throws TransactionNotInProgressException if the transaction is not in progress; most likely it has timed out + */ + void commitOrThrow(Transaction tx) throws TransactionFailureException; + + /** * Makes transaction visible. You should call it only when all changes of this tx are undone. * NOTE: it will not throw {@link TransactionNotInProgressException} if transaction has timed out. * @param tx transaction to make visible. http://git-wip-us.apache.org/repos/asf/incubator-tephra/blob/174c3325/tephra-core/src/main/java/org/apache/tephra/TxConstants.java ---------------------------------------------------------------------- diff --git a/tephra-core/src/main/java/org/apache/tephra/TxConstants.java b/tephra-core/src/main/java/org/apache/tephra/TxConstants.java index 5c78aa4..3a6b70a 100644 --- a/tephra-core/src/main/java/org/apache/tephra/TxConstants.java +++ b/tephra-core/src/main/java/org/apache/tephra/TxConstants.java @@ -196,6 +196,17 @@ public class TxConstants { public static final long DEFAULT_TX_CHANGESET_SIZE_LIMIT = Long.MAX_VALUE; /** The default warning threshold for the total size in bytes of a change set is unlimited. */ public static final long DEFAULT_TX_CHANGESET_SIZE_WARN_THRESHOLD = Long.MAX_VALUE; + + /** Whether and how long to retain the client id of a transaction. 
Valid values are: + * <ul> + * <li>OFF - do not retain the client id at all</li> + * <li>ACTIVE - retain the client id until a transaction commits, aborts, or is invalidated</li> + * <li>COMMITTED - retain the client id after it commits, as long as it participates in conflict detection</li> + * </ul> + */ + public static final String CFG_TX_RETAIN_CLIENT_ID = "data.tx.retain.client.id"; + /** Default for how long to retain a transaction's client id */ + public static final String DEFAULT_TX_RETAIN_CLIENT_ID = "COMMITTED"; } /** http://git-wip-us.apache.org/repos/asf/incubator-tephra/blob/174c3325/tephra-core/src/main/java/org/apache/tephra/distributed/TransactionServiceClient.java ---------------------------------------------------------------------- diff --git a/tephra-core/src/main/java/org/apache/tephra/distributed/TransactionServiceClient.java b/tephra-core/src/main/java/org/apache/tephra/distributed/TransactionServiceClient.java index f1743de..938eef5 100644 --- a/tephra-core/src/main/java/org/apache/tephra/distributed/TransactionServiceClient.java +++ b/tephra-core/src/main/java/org/apache/tephra/distributed/TransactionServiceClient.java @@ -27,6 +27,7 @@ import com.google.inject.name.Named; import org.apache.hadoop.conf.Configuration; import org.apache.tephra.InvalidTruncateTimeException; import org.apache.tephra.Transaction; +import org.apache.tephra.TransactionConflictException; import org.apache.tephra.TransactionCouldNotTakeSnapshotException; import org.apache.tephra.TransactionFailureException; import org.apache.tephra.TransactionNotInProgressException; @@ -75,7 +76,6 @@ public class TransactionServiceClient implements TransactionSystemClient { /** * Utility to be used for basic verification of transaction system availability and functioning * @param args arguments list, accepts single option "-v" that makes it to print out more details about started tx - * @throws Exception */ public static void main(String[] args) throws Exception { if (args.length > 
1 || (args.length == 1 && !"-v".equals(args[0]))) { @@ -115,19 +115,14 @@ public class TransactionServiceClient implements TransactionSystemClient { ", invalids: " + tx.getInvalids().length + ", inProgress: " + tx.getInProgress().length); } - LOG.info("Checking if canCommit tx..."); - boolean canCommit = client.canCommitOrThrow(tx, Collections.<byte[]>emptyList()); - LOG.info("canCommit: " + canCommit); - if (canCommit) { + try { + LOG.info("Checking if canCommit tx..."); + client.canCommitOrThrow(tx, Collections.<byte[]>emptyList()); + LOG.info("canCommit: success"); LOG.info("Committing tx..."); - boolean committed = client.commit(tx); - LOG.info("Committed tx: " + committed); - if (!committed) { - LOG.info("Aborting tx..."); - client.abort(tx); - LOG.info("Aborted tx..."); - } - } else { + client.commitOrThrow(tx); + LOG.info("Committed tx: success"); + } catch (TransactionConflictException e) { LOG.info("Aborting tx..."); client.abort(tx); LOG.info("Aborted tx..."); @@ -322,25 +317,16 @@ public class TransactionServiceClient implements TransactionSystemClient { @Override public boolean canCommit(final Transaction tx, final Collection<byte[]> changeIds) throws TransactionNotInProgressException { - try { - return execute( - new Operation<Boolean>("canCommit") { - @Override - public Boolean execute(TransactionServiceThriftClient client) - throws Exception { - return client.canCommit(tx, changeIds); - } - }); - } catch (TransactionNotInProgressException e) { - throw e; - } catch (Exception e) { - throw Throwables.propagate(e); + canCommitOrThrow(tx, changeIds); + return true; + } catch (TransactionFailureException e) { + return false; } } @Override - public boolean canCommitOrThrow(final Transaction tx, final Collection<byte[]> changeIds) + public void canCommitOrThrow(final Transaction tx, final Collection<byte[]> changeIds) throws TransactionFailureException { // we want to validate the size of the change set here before sending it over the wire. 
@@ -369,15 +355,16 @@ public class TransactionServiceClient implements TransactionSystemClient { } try { - return execute( - new Operation<Boolean>("canCommit") { + execute( + new Operation<Void>("canCommit") { @Override - public Boolean execute(TransactionServiceThriftClient client) + public Void execute(TransactionServiceThriftClient client) throws Exception { - return client.canCommitOrThrow(tx, changeIds); + client.canCommit(tx, changeIds); + return null; } }); - } catch (TransactionNotInProgressException | TransactionSizeException e) { + } catch (TransactionNotInProgressException | TransactionSizeException | TransactionConflictException e) { throw e; } catch (Exception e) { throw Throwables.propagate(e); @@ -387,15 +374,26 @@ public class TransactionServiceClient implements TransactionSystemClient { @Override public boolean commit(final Transaction tx) throws TransactionNotInProgressException { try { - return this.execute( - new Operation<Boolean>("commit") { + commitOrThrow(tx); + return true; + } catch (TransactionFailureException e) { + return false; + } + } + + @Override + public void commitOrThrow(final Transaction tx) + throws TransactionFailureException { + try { + execute( + new Operation<Void>("commit") { @Override - public Boolean execute(TransactionServiceThriftClient client) - throws Exception { - return client.commit(tx); + public Void execute(TransactionServiceThriftClient client) throws Exception { + client.commit(tx.getTransactionId(), tx.getWritePointer()); + return null; } }); - } catch (TransactionNotInProgressException e) { + } catch (TransactionNotInProgressException | TransactionConflictException e) { throw e; } catch (Exception e) { throw Throwables.propagate(e); http://git-wip-us.apache.org/repos/asf/incubator-tephra/blob/174c3325/tephra-core/src/main/java/org/apache/tephra/distributed/TransactionServiceThriftClient.java ---------------------------------------------------------------------- diff --git 
a/tephra-core/src/main/java/org/apache/tephra/distributed/TransactionServiceThriftClient.java b/tephra-core/src/main/java/org/apache/tephra/distributed/TransactionServiceThriftClient.java index ba37243..6ce7b84 100644 --- a/tephra-core/src/main/java/org/apache/tephra/distributed/TransactionServiceThriftClient.java +++ b/tephra-core/src/main/java/org/apache/tephra/distributed/TransactionServiceThriftClient.java @@ -23,11 +23,13 @@ import com.google.common.collect.ImmutableSet; import com.google.common.collect.Iterables; import org.apache.tephra.InvalidTruncateTimeException; import org.apache.tephra.Transaction; +import org.apache.tephra.TransactionConflictException; import org.apache.tephra.TransactionCouldNotTakeSnapshotException; import org.apache.tephra.TransactionNotInProgressException; import org.apache.tephra.TransactionSizeException; import org.apache.tephra.distributed.thrift.TGenericException; import org.apache.tephra.distributed.thrift.TInvalidTruncateTimeException; +import org.apache.tephra.distributed.thrift.TTransactionConflictException; import org.apache.tephra.distributed.thrift.TTransactionCouldNotTakeSnapshotException; import org.apache.tephra.distributed.thrift.TTransactionNotInProgressException; import org.apache.tephra.distributed.thrift.TTransactionServer; @@ -64,12 +66,12 @@ public class TransactionServiceThriftClient { /** * The thrift transport layer. We need this when we close the connection. */ - TTransport transport; + private TTransport transport; /** * The actual thrift client. */ - TTransactionServer.Client client; + private TTransactionServer.Client client; /** * Whether this client is valid for use. 
@@ -184,30 +186,19 @@ public class TransactionServiceThriftClient { } } - public boolean canCommit(Transaction tx, Collection<byte[]> changeIds) - throws TException, TransactionNotInProgressException { + public void canCommit(Transaction tx, Collection<byte[]> changeIds) + throws TException, TransactionNotInProgressException, TransactionSizeException, TransactionConflictException { try { - return client.canCommitTx(TransactionConverterUtils.wrap(tx), - ImmutableSet.copyOf(Iterables.transform(changeIds, BYTES_WRAPPER))).isValue(); - } catch (TTransactionNotInProgressException e) { - throw new TransactionNotInProgressException(e.getMessage()); - } catch (TException e) { - isValid.set(false); - throw e; - } - } - - public boolean canCommitOrThrow(Transaction tx, Collection<byte[]> changeIds) - throws TException, TransactionNotInProgressException, TransactionSizeException { - try { - return client.canCommitTx(TransactionConverterUtils.wrap(tx), - ImmutableSet.copyOf(Iterables.transform(changeIds, BYTES_WRAPPER))).isValue(); + client.canCommitOrThrow(tx.getTransactionId(), + ImmutableSet.copyOf(Iterables.transform(changeIds, BYTES_WRAPPER))); } catch (TTransactionNotInProgressException e) { throw new TransactionNotInProgressException(e.getMessage()); + } catch (TTransactionConflictException e) { + throw new TransactionConflictException(e.getTransactionId(), e.getConflictingKey(), e.getConflictingClient()); } catch (TGenericException e) { // currently, we only expect TransactionSizeException here if (!TransactionSizeException.class.getName().equals(e.getOriginalExceptionClass())) { - LOG.trace("Expecting only {} as the original exception class but found {}", + LOG.debug("Expecting only {} as the original exception class but found {}", TransactionSizeException.class.getName(), e.getOriginalExceptionClass()); throw e; } @@ -218,11 +209,18 @@ public class TransactionServiceThriftClient { } } - public boolean commit(Transaction tx) throws TException, 
TransactionNotInProgressException { + public void commit(long txId, long wp) + throws TException, TransactionNotInProgressException, TransactionConflictException { try { - return client.commitTx(TransactionConverterUtils.wrap(tx)).isValue(); + client.commitOrThrow(txId, wp); } catch (TTransactionNotInProgressException e) { throw new TransactionNotInProgressException(e.getMessage()); + } catch (TTransactionConflictException e) { + throw new TransactionConflictException(e.getTransactionId(), e.getConflictingKey(), e.getConflictingClient()); + } catch (TGenericException e) { + // we never throw this from commitOrThrow() - it was added as place holder to avoid future thrift API changes + LOG.debug("Unexpected {} from commitOrThrow()", TGenericException.class.getName()); + throw e; } catch (TException e) { isValid.set(false); throw e; http://git-wip-us.apache.org/repos/asf/incubator-tephra/blob/174c3325/tephra-core/src/main/java/org/apache/tephra/distributed/TransactionServiceThriftHandler.java ---------------------------------------------------------------------- diff --git a/tephra-core/src/main/java/org/apache/tephra/distributed/TransactionServiceThriftHandler.java b/tephra-core/src/main/java/org/apache/tephra/distributed/TransactionServiceThriftHandler.java index 0c9105b..95988c0 100644 --- a/tephra-core/src/main/java/org/apache/tephra/distributed/TransactionServiceThriftHandler.java +++ b/tephra-core/src/main/java/org/apache/tephra/distributed/TransactionServiceThriftHandler.java @@ -20,6 +20,7 @@ package org.apache.tephra.distributed; import com.google.common.collect.Sets; import org.apache.tephra.InvalidTruncateTimeException; +import org.apache.tephra.TransactionConflictException; import org.apache.tephra.TransactionManager; import org.apache.tephra.TransactionNotInProgressException; import org.apache.tephra.TransactionSizeException; @@ -28,6 +29,7 @@ import org.apache.tephra.distributed.thrift.TBoolean; import 
org.apache.tephra.distributed.thrift.TGenericException; import org.apache.tephra.distributed.thrift.TInvalidTruncateTimeException; import org.apache.tephra.distributed.thrift.TTransaction; +import org.apache.tephra.distributed.thrift.TTransactionConflictException; import org.apache.tephra.distributed.thrift.TTransactionCouldNotTakeSnapshotException; import org.apache.tephra.distributed.thrift.TTransactionNotInProgressException; import org.apache.tephra.distributed.thrift.TTransactionServer; @@ -119,25 +121,16 @@ public class TransactionServiceThriftHandler implements TTransactionServer.Iface @Override public TBoolean canCommitTx(TTransaction tx, Set<ByteBuffer> changes) throws TException { - - Set<byte[]> changeIds = Sets.newHashSet(); - for (ByteBuffer bb : changes) { - byte[] changeId = new byte[bb.remaining()]; - bb.get(changeId); - changeIds.add(changeId); - } try { - return new TBoolean(txManager.canCommit(TransactionConverterUtils.unwrap(tx), changeIds)); - } catch (TransactionNotInProgressException e) { - throw new TTransactionNotInProgressException(e.getMessage()); - } catch (TransactionSizeException e) { - return new TBoolean(false); // can't throw exception -> just indicate that it failed + canCommitOrThrow(tx.getTransactionId(), changes); + return new TBoolean(true); + } catch (TTransactionConflictException | TGenericException e) { + return new TBoolean(false); } } @Override - public TBoolean canCommitOrThrow(TTransaction tx, Set<ByteBuffer> changes) throws TException { - + public void canCommitOrThrow(long txId, Set<ByteBuffer> changes) throws TException { Set<byte[]> changeIds = Sets.newHashSet(); for (ByteBuffer bb : changes) { byte[] changeId = new byte[bb.remaining()]; @@ -145,20 +138,34 @@ public class TransactionServiceThriftHandler implements TTransactionServer.Iface changeIds.add(changeId); } try { - return new TBoolean(txManager.canCommit(TransactionConverterUtils.unwrap(tx), changeIds)); + txManager.canCommit(txId, changeIds); } catch 
(TransactionNotInProgressException e) { throw new TTransactionNotInProgressException(e.getMessage()); } catch (TransactionSizeException e) { throw new TGenericException(e.getMessage(), TransactionSizeException.class.getName()); + } catch (TransactionConflictException e) { + throw new TTransactionConflictException(e.getTransactionId(), e.getConflictingKey(), e.getConflictingClient()); } } @Override public TBoolean commitTx(TTransaction tx) throws TException { try { - return new TBoolean(txManager.commit(TransactionConverterUtils.unwrap(tx))); + commitOrThrow(tx.getTransactionId(), tx.getWritePointer()); + return new TBoolean(true); + } catch (TTransactionConflictException | TGenericException e) { + return new TBoolean(false); + } + } + + @Override + public void commitOrThrow(long txId, long wp) throws TException { + try { + txManager.commit(txId, wp); } catch (TransactionNotInProgressException e) { throw new TTransactionNotInProgressException(e.getMessage()); + } catch (TransactionConflictException e) { + throw new TTransactionConflictException(e.getTransactionId(), e.getConflictingKey(), e.getConflictingClient()); } } http://git-wip-us.apache.org/repos/asf/incubator-tephra/blob/174c3325/tephra-core/src/main/java/org/apache/tephra/distributed/thrift/TTransactionConflictException.java ---------------------------------------------------------------------- diff --git a/tephra-core/src/main/java/org/apache/tephra/distributed/thrift/TTransactionConflictException.java b/tephra-core/src/main/java/org/apache/tephra/distributed/thrift/TTransactionConflictException.java new file mode 100644 index 0000000..d7c6c9f --- /dev/null +++ b/tephra-core/src/main/java/org/apache/tephra/distributed/thrift/TTransactionConflictException.java @@ -0,0 +1,602 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +/** + * Autogenerated by Thrift Compiler (0.9.0) + * + * DO NOT EDIT UNLESS YOU ARE SURE THAT YOU KNOW WHAT YOU ARE DOING + * @generated + */ +package org.apache.tephra.distributed.thrift; + +import org.apache.thrift.scheme.IScheme; +import org.apache.thrift.scheme.SchemeFactory; +import org.apache.thrift.scheme.StandardScheme; + +import org.apache.thrift.scheme.TupleScheme; +import org.apache.thrift.protocol.TTupleProtocol; +import org.apache.thrift.protocol.TProtocolException; +import org.apache.thrift.EncodingUtils; +import org.apache.thrift.TException; +import java.util.List; +import java.util.ArrayList; +import java.util.Map; +import java.util.HashMap; +import java.util.EnumMap; +import java.util.Set; +import java.util.HashSet; +import java.util.EnumSet; +import java.util.Collections; +import java.util.BitSet; +import java.nio.ByteBuffer; +import java.util.Arrays; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +public class TTransactionConflictException extends TException implements org.apache.thrift.TBase<TTransactionConflictException, TTransactionConflictException._Fields>, java.io.Serializable, Cloneable { + private static final org.apache.thrift.protocol.TStruct STRUCT_DESC = new org.apache.thrift.protocol.TStruct("TTransactionConflictException"); + + private static final org.apache.thrift.protocol.TField TRANSACTION_ID_FIELD_DESC = new 
org.apache.thrift.protocol.TField("transactionId", org.apache.thrift.protocol.TType.I64, (short)1); + private static final org.apache.thrift.protocol.TField CONFLICTING_KEY_FIELD_DESC = new org.apache.thrift.protocol.TField("conflictingKey", org.apache.thrift.protocol.TType.STRING, (short)2); + private static final org.apache.thrift.protocol.TField CONFLICTING_CLIENT_FIELD_DESC = new org.apache.thrift.protocol.TField("conflictingClient", org.apache.thrift.protocol.TType.STRING, (short)3); + + private static final Map<Class<? extends IScheme>, SchemeFactory> schemes = new HashMap<Class<? extends IScheme>, SchemeFactory>(); + static { + schemes.put(StandardScheme.class, new TTransactionConflictExceptionStandardSchemeFactory()); + schemes.put(TupleScheme.class, new TTransactionConflictExceptionTupleSchemeFactory()); + } + + public long transactionId; // required + public String conflictingKey; // required + public String conflictingClient; // required + + /** The set of fields this struct contains, along with convenience methods for finding and manipulating them. */ + public enum _Fields implements org.apache.thrift.TFieldIdEnum { + TRANSACTION_ID((short)1, "transactionId"), + CONFLICTING_KEY((short)2, "conflictingKey"), + CONFLICTING_CLIENT((short)3, "conflictingClient"); + + private static final Map<String, _Fields> byName = new HashMap<String, _Fields>(); + + static { + for (_Fields field : EnumSet.allOf(_Fields.class)) { + byName.put(field.getFieldName(), field); + } + } + + /** + * Find the _Fields constant that matches fieldId, or null if its not found. + */ + public static _Fields findByThriftId(int fieldId) { + switch(fieldId) { + case 1: // TRANSACTION_ID + return TRANSACTION_ID; + case 2: // CONFLICTING_KEY + return CONFLICTING_KEY; + case 3: // CONFLICTING_CLIENT + return CONFLICTING_CLIENT; + default: + return null; + } + } + + /** + * Find the _Fields constant that matches fieldId, throwing an exception + * if it is not found. 
+ */ + public static _Fields findByThriftIdOrThrow(int fieldId) { + _Fields fields = findByThriftId(fieldId); + if (fields == null) throw new IllegalArgumentException("Field " + fieldId + " doesn't exist!"); + return fields; + } + + /** + * Find the _Fields constant that matches name, or null if its not found. + */ + public static _Fields findByName(String name) { + return byName.get(name); + } + + private final short _thriftId; + private final String _fieldName; + + _Fields(short thriftId, String fieldName) { + _thriftId = thriftId; + _fieldName = fieldName; + } + + public short getThriftFieldId() { + return _thriftId; + } + + public String getFieldName() { + return _fieldName; + } + } + + // isset id assignments + private static final int __TRANSACTIONID_ISSET_ID = 0; + private byte __isset_bitfield = 0; + public static final Map<_Fields, org.apache.thrift.meta_data.FieldMetaData> metaDataMap; + static { + Map<_Fields, org.apache.thrift.meta_data.FieldMetaData> tmpMap = new EnumMap<_Fields, org.apache.thrift.meta_data.FieldMetaData>(_Fields.class); + tmpMap.put(_Fields.TRANSACTION_ID, new org.apache.thrift.meta_data.FieldMetaData("transactionId", org.apache.thrift.TFieldRequirementType.DEFAULT, + new org.apache.thrift.meta_data.FieldValueMetaData(org.apache.thrift.protocol.TType.I64))); + tmpMap.put(_Fields.CONFLICTING_KEY, new org.apache.thrift.meta_data.FieldMetaData("conflictingKey", org.apache.thrift.TFieldRequirementType.DEFAULT, + new org.apache.thrift.meta_data.FieldValueMetaData(org.apache.thrift.protocol.TType.STRING))); + tmpMap.put(_Fields.CONFLICTING_CLIENT, new org.apache.thrift.meta_data.FieldMetaData("conflictingClient", org.apache.thrift.TFieldRequirementType.DEFAULT, + new org.apache.thrift.meta_data.FieldValueMetaData(org.apache.thrift.protocol.TType.STRING))); + metaDataMap = Collections.unmodifiableMap(tmpMap); + org.apache.thrift.meta_data.FieldMetaData.addStructMetaDataMap(TTransactionConflictException.class, metaDataMap); + } + + public 
TTransactionConflictException() { + } + + public TTransactionConflictException( + long transactionId, + String conflictingKey, + String conflictingClient) + { + this(); + this.transactionId = transactionId; + setTransactionIdIsSet(true); + this.conflictingKey = conflictingKey; + this.conflictingClient = conflictingClient; + } + + /** + * Performs a deep copy on <i>other</i>. + */ + public TTransactionConflictException(TTransactionConflictException other) { + __isset_bitfield = other.__isset_bitfield; + this.transactionId = other.transactionId; + if (other.isSetConflictingKey()) { + this.conflictingKey = other.conflictingKey; + } + if (other.isSetConflictingClient()) { + this.conflictingClient = other.conflictingClient; + } + } + + public TTransactionConflictException deepCopy() { + return new TTransactionConflictException(this); + } + + @Override + public void clear() { + setTransactionIdIsSet(false); + this.transactionId = 0; + this.conflictingKey = null; + this.conflictingClient = null; + } + + public long getTransactionId() { + return this.transactionId; + } + + public TTransactionConflictException setTransactionId(long transactionId) { + this.transactionId = transactionId; + setTransactionIdIsSet(true); + return this; + } + + public void unsetTransactionId() { + __isset_bitfield = EncodingUtils.clearBit(__isset_bitfield, __TRANSACTIONID_ISSET_ID); + } + + /** Returns true if field transactionId is set (has been assigned a value) and false otherwise */ + public boolean isSetTransactionId() { + return EncodingUtils.testBit(__isset_bitfield, __TRANSACTIONID_ISSET_ID); + } + + public void setTransactionIdIsSet(boolean value) { + __isset_bitfield = EncodingUtils.setBit(__isset_bitfield, __TRANSACTIONID_ISSET_ID, value); + } + + public String getConflictingKey() { + return this.conflictingKey; + } + + public TTransactionConflictException setConflictingKey(String conflictingKey) { + this.conflictingKey = conflictingKey; + return this; + } + + public void 
unsetConflictingKey() { + this.conflictingKey = null; + } + + /** Returns true if field conflictingKey is set (has been assigned a value) and false otherwise */ + public boolean isSetConflictingKey() { + return this.conflictingKey != null; + } + + public void setConflictingKeyIsSet(boolean value) { + if (!value) { + this.conflictingKey = null; + } + } + + public String getConflictingClient() { + return this.conflictingClient; + } + + public TTransactionConflictException setConflictingClient(String conflictingClient) { + this.conflictingClient = conflictingClient; + return this; + } + + public void unsetConflictingClient() { + this.conflictingClient = null; + } + + /** Returns true if field conflictingClient is set (has been assigned a value) and false otherwise */ + public boolean isSetConflictingClient() { + return this.conflictingClient != null; + } + + public void setConflictingClientIsSet(boolean value) { + if (!value) { + this.conflictingClient = null; + } + } + + public void setFieldValue(_Fields field, Object value) { + switch (field) { + case TRANSACTION_ID: + if (value == null) { + unsetTransactionId(); + } else { + setTransactionId((Long)value); + } + break; + + case CONFLICTING_KEY: + if (value == null) { + unsetConflictingKey(); + } else { + setConflictingKey((String)value); + } + break; + + case CONFLICTING_CLIENT: + if (value == null) { + unsetConflictingClient(); + } else { + setConflictingClient((String)value); + } + break; + + } + } + + public Object getFieldValue(_Fields field) { + switch (field) { + case TRANSACTION_ID: + return Long.valueOf(getTransactionId()); + + case CONFLICTING_KEY: + return getConflictingKey(); + + case CONFLICTING_CLIENT: + return getConflictingClient(); + + } + throw new IllegalStateException(); + } + + /** Returns true if field corresponding to fieldID is set (has been assigned a value) and false otherwise */ + public boolean isSet(_Fields field) { + if (field == null) { + throw new IllegalArgumentException(); + } + + 
switch (field) { + case TRANSACTION_ID: + return isSetTransactionId(); + case CONFLICTING_KEY: + return isSetConflictingKey(); + case CONFLICTING_CLIENT: + return isSetConflictingClient(); + } + throw new IllegalStateException(); + } + + @Override + public boolean equals(Object that) { + if (that == null) + return false; + if (that instanceof TTransactionConflictException) + return this.equals((TTransactionConflictException)that); + return false; + } + + public boolean equals(TTransactionConflictException that) { + if (that == null) + return false; + + boolean this_present_transactionId = true; + boolean that_present_transactionId = true; + if (this_present_transactionId || that_present_transactionId) { + if (!(this_present_transactionId && that_present_transactionId)) + return false; + if (this.transactionId != that.transactionId) + return false; + } + + boolean this_present_conflictingKey = true && this.isSetConflictingKey(); + boolean that_present_conflictingKey = true && that.isSetConflictingKey(); + if (this_present_conflictingKey || that_present_conflictingKey) { + if (!(this_present_conflictingKey && that_present_conflictingKey)) + return false; + if (!this.conflictingKey.equals(that.conflictingKey)) + return false; + } + + boolean this_present_conflictingClient = true && this.isSetConflictingClient(); + boolean that_present_conflictingClient = true && that.isSetConflictingClient(); + if (this_present_conflictingClient || that_present_conflictingClient) { + if (!(this_present_conflictingClient && that_present_conflictingClient)) + return false; + if (!this.conflictingClient.equals(that.conflictingClient)) + return false; + } + + return true; + } + + @Override + public int hashCode() { + return 0; + } + + public int compareTo(TTransactionConflictException other) { + if (!getClass().equals(other.getClass())) { + return getClass().getName().compareTo(other.getClass().getName()); + } + + int lastComparison = 0; + TTransactionConflictException typedOther = 
(TTransactionConflictException)other; + + lastComparison = Boolean.valueOf(isSetTransactionId()).compareTo(typedOther.isSetTransactionId()); + if (lastComparison != 0) { + return lastComparison; + } + if (isSetTransactionId()) { + lastComparison = org.apache.thrift.TBaseHelper.compareTo(this.transactionId, typedOther.transactionId); + if (lastComparison != 0) { + return lastComparison; + } + } + lastComparison = Boolean.valueOf(isSetConflictingKey()).compareTo(typedOther.isSetConflictingKey()); + if (lastComparison != 0) { + return lastComparison; + } + if (isSetConflictingKey()) { + lastComparison = org.apache.thrift.TBaseHelper.compareTo(this.conflictingKey, typedOther.conflictingKey); + if (lastComparison != 0) { + return lastComparison; + } + } + lastComparison = Boolean.valueOf(isSetConflictingClient()).compareTo(typedOther.isSetConflictingClient()); + if (lastComparison != 0) { + return lastComparison; + } + if (isSetConflictingClient()) { + lastComparison = org.apache.thrift.TBaseHelper.compareTo(this.conflictingClient, typedOther.conflictingClient); + if (lastComparison != 0) { + return lastComparison; + } + } + return 0; + } + + public _Fields fieldForId(int fieldId) { + return _Fields.findByThriftId(fieldId); + } + + public void read(org.apache.thrift.protocol.TProtocol iprot) throws org.apache.thrift.TException { + schemes.get(iprot.getScheme()).getScheme().read(iprot, this); + } + + public void write(org.apache.thrift.protocol.TProtocol oprot) throws org.apache.thrift.TException { + schemes.get(oprot.getScheme()).getScheme().write(oprot, this); + } + + @Override + public String toString() { + StringBuilder sb = new StringBuilder("TTransactionConflictException("); + boolean first = true; + + sb.append("transactionId:"); + sb.append(this.transactionId); + first = false; + if (!first) sb.append(", "); + sb.append("conflictingKey:"); + if (this.conflictingKey == null) { + sb.append("null"); + } else { + sb.append(this.conflictingKey); + } + first = false; + 
if (!first) sb.append(", "); + sb.append("conflictingClient:"); + if (this.conflictingClient == null) { + sb.append("null"); + } else { + sb.append(this.conflictingClient); + } + first = false; + sb.append(")"); + return sb.toString(); + } + + public void validate() throws org.apache.thrift.TException { + // check for required fields + // check for sub-struct validity + } + + private void writeObject(java.io.ObjectOutputStream out) throws java.io.IOException { + try { + write(new org.apache.thrift.protocol.TCompactProtocol(new org.apache.thrift.transport.TIOStreamTransport(out))); + } catch (org.apache.thrift.TException te) { + throw new java.io.IOException(te); + } + } + + private void readObject(java.io.ObjectInputStream in) throws java.io.IOException, ClassNotFoundException { + try { + // it doesn't seem like you should have to do this, but java serialization is wacky, and doesn't call the default constructor. + __isset_bitfield = 0; + read(new org.apache.thrift.protocol.TCompactProtocol(new org.apache.thrift.transport.TIOStreamTransport(in))); + } catch (org.apache.thrift.TException te) { + throw new java.io.IOException(te); + } + } + + private static class TTransactionConflictExceptionStandardSchemeFactory implements SchemeFactory { + public TTransactionConflictExceptionStandardScheme getScheme() { + return new TTransactionConflictExceptionStandardScheme(); + } + } + + private static class TTransactionConflictExceptionStandardScheme extends StandardScheme<TTransactionConflictException> { + + public void read(org.apache.thrift.protocol.TProtocol iprot, TTransactionConflictException struct) throws org.apache.thrift.TException { + org.apache.thrift.protocol.TField schemeField; + iprot.readStructBegin(); + while (true) + { + schemeField = iprot.readFieldBegin(); + if (schemeField.type == org.apache.thrift.protocol.TType.STOP) { + break; + } + switch (schemeField.id) { + case 1: // TRANSACTION_ID + if (schemeField.type == org.apache.thrift.protocol.TType.I64) { + 
struct.transactionId = iprot.readI64(); + struct.setTransactionIdIsSet(true); + } else { + org.apache.thrift.protocol.TProtocolUtil.skip(iprot, schemeField.type); + } + break; + case 2: // CONFLICTING_KEY + if (schemeField.type == org.apache.thrift.protocol.TType.STRING) { + struct.conflictingKey = iprot.readString(); + struct.setConflictingKeyIsSet(true); + } else { + org.apache.thrift.protocol.TProtocolUtil.skip(iprot, schemeField.type); + } + break; + case 3: // CONFLICTING_CLIENT + if (schemeField.type == org.apache.thrift.protocol.TType.STRING) { + struct.conflictingClient = iprot.readString(); + struct.setConflictingClientIsSet(true); + } else { + org.apache.thrift.protocol.TProtocolUtil.skip(iprot, schemeField.type); + } + break; + default: + org.apache.thrift.protocol.TProtocolUtil.skip(iprot, schemeField.type); + } + iprot.readFieldEnd(); + } + iprot.readStructEnd(); + + // check for required fields of primitive type, which can't be checked in the validate method + struct.validate(); + } + + public void write(org.apache.thrift.protocol.TProtocol oprot, TTransactionConflictException struct) throws org.apache.thrift.TException { + struct.validate(); + + oprot.writeStructBegin(STRUCT_DESC); + oprot.writeFieldBegin(TRANSACTION_ID_FIELD_DESC); + oprot.writeI64(struct.transactionId); + oprot.writeFieldEnd(); + if (struct.conflictingKey != null) { + oprot.writeFieldBegin(CONFLICTING_KEY_FIELD_DESC); + oprot.writeString(struct.conflictingKey); + oprot.writeFieldEnd(); + } + if (struct.conflictingClient != null) { + oprot.writeFieldBegin(CONFLICTING_CLIENT_FIELD_DESC); + oprot.writeString(struct.conflictingClient); + oprot.writeFieldEnd(); + } + oprot.writeFieldStop(); + oprot.writeStructEnd(); + } + + } + + private static class TTransactionConflictExceptionTupleSchemeFactory implements SchemeFactory { + public TTransactionConflictExceptionTupleScheme getScheme() { + return new TTransactionConflictExceptionTupleScheme(); + } + } + + private static class 
TTransactionConflictExceptionTupleScheme extends TupleScheme<TTransactionConflictException> { + + @Override + public void write(org.apache.thrift.protocol.TProtocol prot, TTransactionConflictException struct) throws org.apache.thrift.TException { + TTupleProtocol oprot = (TTupleProtocol) prot; + BitSet optionals = new BitSet(); + if (struct.isSetTransactionId()) { + optionals.set(0); + } + if (struct.isSetConflictingKey()) { + optionals.set(1); + } + if (struct.isSetConflictingClient()) { + optionals.set(2); + } + oprot.writeBitSet(optionals, 3); + if (struct.isSetTransactionId()) { + oprot.writeI64(struct.transactionId); + } + if (struct.isSetConflictingKey()) { + oprot.writeString(struct.conflictingKey); + } + if (struct.isSetConflictingClient()) { + oprot.writeString(struct.conflictingClient); + } + } + + @Override + public void read(org.apache.thrift.protocol.TProtocol prot, TTransactionConflictException struct) throws org.apache.thrift.TException { + TTupleProtocol iprot = (TTupleProtocol) prot; + BitSet incoming = iprot.readBitSet(3); + if (incoming.get(0)) { + struct.transactionId = iprot.readI64(); + struct.setTransactionIdIsSet(true); + } + if (incoming.get(1)) { + struct.conflictingKey = iprot.readString(); + struct.setConflictingKeyIsSet(true); + } + if (incoming.get(2)) { + struct.conflictingClient = iprot.readString(); + struct.setConflictingClientIsSet(true); + } + } + } + +} +
