Repository: incubator-tephra Updated Branches: refs/heads/master 0b209bb41 -> a22c11d81
http://git-wip-us.apache.org/repos/asf/incubator-tephra/blob/a22c11d8/tephra-core/src/main/java/org/apache/tephra/runtime/TransactionDistributedModule.java ---------------------------------------------------------------------- diff --git a/tephra-core/src/main/java/org/apache/tephra/runtime/TransactionDistributedModule.java b/tephra-core/src/main/java/org/apache/tephra/runtime/TransactionDistributedModule.java index ff796c1..787832b 100644 --- a/tephra-core/src/main/java/org/apache/tephra/runtime/TransactionDistributedModule.java +++ b/tephra-core/src/main/java/org/apache/tephra/runtime/TransactionDistributedModule.java @@ -28,6 +28,7 @@ import org.apache.tephra.TransactionExecutor; import org.apache.tephra.TransactionExecutorFactory; import org.apache.tephra.TransactionManager; import org.apache.tephra.TransactionSystemClient; +import org.apache.tephra.TxConstants; import org.apache.tephra.distributed.TransactionServiceClient; import org.apache.tephra.metrics.DefaultMetricsCollector; import org.apache.tephra.metrics.MetricsCollector; @@ -35,11 +36,23 @@ import org.apache.tephra.persist.HDFSTransactionStateStorage; import org.apache.tephra.persist.TransactionStateStorage; import org.apache.tephra.snapshot.SnapshotCodecProvider; +import java.lang.management.ManagementFactory; + /** * Guice bindings for running in distributed mode on a cluster. */ final class TransactionDistributedModule extends AbstractModule { + private final String clientId; + + public TransactionDistributedModule() { + this(ManagementFactory.getRuntimeMXBean().getName()); + } + + public TransactionDistributedModule(String clientId) { + this.clientId = clientId; + } + @Override protected void configure() { // some of these classes need to be non-singleton in order to create a new instance during leader() in @@ -48,6 +61,8 @@ final class TransactionDistributedModule extends AbstractModule { bind(TransactionStateStorage.class).annotatedWith(Names.named("persist")).to(HDFSTransactionStateStorage.class); bind(TransactionStateStorage.class).toProvider(TransactionStateStorageProvider.class); + bindConstant().annotatedWith(Names.named(TxConstants.CLIENT_ID)).to(clientId); + // to catch issues during configure time bind(TransactionManager.class); bind(TransactionSystemClient.class).to(TransactionServiceClient.class).in(Scopes.SINGLETON); http://git-wip-us.apache.org/repos/asf/incubator-tephra/blob/a22c11d8/tephra-core/src/main/java/org/apache/tephra/runtime/TransactionInMemoryModule.java ---------------------------------------------------------------------- diff --git a/tephra-core/src/main/java/org/apache/tephra/runtime/TransactionInMemoryModule.java b/tephra-core/src/main/java/org/apache/tephra/runtime/TransactionInMemoryModule.java index 0c5a3f3..f5f864b 100644 --- a/tephra-core/src/main/java/org/apache/tephra/runtime/TransactionInMemoryModule.java +++ b/tephra-core/src/main/java/org/apache/tephra/runtime/TransactionInMemoryModule.java @@ -21,11 +21,13 @@ package org.apache.tephra.runtime; import com.google.inject.AbstractModule; import com.google.inject.Scopes; import com.google.inject.assistedinject.FactoryModuleBuilder; +import com.google.inject.name.Names; import org.apache.tephra.DefaultTransactionExecutor; import org.apache.tephra.TransactionExecutor; import org.apache.tephra.TransactionExecutorFactory; import org.apache.tephra.TransactionManager; import org.apache.tephra.TransactionSystemClient; +import org.apache.tephra.TxConstants; import org.apache.tephra.inmemory.InMemoryTxSystemClient; import org.apache.tephra.metrics.MetricsCollector; import org.apache.tephra.metrics.TxMetricsCollector; @@ -33,11 +35,22 @@ import org.apache.tephra.persist.NoOpTransactionStateStorage; import org.apache.tephra.persist.TransactionStateStorage; import org.apache.tephra.snapshot.SnapshotCodecProvider; +import java.lang.management.ManagementFactory; + /** * Guice bindings for running completely in-memory (no persistence). This should only be used for * test classes, as the transaction state cannot be recovered in the case of a failure. */ public class TransactionInMemoryModule extends AbstractModule { + private final String clientId; + + public TransactionInMemoryModule() { + this(ManagementFactory.getRuntimeMXBean().getName()); + } + + public TransactionInMemoryModule(String clientId) { + this.clientId = clientId; + } @Override protected void configure() { @@ -48,6 +61,8 @@ public class TransactionInMemoryModule extends AbstractModule { // no metrics output for in-memory bind(MetricsCollector.class).to(TxMetricsCollector.class); + bindConstant().annotatedWith(Names.named(TxConstants.CLIENT_ID)).to(clientId); + install(new FactoryModuleBuilder() .implement(TransactionExecutor.class, DefaultTransactionExecutor.class) .build(TransactionExecutorFactory.class)); http://git-wip-us.apache.org/repos/asf/incubator-tephra/blob/a22c11d8/tephra-core/src/main/java/org/apache/tephra/runtime/TransactionLocalModule.java ---------------------------------------------------------------------- diff --git a/tephra-core/src/main/java/org/apache/tephra/runtime/TransactionLocalModule.java b/tephra-core/src/main/java/org/apache/tephra/runtime/TransactionLocalModule.java index 0dba09e..213abb0 100644 --- a/tephra-core/src/main/java/org/apache/tephra/runtime/TransactionLocalModule.java +++ b/tephra-core/src/main/java/org/apache/tephra/runtime/TransactionLocalModule.java @@ -28,6 +28,7 @@ import org.apache.tephra.TransactionExecutor; import org.apache.tephra.TransactionExecutorFactory; import org.apache.tephra.TransactionManager; import org.apache.tephra.TransactionSystemClient; +import org.apache.tephra.TxConstants; import org.apache.tephra.inmemory.InMemoryTxSystemClient; import org.apache.tephra.metrics.DefaultMetricsCollector; import org.apache.tephra.metrics.MetricsCollector; @@ -35,10 +36,21 @@ import org.apache.tephra.persist.LocalFileTransactionStateStorage; import org.apache.tephra.persist.TransactionStateStorage; import org.apache.tephra.snapshot.SnapshotCodecProvider; +import java.lang.management.ManagementFactory; + /** * Guice bindings for running in single-node mode (persistence to local disk and in-memory client). */ final class TransactionLocalModule extends AbstractModule { + private final String clientId; + + public TransactionLocalModule() { + this(ManagementFactory.getRuntimeMXBean().getName()); + } + + public TransactionLocalModule(String clientId) { + this.clientId = clientId; + } @Override protected void configure() { @@ -51,6 +63,8 @@ final class TransactionLocalModule extends AbstractModule { bind(TransactionSystemClient.class).to(InMemoryTxSystemClient.class).in(Singleton.class); bind(MetricsCollector.class).to(DefaultMetricsCollector.class); + bindConstant().annotatedWith(Names.named(TxConstants.CLIENT_ID)).to(clientId); + install(new FactoryModuleBuilder() .implement(TransactionExecutor.class, DefaultTransactionExecutor.class) .build(TransactionExecutorFactory.class)); http://git-wip-us.apache.org/repos/asf/incubator-tephra/blob/a22c11d8/tephra-core/src/main/java/org/apache/tephra/runtime/TransactionModules.java ---------------------------------------------------------------------- diff --git a/tephra-core/src/main/java/org/apache/tephra/runtime/TransactionModules.java b/tephra-core/src/main/java/org/apache/tephra/runtime/TransactionModules.java index a3fe1c1..1e021d3 100644 --- a/tephra-core/src/main/java/org/apache/tephra/runtime/TransactionModules.java +++ b/tephra-core/src/main/java/org/apache/tephra/runtime/TransactionModules.java @@ -20,22 +20,31 @@ package org.apache.tephra.runtime; import com.google.inject.Module; +import java.lang.management.ManagementFactory; + /** * Provides access to Google Guice modules for in-memory, single-node, and distributed operation. */ public class TransactionModules { + private final String clientId; + + public TransactionModules(String clientId) { + this.clientId = clientId; + } + public TransactionModules() { + this(ManagementFactory.getRuntimeMXBean().getName()); } public Module getInMemoryModules() { - return new TransactionInMemoryModule(); + return new TransactionInMemoryModule(clientId); } public Module getSingleNodeModules() { - return new TransactionLocalModule(); + return new TransactionLocalModule(clientId); } public Module getDistributedModules() { - return new TransactionDistributedModule(); + return new TransactionDistributedModule(clientId); } } http://git-wip-us.apache.org/repos/asf/incubator-tephra/blob/a22c11d8/tephra-core/src/main/thrift/README ---------------------------------------------------------------------- diff --git a/tephra-core/src/main/thrift/README b/tephra-core/src/main/thrift/README index a4962be..3bb7a62 100644 --- a/tephra-core/src/main/thrift/README +++ b/tephra-core/src/main/thrift/README @@ -19,6 +19,6 @@ To generate thrift classes: thrift --gen java --out ../java/ transaction.thrift -To add the Apache license header to the generated fiels: +To add the Apache license header to the generated files: for f in ../java/org/apache/tephra/distributed/thrift/T*.java; do mv $f nn; cat header nn > $f; rm -f nn; done http://git-wip-us.apache.org/repos/asf/incubator-tephra/blob/a22c11d8/tephra-core/src/main/thrift/transaction.thrift ---------------------------------------------------------------------- diff --git a/tephra-core/src/main/thrift/transaction.thrift b/tephra-core/src/main/thrift/transaction.thrift index 454450e..f4460e5 100644 --- a/tephra-core/src/main/thrift/transaction.thrift +++ b/tephra-core/src/main/thrift/transaction.thrift @@ -69,8 +69,11 @@ service TTransactionServer { // temporary tx2 stuff TTransaction startLong(), TTransaction startShort(), + TTransaction startLongClientId(1: string clientId) throws (1: TGenericException e), // TODO remove this as it was replaced with startShortWithTimeout in 0.10 TTransaction startShortTimeout(1: i32 timeout), + TTransaction startShortClientId(1: string clientId) throws (1: TGenericException e), + TTransaction startShortWithClientIdAndTimeOut(1: string clientId, 2: i32 timeout) throws (1:TGenericException e), TTransaction startShortWithTimeout(1: i32 timeout) throws (1:TGenericException e), TBoolean canCommitTx(1: TTransaction tx, 2: set<binary> changes) throws (1:TTransactionNotInProgressException e), TBoolean commitTx(1: TTransaction tx) throws (1:TTransactionNotInProgressException e), http://git-wip-us.apache.org/repos/asf/incubator-tephra/blob/a22c11d8/tephra-core/src/test/java/org/apache/tephra/TransactionContextTest.java ---------------------------------------------------------------------- diff --git a/tephra-core/src/test/java/org/apache/tephra/TransactionContextTest.java b/tephra-core/src/test/java/org/apache/tephra/TransactionContextTest.java index 20f6944..56a9076 100644 --- a/tephra-core/src/test/java/org/apache/tephra/TransactionContextTest.java +++ b/tephra-core/src/test/java/org/apache/tephra/TransactionContextTest.java @@ -61,7 +61,7 @@ public class TransactionContextTest { new ConfigModule(conf), new DiscoveryModules().getInMemoryModules(), Modules.override( - new TransactionModules().getInMemoryModules()).with(new AbstractModule() { + new TransactionModules("clientA").getInMemoryModules()).with(new AbstractModule() { @Override protected void configure() { TransactionManager txManager = new TransactionManager(conf); http://git-wip-us.apache.org/repos/asf/incubator-tephra/blob/a22c11d8/tephra-core/src/test/java/org/apache/tephra/TransactionExecutorTest.java ---------------------------------------------------------------------- diff --git a/tephra-core/src/test/java/org/apache/tephra/TransactionExecutorTest.java b/tephra-core/src/test/java/org/apache/tephra/TransactionExecutorTest.java index 28ccc6e..b96b779 100644 --- a/tephra-core/src/test/java/org/apache/tephra/TransactionExecutorTest.java +++ b/tephra-core/src/test/java/org/apache/tephra/TransactionExecutorTest.java @@ -63,7 +63,7 @@ public class TransactionExecutorTest { new ConfigModule(conf), new DiscoveryModules().getInMemoryModules(), Modules.override( - new TransactionModules().getInMemoryModules()).with(new AbstractModule() { + new TransactionModules("clientB").getInMemoryModules()).with(new AbstractModule() { @Override protected void configure() { TransactionManager txManager = new TransactionManager(conf); http://git-wip-us.apache.org/repos/asf/incubator-tephra/blob/a22c11d8/tephra-core/src/test/java/org/apache/tephra/persist/AbstractTransactionStateStorageTest.java ---------------------------------------------------------------------- diff --git a/tephra-core/src/test/java/org/apache/tephra/persist/AbstractTransactionStateStorageTest.java b/tephra-core/src/test/java/org/apache/tephra/persist/AbstractTransactionStateStorageTest.java index ec06528..971c93c 100644 --- a/tephra-core/src/test/java/org/apache/tephra/persist/AbstractTransactionStateStorageTest.java +++ b/tephra-core/src/test/java/org/apache/tephra/persist/AbstractTransactionStateStorageTest.java @@ -131,25 +131,24 @@ public abstract class AbstractTransactionStateStorageTest { TransactionStateStorage storage3 = null; try { storage = getStorage(conf); - TransactionManager txManager = new TransactionManager - (conf, storage, new TxMetricsCollector()); + TransactionManager txManager = new TransactionManager(conf, storage, new TxMetricsCollector()); txManager.startAndWait(); // TODO: replace with new persistence tests final byte[] a = { 'a' }; final byte[] b = { 'b' }; // Start and invalidate a transaction - Transaction invalid = txManager.startShort(); + Transaction invalid = txManager.startShort("clientTx"); txManager.invalidate(invalid.getTransactionId()); // start a tx1, add a change A and commit - Transaction tx1 = txManager.startShort(); + Transaction tx1 = txManager.startShort("client1"); Assert.assertTrue(txManager.canCommit(tx1, Collections.singleton(a))); Assert.assertTrue(txManager.commit(tx1)); // start a tx2 and add a change B - Transaction tx2 = txManager.startShort(); + Transaction tx2 = txManager.startShort("client2"); Assert.assertTrue(txManager.canCommit(tx2, Collections.singleton(b))); // start a tx3 - Transaction tx3 = txManager.startShort(); + Transaction tx3 = txManager.startShort("client3"); // restart txManager.stopAndWait(); TransactionSnapshot origState = txManager.getCurrentState(); http://git-wip-us.apache.org/repos/asf/incubator-tephra/blob/a22c11d8/tephra-hbase-compat-0.98/src/main/java/org/apache/tephra/hbase/coprocessor/TransactionProcessor.java ---------------------------------------------------------------------- diff --git a/tephra-hbase-compat-0.98/src/main/java/org/apache/tephra/hbase/coprocessor/TransactionProcessor.java b/tephra-hbase-compat-0.98/src/main/java/org/apache/tephra/hbase/coprocessor/TransactionProcessor.java index 30b69a1..17d55a4 100644 --- a/tephra-hbase-compat-0.98/src/main/java/org/apache/tephra/hbase/coprocessor/TransactionProcessor.java +++ b/tephra-hbase-compat-0.98/src/main/java/org/apache/tephra/hbase/coprocessor/TransactionProcessor.java @@ -18,7 +18,6 @@ package org.apache.tephra.hbase.coprocessor; -import com.google.common.base.Supplier; import com.google.common.collect.Maps; import com.google.common.collect.Sets; import org.apache.commons.logging.Log; @@ -127,7 +126,7 @@ public class TransactionProcessor extends BaseRegionObserver { public void start(CoprocessorEnvironment e) throws IOException { if (e instanceof RegionCoprocessorEnvironment) { RegionCoprocessorEnvironment env = (RegionCoprocessorEnvironment) e; - Supplier<TransactionStateCache> cacheSupplier = getTransactionStateCacheSupplier(env); + this.cacheSupplier = getTransactionStateCacheSupplier(env); this.cache = cacheSupplier.get(); HTableDescriptor tableDesc = env.getRegion().getTableDesc();
