TEPHRA-228 Adding the ability to pass in a clientId when starting a transaction; the clientId is logged when the transaction is invalidated due to a timeout.
This closes #42 from GitHub. Signed-off-by: Gokul Gunasekaran <[email protected]> Project: http://git-wip-us.apache.org/repos/asf/incubator-tephra/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-tephra/commit/a22c11d8 Tree: http://git-wip-us.apache.org/repos/asf/incubator-tephra/tree/a22c11d8 Diff: http://git-wip-us.apache.org/repos/asf/incubator-tephra/diff/a22c11d8 Branch: refs/heads/master Commit: a22c11d81eb08c4afb0071922aa90c42987ffe59 Parents: 0b209bb Author: Gokul Gunasekaran <[email protected]> Authored: Mon May 15 18:06:10 2017 -0700 Committer: Gokul Gunasekaran <[email protected]> Committed: Mon May 22 16:51:05 2017 -0700 ---------------------------------------------------------------------- .../org/apache/tephra/TransactionContext.java | 2 +- .../org/apache/tephra/TransactionManager.java | 98 +- .../java/org/apache/tephra/TxConstants.java | 5 + .../distributed/TransactionServiceClient.java | 26 +- .../TransactionServiceThriftClient.java | 51 + .../TransactionServiceThriftHandler.java | 27 + .../distributed/thrift/TTransactionServer.java | 3040 +++++++++++++++++- .../runtime/TransactionDistributedModule.java | 15 + .../runtime/TransactionInMemoryModule.java | 15 + .../tephra/runtime/TransactionLocalModule.java | 14 + .../tephra/runtime/TransactionModules.java | 15 +- tephra-core/src/main/thrift/README | 2 +- tephra-core/src/main/thrift/transaction.thrift | 3 + .../apache/tephra/TransactionContextTest.java | 2 +- .../apache/tephra/TransactionExecutorTest.java | 2 +- .../AbstractTransactionStateStorageTest.java | 11 +- .../hbase/coprocessor/TransactionProcessor.java | 3 +- 17 files changed, 3175 insertions(+), 156 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-tephra/blob/a22c11d8/tephra-core/src/main/java/org/apache/tephra/TransactionContext.java ---------------------------------------------------------------------- diff --git 
a/tephra-core/src/main/java/org/apache/tephra/TransactionContext.java b/tephra-core/src/main/java/org/apache/tephra/TransactionContext.java index 7ca8f06..0806294 100644 --- a/tephra-core/src/main/java/org/apache/tephra/TransactionContext.java +++ b/tephra-core/src/main/java/org/apache/tephra/TransactionContext.java @@ -79,7 +79,7 @@ public class TransactionContext { } /** - * Starts a new transaction. Calling this will initiate a new transaction using the {@link TransactionSystemClient}, + * Starts a new transaction. Calling this will initiate a new transaction using the {@link TransactionSystemClient}, * and pass the returned transaction to {@link TransactionAware#startTx(Transaction)} for each registered * TransactionAware. If an exception is encountered, the transaction will be aborted and a * {@code TransactionFailureException} wrapping the root cause will be thrown. http://git-wip-us.apache.org/repos/asf/incubator-tephra/blob/a22c11d8/tephra-core/src/main/java/org/apache/tephra/TransactionManager.java ---------------------------------------------------------------------- diff --git a/tephra-core/src/main/java/org/apache/tephra/TransactionManager.java b/tephra-core/src/main/java/org/apache/tephra/TransactionManager.java index f2060cd..27b6bc6 100644 --- a/tephra-core/src/main/java/org/apache/tephra/TransactionManager.java +++ b/tephra-core/src/main/java/org/apache/tephra/TransactionManager.java @@ -118,8 +118,11 @@ public class TransactionManager extends AbstractService { //poll every 10 second to emit metrics private static final long METRICS_POLL_INTERVAL = 10000L; + //Client id that is used if a client doesn't provide one while starting a transaction. + private static final String DEFAULT_CLIENTID = "unknown"; + // Transactions that are in progress, with their info. 
- private final NavigableMap<Long, InProgressTx> inProgress = new ConcurrentSkipListMap<Long, InProgressTx>(); + private final NavigableMap<Long, InProgressTx> inProgress = new ConcurrentSkipListMap<>(); // the list of transactions that are invalid (not properly committed/aborted, or timed out) private final InvalidTxList invalidTxList = new InvalidTxList(); @@ -127,8 +130,7 @@ public class TransactionManager extends AbstractService { // todo: use moving array instead (use Long2ObjectMap<byte[]> in fastutil) // todo: should this be consolidated with inProgress? // commit time next writePointer -> changes made by this tx - private final NavigableMap<Long, Set<ChangeId>> committedChangeSets = - new ConcurrentSkipListMap<Long, Set<ChangeId>>(); + private final NavigableMap<Long, Set<ChangeId>> committedChangeSets = new ConcurrentSkipListMap<>(); // not committed yet private final Map<Long, Set<ChangeId>> committingChangeSets = Maps.newConcurrentMap(); @@ -347,15 +349,16 @@ public class TransactionManager extends AbstractService { long currentTime = System.currentTimeMillis(); Map<Long, InProgressType> timedOut = Maps.newHashMap(); for (Map.Entry<Long, InProgressTx> tx : inProgress.entrySet()) { - long expiration = tx.getValue().getExpiration(); + InProgressTx inProgressTx = tx.getValue(); + long expiration = inProgressTx.getExpiration(); if (expiration >= 0L && currentTime > expiration) { // timed out, remember tx id (can't remove while iterating over entries) - timedOut.put(tx.getKey(), tx.getValue().getType()); - LOG.info("Tx invalid list: added tx {} because of timeout", tx.getKey()); + timedOut.put(tx.getKey(), inProgressTx.getType()); + LOG.info("Tx invalid list: added tx {} belonging to client '{}' because of timeout.", + tx.getKey(), inProgressTx.getClientId()); } else if (expiration < 0) { LOG.warn("Transaction {} has negative expiration time {}. 
Likely cause is the transaction was not " + - "migrated correctly, this transaction will be expired immediately", - tx.getKey(), expiration); + "migrated correctly, this transaction will be expired immediately", tx.getKey(), expiration); timedOut.put(tx.getKey(), InProgressType.LONG); } } @@ -592,8 +595,8 @@ public class TransactionManager extends AbstractService { } else if (type == null) { type = TransactionType.SHORT; } - addInProgressAndAdvance(edit.getWritePointer(), edit.getVisibilityUpperBound(), - expiration, type); + // We don't persist the client id. + addInProgressAndAdvance(edit.getWritePointer(), edit.getVisibilityUpperBound(), expiration, type, null); break; case COMMITTING: addCommittingChangeSet(edit.getWritePointer(), edit.getChanges()); @@ -727,10 +730,28 @@ public class TransactionManager extends AbstractService { } /** + * Start a short transaction with a client id and default timeout. + * @param clientId id of the client requesting a transaction. + */ + public Transaction startShort(String clientId) { + return startShort(clientId, defaultTimeout); + } + + /** * Start a short transaction with a given timeout. * @param timeoutInSeconds the time out period in seconds. */ public Transaction startShort(int timeoutInSeconds) { + return startShort(DEFAULT_CLIENTID, timeoutInSeconds); + } + + /** + * Start a short transaction with a given timeout. + * @param clientId id of the client requesting a transaction. + * @param timeoutInSeconds the time out period in seconds. 
+ */ + public Transaction startShort(String clientId, int timeoutInSeconds) { + Preconditions.checkArgument(clientId != null, "clientId must not be null"); Preconditions.checkArgument(timeoutInSeconds > 0, "timeout must be positive but is %s seconds", timeoutInSeconds); Preconditions.checkArgument(timeoutInSeconds <= maxTimeout, @@ -738,7 +759,7 @@ public class TransactionManager extends AbstractService { txMetricsCollector.rate("start.short"); Stopwatch timer = new Stopwatch().start(); long expiration = getTxExpiration(timeoutInSeconds); - Transaction tx = startTx(expiration, TransactionType.SHORT); + Transaction tx = startTx(expiration, TransactionType.SHORT, clientId); txMetricsCollector.histogram("start.short.latency", (int) timer.elapsedMillis()); return tx; } @@ -763,15 +784,23 @@ public class TransactionManager extends AbstractService { * transaction moves it to the invalid list because we assume that its writes cannot be rolled back. */ public Transaction startLong() { + return startLong(DEFAULT_CLIENTID); + } + + /** + * Starts a long transaction with a client id. 
+ */ + public Transaction startLong(String clientId) { + Preconditions.checkArgument(clientId != null, "clientId must not be null"); txMetricsCollector.rate("start.long"); Stopwatch timer = new Stopwatch().start(); long expiration = getTxExpiration(defaultLongTimeout); - Transaction tx = startTx(expiration, TransactionType.LONG); + Transaction tx = startTx(expiration, TransactionType.LONG, clientId); txMetricsCollector.histogram("start.long.latency", (int) timer.elapsedMillis()); return tx; } - private Transaction startTx(long expiration, TransactionType type) { + private Transaction startTx(long expiration, TransactionType type, @Nullable String clientId) { Transaction tx = null; long txid; // guard against changes to the transaction log while processing @@ -781,7 +810,7 @@ public class TransactionManager extends AbstractService { ensureAvailable(); txid = getNextWritePointer(); tx = createTransaction(txid, type); - addInProgressAndAdvance(tx.getTransactionId(), tx.getVisibilityUpperBound(), expiration, type); + addInProgressAndAdvance(tx.getTransactionId(), tx.getVisibilityUpperBound(), expiration, type, clientId); } // appending to WAL out of global lock for concurrent performance // we should still be able to arrive at the same state even if log entries are out of order @@ -793,13 +822,13 @@ public class TransactionManager extends AbstractService { } private void addInProgressAndAdvance(long writePointer, long visibilityUpperBound, - long expiration, TransactionType type) { - addInProgressAndAdvance(writePointer, visibilityUpperBound, expiration, InProgressType.of(type)); + long expiration, TransactionType type, @Nullable String clientId) { + addInProgressAndAdvance(writePointer, visibilityUpperBound, expiration, InProgressType.of(type), clientId); } private void addInProgressAndAdvance(long writePointer, long visibilityUpperBound, - long expiration, InProgressType type) { - inProgress.put(writePointer, new InProgressTx(visibilityUpperBound, expiration, type)); 
+ long expiration, InProgressType type, @Nullable String clientId) { + inProgress.put(writePointer, new InProgressTx(clientId, visibilityUpperBound, expiration, type)); advanceWritePointer(writePointer); } @@ -1026,6 +1055,7 @@ public class TransactionManager extends AbstractService { Set<ChangeId> previousChangeSet = committingChangeSets.remove(writePointer); // remove from in-progress set, so that it does not get excluded in the future InProgressTx previous = inProgress.remove(writePointer); + // This check is to prevent from invalidating committed transactions if (previous != null || previousChangeSet != null) { // add tx to invalids @@ -1040,7 +1070,12 @@ public class TransactionManager extends AbstractService { inProgress.keySet().removeAll(childWritePointers); } } - LOG.info("Tx invalid list: added tx {} because of invalidate", writePointer); + + String clientId = DEFAULT_CLIENTID; + if (previous != null && previous.getClientId() != null) { + clientId = previous.getClientId(); + } + LOG.info("Tx invalid list: added tx {} belonging to client '{}' because of invalidate", writePointer, clientId); if (previous != null && !previous.isLongRunning()) { // tx was short-running: must move read pointer moveReadPointerIfNeeded(writePointer); @@ -1170,7 +1205,7 @@ public class TransactionManager extends AbstractService { InProgressTx existingTx = inProgress.get(parentWritePointer); existingTx.addCheckpointWritePointer(newWritePointer); addInProgressAndAdvance(newWritePointer, existingTx.getVisibilityUpperBound(), existingTx.getExpiration(), - InProgressType.CHECKPOINT); + InProgressType.CHECKPOINT, existingTx.getClientId()); } // hack for exposing important metric @@ -1378,17 +1413,34 @@ public class TransactionManager extends AbstractService { private final long expiration; private final InProgressType type; private final LongArrayList checkpointWritePointers; + // clientId is not part of hashCode computation or equality check since it is not persisted. 
Once it is persisted + // and restored, we can include it in the above. + private final String clientId; + + public InProgressTx(String clientId, long visibilityUpperBound, long expiration, InProgressType type) { + this(clientId, visibilityUpperBound, expiration, type, new LongArrayList()); + } public InProgressTx(long visibilityUpperBound, long expiration, InProgressType type) { this(visibilityUpperBound, expiration, type, new LongArrayList()); } + public InProgressTx(String clientId, long visibilityUpperBound, long expiration, InProgressType type, + LongArrayList checkpointWritePointers) { + this.visibilityUpperBound = visibilityUpperBound; + this.expiration = expiration; + this.type = type; + this.checkpointWritePointers = checkpointWritePointers; + this.clientId = clientId; + } + public InProgressTx(long visibilityUpperBound, long expiration, InProgressType type, LongArrayList checkpointWritePointers) { this.visibilityUpperBound = visibilityUpperBound; this.expiration = expiration; this.type = type; this.checkpointWritePointers = checkpointWritePointers; + this.clientId = null; } // For backwards compatibility when long running txns were represented with -1 expiration @@ -1410,6 +1462,11 @@ public class TransactionManager extends AbstractService { return type; } + @Nullable + public String getClientId() { + return clientId; + } + public boolean isLongRunning() { if (type == null) { // for backwards compatibility when long running txns were represented with -1 expiration @@ -1455,6 +1512,7 @@ public class TransactionManager extends AbstractService { .add("expiration", expiration) .add("type", type) .add("checkpointWritePointers", checkpointWritePointers) + .add("clientId", clientId) .toString(); } } http://git-wip-us.apache.org/repos/asf/incubator-tephra/blob/a22c11d8/tephra-core/src/main/java/org/apache/tephra/TxConstants.java ---------------------------------------------------------------------- diff --git 
a/tephra-core/src/main/java/org/apache/tephra/TxConstants.java b/tephra-core/src/main/java/org/apache/tephra/TxConstants.java index 26a48fb..1dbd3cb 100644 --- a/tephra-core/src/main/java/org/apache/tephra/TxConstants.java +++ b/tephra-core/src/main/java/org/apache/tephra/TxConstants.java @@ -120,6 +120,11 @@ public class TxConstants { public static final boolean DEFAULT_READ_NON_TX_DATA = false; /** + * Used to inject the name of the client that is starting the transaction. + */ + public static final String CLIENT_ID = "tephra.client.id"; + + /** * TransactionManager configuration. */ public static final class Manager { http://git-wip-us.apache.org/repos/asf/incubator-tephra/blob/a22c11d8/tephra-core/src/main/java/org/apache/tephra/distributed/TransactionServiceClient.java ---------------------------------------------------------------------- diff --git a/tephra-core/src/main/java/org/apache/tephra/distributed/TransactionServiceClient.java b/tephra-core/src/main/java/org/apache/tephra/distributed/TransactionServiceClient.java index 4ee9615..5f7792a 100644 --- a/tephra-core/src/main/java/org/apache/tephra/distributed/TransactionServiceClient.java +++ b/tephra-core/src/main/java/org/apache/tephra/distributed/TransactionServiceClient.java @@ -23,6 +23,7 @@ import com.google.common.base.Throwables; import com.google.inject.Guice; import com.google.inject.Inject; import com.google.inject.Injector; +import com.google.inject.name.Named; import org.apache.hadoop.conf.Configuration; import org.apache.tephra.InvalidTruncateTimeException; import org.apache.tephra.Transaction; @@ -42,6 +43,7 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; import java.io.InputStream; +import java.lang.management.ManagementFactory; import java.util.Collection; import java.util.Collections; import java.util.Set; @@ -60,6 +62,9 @@ public class TransactionServiceClient implements TransactionSystemClient { // the retry strategy we will use private final RetryStrategyProvider 
retryStrategyProvider; + // client id that is used to identify the transactions + private final String clientId; + /** * Utility to be used for basic verification of transaction system availability and functioning * @param args arguments list, accepts single option "-v" that makes it to print out more details about started tx @@ -126,14 +131,24 @@ public class TransactionServiceClient implements TransactionSystemClient { } /** + * Create from a configuration. This will first attempt to find a zookeeper for service discovery. + * This constructor can be used if Guice dependency injection is not used. JVM name will be used for the client id. + * @param config a configuration containing the zookeeper properties + */ + public TransactionServiceClient(Configuration config, ThriftClientProvider clientProvider) { + this(config, clientProvider, ManagementFactory.getRuntimeMXBean().getName()); + } + + /** * Create from a configuration. This will first attempt to find a zookeeper * for service discovery. Otherwise it will look for the port in the * config and use localhost. 
* @param config a configuration containing the zookeeper properties + * @param clientId id of the client that identifies it when it starts a transaction */ @Inject - public TransactionServiceClient(Configuration config, - ThriftClientProvider clientProvider) { + public TransactionServiceClient(Configuration config, ThriftClientProvider clientProvider, + @Named(TxConstants.CLIENT_ID) String clientId) { // initialize the retry logic String retryStrat = config.get( @@ -155,6 +170,7 @@ public class TransactionServiceClient implements TransactionSystemClient { LOG.debug("Retry strategy is " + this.retryStrategyProvider); this.clientProvider = clientProvider; + this.clientId = clientId; } /** @@ -247,7 +263,7 @@ public class TransactionServiceClient implements TransactionSystemClient { @Override public Transaction execute(TransactionServiceThriftClient client) throws TException { - return client.startLong(); + return client.startLong(clientId); } }); } catch (Exception e) { @@ -263,7 +279,7 @@ public class TransactionServiceClient implements TransactionSystemClient { @Override public Transaction execute(TransactionServiceThriftClient client) throws TException { - return client.startShort(); + return client.startShort(clientId); } }); } catch (Exception e) { @@ -279,7 +295,7 @@ public class TransactionServiceClient implements TransactionSystemClient { @Override public Transaction execute(TransactionServiceThriftClient client) throws TException { - return client.startShort(timeout); + return client.startShort(clientId, timeout); } }); } catch (Exception e) { http://git-wip-us.apache.org/repos/asf/incubator-tephra/blob/a22c11d8/tephra-core/src/main/java/org/apache/tephra/distributed/TransactionServiceThriftClient.java ---------------------------------------------------------------------- diff --git a/tephra-core/src/main/java/org/apache/tephra/distributed/TransactionServiceThriftClient.java 
b/tephra-core/src/main/java/org/apache/tephra/distributed/TransactionServiceThriftClient.java index b76412f..8ba81e3 100644 --- a/tephra-core/src/main/java/org/apache/tephra/distributed/TransactionServiceThriftClient.java +++ b/tephra-core/src/main/java/org/apache/tephra/distributed/TransactionServiceThriftClient.java @@ -106,6 +106,23 @@ public class TransactionServiceThriftClient { } } + public Transaction startLong(String clientId) throws TException { + try { + return TransactionConverterUtils.unwrap(client.startLongClientId(clientId)); + } catch (TGenericException e) { + // currently, we only expect IllegalArgumentException here, if the clientId is null + if (!IllegalArgumentException.class.getName().equals(e.getOriginalExceptionClass())) { + LOG.trace("Expecting only {} as the original exception class but found {}", + IllegalArgumentException.class.getName(), e.getOriginalExceptionClass()); + throw e; + } + throw new IllegalArgumentException(e.getMessage()); + } catch (TException e) { + isValid.set(false); + throw e; + } + } + public Transaction startShort() throws TException { try { return TransactionConverterUtils.unwrap(client.startShort()); @@ -132,6 +149,40 @@ public class TransactionServiceThriftClient { } } + public Transaction startShort(String clientId) throws TException { + try { + return TransactionConverterUtils.unwrap(client.startShortClientId(clientId)); + } catch (TGenericException e) { + // currently, we only expect IllegalArgumentException here, if the clientId is null + if (!IllegalArgumentException.class.getName().equals(e.getOriginalExceptionClass())) { + LOG.trace("Expecting only {} as the original exception class but found {}", + IllegalArgumentException.class.getName(), e.getOriginalExceptionClass()); + throw e; + } + throw new IllegalArgumentException(e.getMessage()); + } catch (TException e) { + isValid.set(false); + throw e; + } + } + + public Transaction startShort(String clientId, int timeout) throws TException { + try { + return 
TransactionConverterUtils.unwrap(client.startShortWithClientIdAndTimeOut(clientId, timeout)); + } catch (TGenericException e) { + // currently, we only expect IllegalArgumentException here, if the timeout is invalid or if clientId is null + if (!IllegalArgumentException.class.getName().equals(e.getOriginalExceptionClass())) { + LOG.trace("Expecting only {} as the original exception class but found {}", + IllegalArgumentException.class.getName(), e.getOriginalExceptionClass()); + throw e; + } + throw new IllegalArgumentException(e.getMessage()); + } catch (TException e) { + isValid.set(false); + throw e; + } + } + public boolean canCommit(Transaction tx, Collection<byte[]> changeIds) throws TException, TransactionNotInProgressException { try { http://git-wip-us.apache.org/repos/asf/incubator-tephra/blob/a22c11d8/tephra-core/src/main/java/org/apache/tephra/distributed/TransactionServiceThriftHandler.java ---------------------------------------------------------------------- diff --git a/tephra-core/src/main/java/org/apache/tephra/distributed/TransactionServiceThriftHandler.java b/tephra-core/src/main/java/org/apache/tephra/distributed/TransactionServiceThriftHandler.java index 6552cfe..954ee1d 100644 --- a/tephra-core/src/main/java/org/apache/tephra/distributed/TransactionServiceThriftHandler.java +++ b/tephra-core/src/main/java/org/apache/tephra/distributed/TransactionServiceThriftHandler.java @@ -73,11 +73,38 @@ public class TransactionServiceThriftHandler implements TTransactionServer.Iface } @Override + public TTransaction startLongClientId(String clientId) throws TException { + try { + return TransactionConverterUtils.wrap(txManager.startLong(clientId)); + } catch (IllegalArgumentException ex) { + throw new TGenericException(ex.getMessage(), ex.getClass().getName()); + } + } + + @Override public TTransaction startShortTimeout(int timeout) throws TException { return TransactionConverterUtils.wrap(txManager.startShort(timeout)); } @Override + public TTransaction 
startShortClientId(String clientId) throws TException { + try { + return TransactionConverterUtils.wrap(txManager.startShort(clientId)); + } catch (IllegalArgumentException ex) { + throw new TGenericException(ex.getMessage(), ex.getClass().getName()); + } + } + + @Override + public TTransaction startShortWithClientIdAndTimeOut(String clientId, int timeout) throws TException { + try { + return TransactionConverterUtils.wrap(txManager.startShort(clientId, timeout)); + } catch (IllegalArgumentException ex) { + throw new TGenericException(ex.getMessage(), ex.getClass().getName()); + } + } + + @Override public TTransaction startShortWithTimeout(int timeout) throws TException { try { return TransactionConverterUtils.wrap(txManager.startShort(timeout));
