Remove Tephra dependency from BaseTest
Project: http://git-wip-us.apache.org/repos/asf/phoenix/repo Commit: http://git-wip-us.apache.org/repos/asf/phoenix/commit/b3a21368 Tree: http://git-wip-us.apache.org/repos/asf/phoenix/tree/b3a21368 Diff: http://git-wip-us.apache.org/repos/asf/phoenix/diff/b3a21368 Branch: refs/heads/omid Commit: b3a213685ef97a41c1a369f035949b22b03d6083 Parents: f090dd2 Author: Ohad Shacham <[email protected]> Authored: Mon May 8 12:27:11 2017 +0300 Committer: Ohad Shacham <[email protected]> Committed: Mon May 8 12:27:11 2017 +0300 ---------------------------------------------------------------------- .../apache/phoenix/execute/MutationState.java | 14 ++-- .../transaction/OmidTransactionContext.java | 19 ++++++ .../transaction/PhoenixTransactionContext.java | 16 +++++ .../transaction/TephraTransactionContext.java | 64 ++++++++++++++++++ .../transaction/TephraTransactionTable.java | 12 +++- .../apache/phoenix/util/TransactionUtil.java | 4 +- .../java/org/apache/phoenix/query/BaseTest.java | 68 ++------------------ 7 files changed, 124 insertions(+), 73 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/phoenix/blob/b3a21368/phoenix-core/src/main/java/org/apache/phoenix/execute/MutationState.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/execute/MutationState.java b/phoenix-core/src/main/java/org/apache/phoenix/execute/MutationState.java index 2b72be1..e8d963e 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/execute/MutationState.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/execute/MutationState.java @@ -297,7 +297,7 @@ public class MutationState implements SQLCloseable { public HTableInterface getHTable(PTable table) throws SQLException { HTableInterface htable = this.getConnection().getQueryServices().getTable(table.getPhysicalName().getBytes()); if (table.isTransactional() && 
phoenixTransactionContext.isTransactionRunning()) { - PhoenixTransactionalTable phoenixTransactionTable = TransactionUtil.getPhoenixTransactionTable(phoenixTransactionContext, htable, table.isImmutableRows()); + PhoenixTransactionalTable phoenixTransactionTable = TransactionUtil.getPhoenixTransactionTable(phoenixTransactionContext, htable, table); // Using cloned mutationState as we may have started a new transaction already // if auto commit is true and we need to use the original one here. htable = phoenixTransactionTable; @@ -970,7 +970,7 @@ public class MutationState implements SQLCloseable { if (table.isTransactional()) { // Track tables to which we've sent uncommitted data txTableRefs.add(origTableRef); - addDMLFence(table); +// addDMLFence(table); uncommittedPhysicalNames.add(table.getPhysicalName().getString()); // If we have indexes, wrap the HTable in a delegate HTable that @@ -980,7 +980,7 @@ public class MutationState implements SQLCloseable { hTable = new MetaDataAwareHTable(hTable, origTableRef); } - hTable = TransactionUtil.getPhoenixTransactionTable(phoenixTransactionContext, hTable, table.isImmutableRows()); + hTable = TransactionUtil.getPhoenixTransactionTable(phoenixTransactionContext, hTable, table); } long numMutations = mutationList.size(); @@ -1231,10 +1231,10 @@ public class MutationState implements SQLCloseable { startTransaction(); // Add back read fences Set<TableRef> txTableRefs = txMutations.keySet(); - for (TableRef tableRef : txTableRefs) { - PTable dataTable = tableRef.getTable(); - addDMLFence(dataTable); - } +// for (TableRef tableRef : txTableRefs) { +// PTable dataTable = tableRef.getTable(); +// addDMLFence(dataTable); +// } try { // Only retry if an index was added retryCommit = shouldResubmitTransaction(txTableRefs); http://git-wip-us.apache.org/repos/asf/phoenix/blob/b3a21368/phoenix-core/src/main/java/org/apache/phoenix/transaction/OmidTransactionContext.java 
---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/transaction/OmidTransactionContext.java b/phoenix-core/src/main/java/org/apache/phoenix/transaction/OmidTransactionContext.java index cec07d3..25ec0cf 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/transaction/OmidTransactionContext.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/transaction/OmidTransactionContext.java @@ -1,5 +1,6 @@ package org.apache.phoenix.transaction; +import java.io.IOException; import java.sql.SQLException; import java.util.concurrent.TimeoutException; @@ -141,4 +142,22 @@ public class OmidTransactionContext implements PhoenixTransactionContext { // TODO Auto-generated method stub return null; } + + @Override + public void setTxnConfigs(Configuration config, String tmpFolder, int defaultTxnTimeoutSeconds) throws IOException { + // TODO Auto-generated method stub + + } + + @Override + public void setupTxManager(Configuration config, String url) throws SQLException { + // TODO Auto-generated method stub + + } + + @Override + public void tearDownTxManager() { + // TODO Auto-generated method stub + + } } http://git-wip-us.apache.org/repos/asf/phoenix/blob/b3a21368/phoenix-core/src/main/java/org/apache/phoenix/transaction/PhoenixTransactionContext.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/transaction/PhoenixTransactionContext.java b/phoenix-core/src/main/java/org/apache/phoenix/transaction/PhoenixTransactionContext.java index 36f7804..5b1a837 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/transaction/PhoenixTransactionContext.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/transaction/PhoenixTransactionContext.java @@ -8,6 +8,7 @@ import org.apache.phoenix.util.ReadOnlyProps; import org.apache.twill.zookeeper.ZKClientService; import org.slf4j.Logger; +import java.io.IOException; import 
java.sql.SQLException; import java.util.concurrent.TimeoutException; @@ -164,4 +165,19 @@ public interface PhoenixTransactionContext { * @return the family delete marker */ public byte[] get_famility_delete_marker(); + + /** + * Setup transaction manager's configuration for testing + */ + public void setTxnConfigs(Configuration config, String tmpFolder, int defaultTxnTimeoutSeconds) throws IOException; + + /** + * Setup transaction manager for testing + */ + public void setupTxManager(Configuration config, String url) throws SQLException; + + /** + * Tear down transaction manager for testing + */ + public void tearDownTxManager(); } http://git-wip-us.apache.org/repos/asf/phoenix/blob/b3a21368/phoenix-core/src/main/java/org/apache/phoenix/transaction/TephraTransactionContext.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/transaction/TephraTransactionContext.java b/phoenix-core/src/main/java/org/apache/phoenix/transaction/TephraTransactionContext.java index 0334826..447ce0e 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/transaction/TephraTransactionContext.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/transaction/TephraTransactionContext.java @@ -35,7 +35,13 @@ import org.apache.tephra.util.TxUtils; import org.apache.tephra.visibility.FenceWait; import org.apache.tephra.visibility.VisibilityFence; import org.apache.tephra.zookeeper.TephraZKClientService; +import org.apache.tephra.distributed.TransactionService; +import org.apache.tephra.metrics.TxMetricsCollector; +import org.apache.tephra.persist.HDFSTransactionStateStorage; +import org.apache.tephra.snapshot.SnapshotCodecProvider; +import org.apache.twill.discovery.DiscoveryService; import org.apache.twill.discovery.ZKDiscoveryService; +import org.apache.twill.internal.utils.Networks; import org.apache.twill.zookeeper.RetryStrategies; import org.apache.twill.zookeeper.ZKClientService; import 
org.apache.twill.zookeeper.ZKClientServices; @@ -43,6 +49,7 @@ import org.apache.twill.zookeeper.ZKClients; import com.google.common.collect.ArrayListMultimap; import com.google.common.collect.Lists; +import com.google.inject.util.Providers; import org.slf4j.Logger; @@ -51,6 +58,9 @@ public class TephraTransactionContext implements PhoenixTransactionContext { private static final TransactionCodec CODEC = new TransactionCodec(); private static TransactionSystemClient txClient = null; + private static ZKClientService zkClient = null; + private static TransactionService txService = null; + private static TransactionManager txManager = null; private final List<TransactionAware> txAwares; private final TransactionContext txContext; @@ -410,6 +420,60 @@ public class TephraTransactionContext implements PhoenixTransactionContext { return TxConstants.FAMILY_DELETE_QUALIFIER; } + @Override + public void setTxnConfigs(Configuration config, String tmpFolder, int defaultTxnTimeoutSeconds) throws IOException { + config.setBoolean(TxConstants.Manager.CFG_DO_PERSIST, false); + config.set(TxConstants.Service.CFG_DATA_TX_CLIENT_RETRY_STRATEGY, "n-times"); + config.setInt(TxConstants.Service.CFG_DATA_TX_CLIENT_ATTEMPTS, 1); + config.setInt(TxConstants.Service.CFG_DATA_TX_BIND_PORT, Networks.getRandomPort()); + config.set(TxConstants.Manager.CFG_TX_SNAPSHOT_DIR, tmpFolder); + config.setInt(TxConstants.Manager.CFG_TX_TIMEOUT, defaultTxnTimeoutSeconds); + config.unset(TxConstants.Manager.CFG_TX_HDFS_USER); + config.setLong(TxConstants.Manager.CFG_TX_SNAPSHOT_INTERVAL, 5L); + } + + @Override + public void setupTxManager(Configuration config, String url) throws SQLException { + + if (txService != null) { + return; + } + + ConnectionInfo connInfo = ConnectionInfo.create(url); + zkClient = ZKClientServices.delegate( + ZKClients.reWatchOnExpire( + ZKClients.retryOnFailure( + ZKClientService.Builder.of(connInfo.getZookeeperConnectionString()) + 
.setSessionTimeout(config.getInt(HConstants.ZK_SESSION_TIMEOUT, + HConstants.DEFAULT_ZK_SESSION_TIMEOUT)) + .build(), + RetryStrategies.exponentialDelay(500, 2000, TimeUnit.MILLISECONDS) + ) + ) + ); + zkClient.startAndWait(); + + DiscoveryService discovery = new ZKDiscoveryService(zkClient); + txManager = new TransactionManager(config, new HDFSTransactionStateStorage(config, new SnapshotCodecProvider(config), new TxMetricsCollector()), new TxMetricsCollector()); + txService = new TransactionService(config, zkClient, discovery, Providers.of(txManager)); + txService.startAndWait(); + } + + @Override + public void tearDownTxManager() { + try { + if (txService != null) txService.stopAndWait(); + } finally { + try { + if (zkClient != null) zkClient.stopAndWait(); + } finally { + txService = null; + zkClient = null; + txManager = null; + } + } + } + /** * TephraTransactionContext specific functions */ http://git-wip-us.apache.org/repos/asf/phoenix/blob/b3a21368/phoenix-core/src/main/java/org/apache/phoenix/transaction/TephraTransactionTable.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/transaction/TephraTransactionTable.java b/phoenix-core/src/main/java/org/apache/phoenix/transaction/TephraTransactionTable.java index e33a280..49753f0 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/transaction/TephraTransactionTable.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/transaction/TephraTransactionTable.java @@ -25,6 +25,8 @@ import org.apache.hadoop.hbase.filter.CompareFilter.CompareOp; import org.apache.hadoop.hbase.ipc.CoprocessorRpcChannel; import org.apache.tephra.TxConstants; import org.apache.tephra.hbase.TransactionAwareHTable; +import org.apache.phoenix.schema.PTable; +import org.apache.phoenix.schema.PTableType; import com.google.protobuf.Descriptors.MethodDescriptor; import com.google.protobuf.Message; @@ -38,18 +40,22 @@ public class TephraTransactionTable 
implements PhoenixTransactionalTable { private TephraTransactionContext tephraTransactionContext; public TephraTransactionTable(PhoenixTransactionContext ctx, HTableInterface hTable) { - this(ctx, hTable, false); + this(ctx, hTable, null); } - public TephraTransactionTable(PhoenixTransactionContext ctx, HTableInterface hTable, boolean isImmutableRows) { + public TephraTransactionTable(PhoenixTransactionContext ctx, HTableInterface hTable, PTable pTable) { assert(ctx instanceof TephraTransactionContext); tephraTransactionContext = (TephraTransactionContext) ctx; - transactionAwareHTable = new TransactionAwareHTable(hTable, isImmutableRows ? TxConstants.ConflictDetection.NONE : TxConstants.ConflictDetection.ROW); + transactionAwareHTable = new TransactionAwareHTable(hTable, (pTable != null && pTable.isImmutableRows()) ? TxConstants.ConflictDetection.NONE : TxConstants.ConflictDetection.ROW); tephraTransactionContext.addTransactionAware(transactionAwareHTable); + + if (pTable != null && pTable.getType() != PTableType.INDEX) { + tephraTransactionContext.markDMLFence(pTable); + } } @Override http://git-wip-us.apache.org/repos/asf/phoenix/blob/b3a21368/phoenix-core/src/main/java/org/apache/phoenix/util/TransactionUtil.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/util/TransactionUtil.java b/phoenix-core/src/main/java/org/apache/phoenix/util/TransactionUtil.java index 0a55147..01b775e 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/util/TransactionUtil.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/util/TransactionUtil.java @@ -50,8 +50,8 @@ public class TransactionUtil { return serverTimeStamp / TransactionFactory.getTransactionFactory().getTransactionContext().getMaxTransactionsPerSecond(); } - public static PhoenixTransactionalTable getPhoenixTransactionTable(PhoenixTransactionContext phoenixTransactionContext, HTableInterface htable, boolean isImmutableRows) { 
- return new TephraTransactionTable(phoenixTransactionContext, htable, isImmutableRows); + public static PhoenixTransactionalTable getPhoenixTransactionTable(PhoenixTransactionContext phoenixTransactionContext, HTableInterface htable, PTable pTable) { + return new TephraTransactionTable(phoenixTransactionContext, htable, pTable); } // we resolve transactional tables at the txn read pointer http://git-wip-us.apache.org/repos/asf/phoenix/blob/b3a21368/phoenix-core/src/test/java/org/apache/phoenix/query/BaseTest.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/test/java/org/apache/phoenix/query/BaseTest.java b/phoenix-core/src/test/java/org/apache/phoenix/query/BaseTest.java index 078c1e8..ff1007d 100644 --- a/phoenix-core/src/test/java/org/apache/phoenix/query/BaseTest.java +++ b/phoenix-core/src/test/java/org/apache/phoenix/query/BaseTest.java @@ -130,12 +130,12 @@ import org.apache.phoenix.exception.SQLExceptionInfo; import org.apache.phoenix.jdbc.PhoenixConnection; import org.apache.phoenix.jdbc.PhoenixDatabaseMetaData; import org.apache.phoenix.jdbc.PhoenixEmbeddedDriver; -import org.apache.phoenix.jdbc.PhoenixEmbeddedDriver.ConnectionInfo; import org.apache.phoenix.jdbc.PhoenixTestDriver; import org.apache.phoenix.schema.NewerTableAlreadyExistsException; import org.apache.phoenix.schema.PTableType; import org.apache.phoenix.schema.TableAlreadyExistsException; import org.apache.phoenix.schema.TableNotFoundException; +import org.apache.phoenix.transaction.TransactionFactory; import org.apache.phoenix.util.ConfigUtil; import org.apache.phoenix.util.DateUtil; import org.apache.phoenix.util.PhoenixRuntime; @@ -143,19 +143,6 @@ import org.apache.phoenix.util.PropertiesUtil; import org.apache.phoenix.util.QueryUtil; import org.apache.phoenix.util.ReadOnlyProps; import org.apache.phoenix.util.SchemaUtil; -import org.apache.tephra.TransactionManager; -import org.apache.tephra.TxConstants; -import 
org.apache.tephra.distributed.TransactionService; -import org.apache.tephra.metrics.TxMetricsCollector; -import org.apache.tephra.persist.HDFSTransactionStateStorage; -import org.apache.tephra.snapshot.SnapshotCodecProvider; -import org.apache.twill.discovery.DiscoveryService; -import org.apache.twill.discovery.ZKDiscoveryService; -import org.apache.twill.internal.utils.Networks; -import org.apache.twill.zookeeper.RetryStrategies; -import org.apache.twill.zookeeper.ZKClientService; -import org.apache.twill.zookeeper.ZKClientServices; -import org.apache.twill.zookeeper.ZKClients; import org.junit.ClassRule; import org.junit.rules.TemporaryFolder; import org.slf4j.Logger; @@ -165,7 +152,6 @@ import com.google.common.collect.ImmutableMap; import com.google.common.collect.Lists; import com.google.common.collect.Sets; import com.google.common.util.concurrent.ThreadFactoryBuilder; -import com.google.inject.util.Providers; /** * @@ -186,9 +172,6 @@ public abstract class BaseTest { private static final Map<String,String> tableDDLMap; private static final Logger logger = LoggerFactory.getLogger(BaseTest.class); protected static final int DEFAULT_TXN_TIMEOUT_SECONDS = 30; - private static ZKClientService zkClient; - private static TransactionService txService; - protected static TransactionManager txManager; @ClassRule public static TemporaryFolder tmpFolder = new TemporaryFolder(); private static final int dropTableTimeout = 300; // 5 mins should be long enough. 
@@ -437,50 +420,15 @@ public abstract class BaseTest { } private static void tearDownTxManager() throws SQLException { - try { - if (txService != null) txService.stopAndWait(); - } finally { - try { - if (zkClient != null) zkClient.stopAndWait(); - } finally { - txService = null; - zkClient = null; - txManager = null; - } - } - + TransactionFactory.getTransactionFactory().getTransactionContext().tearDownTxManager(); } - + protected static void setTxnConfigs() throws IOException { - config.setBoolean(TxConstants.Manager.CFG_DO_PERSIST, false); - config.set(TxConstants.Service.CFG_DATA_TX_CLIENT_RETRY_STRATEGY, "n-times"); - config.setInt(TxConstants.Service.CFG_DATA_TX_CLIENT_ATTEMPTS, 1); - config.setInt(TxConstants.Service.CFG_DATA_TX_BIND_PORT, Networks.getRandomPort()); - config.set(TxConstants.Manager.CFG_TX_SNAPSHOT_DIR, tmpFolder.newFolder().getAbsolutePath()); - config.setInt(TxConstants.Manager.CFG_TX_TIMEOUT, DEFAULT_TXN_TIMEOUT_SECONDS); - config.unset(TxConstants.Manager.CFG_TX_HDFS_USER); - config.setLong(TxConstants.Manager.CFG_TX_SNAPSHOT_INTERVAL, 5L); + TransactionFactory.getTransactionFactory().getTransactionContext().setTxnConfigs(config, tmpFolder.newFolder().getAbsolutePath(), DEFAULT_TXN_TIMEOUT_SECONDS); } - - protected static void setupTxManager() throws SQLException, IOException { - ConnectionInfo connInfo = ConnectionInfo.create(getUrl()); - zkClient = ZKClientServices.delegate( - ZKClients.reWatchOnExpire( - ZKClients.retryOnFailure( - ZKClientService.Builder.of(connInfo.getZookeeperConnectionString()) - .setSessionTimeout(config.getInt(HConstants.ZK_SESSION_TIMEOUT, - HConstants.DEFAULT_ZK_SESSION_TIMEOUT)) - .build(), - RetryStrategies.exponentialDelay(500, 2000, TimeUnit.MILLISECONDS) - ) - ) - ); - zkClient.startAndWait(); - DiscoveryService discovery = new ZKDiscoveryService(zkClient); - txManager = new TransactionManager(config, new HDFSTransactionStateStorage(config, new SnapshotCodecProvider(config), new TxMetricsCollector()), new 
TxMetricsCollector()); - txService = new TransactionService(config, zkClient, discovery, Providers.of(txManager)); - txService.startAndWait(); + protected static void setupTxManager() throws SQLException, IOException { + TransactionFactory.getTransactionFactory().getTransactionContext().setupTxManager(config, getUrl()); } private static String checkClusterInitialized(ReadOnlyProps serverProps) throws Exception { @@ -499,9 +447,7 @@ public abstract class BaseTest { } private static void checkTxManagerInitialized(ReadOnlyProps clientProps) throws SQLException, IOException { - if (txService == null) { - setupTxManager(); - } + setupTxManager(); } /**
