Repository: phoenix Updated Branches: refs/heads/4.x-HBase-1.2 aec1cd546 -> b9929872f
http://git-wip-us.apache.org/repos/asf/phoenix/blob/b9929872/phoenix-core/src/main/java/org/apache/phoenix/util/TransactionUtil.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/util/TransactionUtil.java b/phoenix-core/src/main/java/org/apache/phoenix/util/TransactionUtil.java index 04882e0..01b775e 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/util/TransactionUtil.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/util/TransactionUtil.java @@ -29,10 +29,10 @@ import org.apache.phoenix.exception.SQLExceptionInfo; import org.apache.phoenix.execute.MutationState; import org.apache.phoenix.jdbc.PhoenixConnection; import org.apache.phoenix.schema.PTable; -import org.apache.tephra.TransactionConflictException; -import org.apache.tephra.TransactionFailureException; -import org.apache.tephra.TxConstants; -import org.apache.tephra.hbase.TransactionAwareHTable; +import org.apache.phoenix.transaction.PhoenixTransactionContext; +import org.apache.phoenix.transaction.PhoenixTransactionalTable; +import org.apache.phoenix.transaction.TephraTransactionTable; +import org.apache.phoenix.transaction.TransactionFactory; public class TransactionUtil { private TransactionUtil() { @@ -43,30 +43,15 @@ public class TransactionUtil { } public static long convertToNanoseconds(long serverTimeStamp) { - return serverTimeStamp * TxConstants.MAX_TX_PER_MS; + return serverTimeStamp * TransactionFactory.getTransactionFactory().getTransactionContext().getMaxTransactionsPerSecond(); } public static long convertToMilliseconds(long serverTimeStamp) { - return serverTimeStamp / TxConstants.MAX_TX_PER_MS; + return serverTimeStamp / TransactionFactory.getTransactionFactory().getTransactionContext().getMaxTransactionsPerSecond(); } - public static SQLException getTransactionFailureException(TransactionFailureException e) { - if (e instanceof TransactionConflictException) { - return new SQLExceptionInfo.Builder(SQLExceptionCode.TRANSACTION_CONFLICT_EXCEPTION) - .setMessage(e.getMessage()) - .setRootCause(e) - .build().buildException(); - - } - return new SQLExceptionInfo.Builder(SQLExceptionCode.TRANSACTION_FAILED) - .setMessage(e.getMessage()) - .setRootCause(e) - .build().buildException(); - } - - public static TransactionAwareHTable getTransactionAwareHTable(HTableInterface htable, boolean isImmutableRows) { - // Conflict detection is not needed for tables with write-once/append-only data - return new TransactionAwareHTable(htable, isImmutableRows ? TxConstants.ConflictDetection.NONE : TxConstants.ConflictDetection.ROW); + public static PhoenixTransactionalTable getPhoenixTransactionTable(PhoenixTransactionContext phoenixTransactionContext, HTableInterface htable, PTable pTable) { + return new TephraTransactionTable(phoenixTransactionContext, htable, pTable); } // we resolve transactional tables at the txn read pointer http://git-wip-us.apache.org/repos/asf/phoenix/blob/b9929872/phoenix-core/src/test/java/org/apache/phoenix/query/BaseTest.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/test/java/org/apache/phoenix/query/BaseTest.java b/phoenix-core/src/test/java/org/apache/phoenix/query/BaseTest.java index 05dbc3c..d40c5a9 100644 --- a/phoenix-core/src/test/java/org/apache/phoenix/query/BaseTest.java +++ b/phoenix-core/src/test/java/org/apache/phoenix/query/BaseTest.java @@ -129,12 +129,12 @@ import org.apache.phoenix.exception.SQLExceptionInfo; import org.apache.phoenix.jdbc.PhoenixConnection; import org.apache.phoenix.jdbc.PhoenixDatabaseMetaData; import org.apache.phoenix.jdbc.PhoenixEmbeddedDriver; -import org.apache.phoenix.jdbc.PhoenixEmbeddedDriver.ConnectionInfo; import org.apache.phoenix.jdbc.PhoenixTestDriver; import org.apache.phoenix.schema.NewerTableAlreadyExistsException; import org.apache.phoenix.schema.PTableType; import org.apache.phoenix.schema.TableAlreadyExistsException; import org.apache.phoenix.schema.TableNotFoundException; +import org.apache.phoenix.transaction.TransactionFactory; import org.apache.phoenix.util.ConfigUtil; import org.apache.phoenix.util.DateUtil; import org.apache.phoenix.util.PhoenixRuntime; @@ -142,19 +142,6 @@ import org.apache.phoenix.util.PropertiesUtil; import org.apache.phoenix.util.QueryUtil; import org.apache.phoenix.util.ReadOnlyProps; import org.apache.phoenix.util.SchemaUtil; -import org.apache.tephra.TransactionManager; -import org.apache.tephra.TxConstants; -import org.apache.tephra.distributed.TransactionService; -import org.apache.tephra.metrics.TxMetricsCollector; -import org.apache.tephra.persist.HDFSTransactionStateStorage; -import org.apache.tephra.snapshot.SnapshotCodecProvider; -import org.apache.twill.discovery.DiscoveryService; -import org.apache.twill.discovery.ZKDiscoveryService; -import org.apache.twill.internal.utils.Networks; -import org.apache.twill.zookeeper.RetryStrategies; -import org.apache.twill.zookeeper.ZKClientService; -import org.apache.twill.zookeeper.ZKClientServices; -import org.apache.twill.zookeeper.ZKClients; import org.junit.ClassRule; import org.junit.rules.TemporaryFolder; import org.slf4j.Logger; @@ -164,7 +151,6 @@ import com.google.common.collect.ImmutableMap; import com.google.common.collect.Lists; import com.google.common.collect.Sets; import com.google.common.util.concurrent.ThreadFactoryBuilder; -import com.google.inject.util.Providers; /** * @@ -185,9 +171,6 @@ public abstract class BaseTest { private static final Map<String,String> tableDDLMap; private static final Logger logger = LoggerFactory.getLogger(BaseTest.class); protected static final int DEFAULT_TXN_TIMEOUT_SECONDS = 30; - private static ZKClientService zkClient; - private static TransactionService txService; - protected static TransactionManager txManager; @ClassRule public static TemporaryFolder tmpFolder = new TemporaryFolder(); private static final int dropTableTimeout = 300; // 5 mins should be long enough. @@ -428,50 +411,15 @@ public abstract class BaseTest { } private static void tearDownTxManager() throws SQLException { - try { - if (txService != null) txService.stopAndWait(); - } finally { - try { - if (zkClient != null) zkClient.stopAndWait(); - } finally { - txService = null; - zkClient = null; - txManager = null; - } - } - + TransactionFactory.getTransactionFactory().getTransactionContext().tearDownTxManager(); } - + protected static void setTxnConfigs() throws IOException { - config.setBoolean(TxConstants.Manager.CFG_DO_PERSIST, false); - config.set(TxConstants.Service.CFG_DATA_TX_CLIENT_RETRY_STRATEGY, "n-times"); - config.setInt(TxConstants.Service.CFG_DATA_TX_CLIENT_ATTEMPTS, 1); - config.setInt(TxConstants.Service.CFG_DATA_TX_BIND_PORT, Networks.getRandomPort()); - config.set(TxConstants.Manager.CFG_TX_SNAPSHOT_DIR, tmpFolder.newFolder().getAbsolutePath()); - config.setInt(TxConstants.Manager.CFG_TX_TIMEOUT, DEFAULT_TXN_TIMEOUT_SECONDS); - config.unset(TxConstants.Manager.CFG_TX_HDFS_USER); - config.setLong(TxConstants.Manager.CFG_TX_SNAPSHOT_INTERVAL, 5L); + TransactionFactory.getTransactionFactory().getTransactionContext().setTxnConfigs(config, tmpFolder.newFolder().getAbsolutePath(), DEFAULT_TXN_TIMEOUT_SECONDS); } - - protected static void setupTxManager() throws SQLException, IOException { - ConnectionInfo connInfo = ConnectionInfo.create(getUrl()); - zkClient = ZKClientServices.delegate( - ZKClients.reWatchOnExpire( - ZKClients.retryOnFailure( - ZKClientService.Builder.of(connInfo.getZookeeperConnectionString()) - .setSessionTimeout(config.getInt(HConstants.ZK_SESSION_TIMEOUT, - HConstants.DEFAULT_ZK_SESSION_TIMEOUT)) - .build(), - RetryStrategies.exponentialDelay(500, 2000, TimeUnit.MILLISECONDS) - ) - ) - ); - zkClient.startAndWait(); - DiscoveryService discovery = new ZKDiscoveryService(zkClient); - txManager = new TransactionManager(config, new HDFSTransactionStateStorage(config, new SnapshotCodecProvider(config), new TxMetricsCollector()), new TxMetricsCollector()); - txService = new TransactionService(config, zkClient, discovery, Providers.of(txManager)); - txService.startAndWait(); + protected static void setupTxManager() throws SQLException, IOException { + TransactionFactory.getTransactionFactory().getTransactionContext().setupTxManager(config, getUrl()); } private static String checkClusterInitialized(ReadOnlyProps serverProps) throws Exception { @@ -490,9 +438,7 @@ public abstract class BaseTest { } private static void checkTxManagerInitialized(ReadOnlyProps clientProps) throws SQLException, IOException { - if (txService == null) { - setupTxManager(); - } + setupTxManager(); } /**
