http://git-wip-us.apache.org/repos/asf/phoenix/blob/8aff8959/phoenix-core/src/it/java/org/apache/phoenix/end2end/index/txn/MutableRollbackIT.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/it/java/org/apache/phoenix/end2end/index/txn/MutableRollbackIT.java b/phoenix-core/src/it/java/org/apache/phoenix/end2end/index/txn/MutableRollbackIT.java index cbfe9a5..f807a2b 100644 --- a/phoenix-core/src/it/java/org/apache/phoenix/end2end/index/txn/MutableRollbackIT.java +++ b/phoenix-core/src/it/java/org/apache/phoenix/end2end/index/txn/MutableRollbackIT.java @@ -32,10 +32,7 @@ import java.util.Arrays; import java.util.Collection; import java.util.Properties; -import org.apache.hadoop.hbase.TableName; -import org.apache.hadoop.hbase.client.HBaseAdmin; import org.apache.phoenix.end2end.ParallelStatsDisabledIT; -import org.apache.phoenix.jdbc.PhoenixConnection; import org.apache.phoenix.query.QueryServices; import org.apache.phoenix.util.PropertiesUtil; import org.apache.phoenix.util.SchemaUtil; @@ -49,14 +46,19 @@ import org.junit.runners.Parameterized.Parameters; public class MutableRollbackIT extends ParallelStatsDisabledIT { private final boolean localIndex; + private final String tableDDLOptions; - public MutableRollbackIT(boolean localIndex) { + public MutableRollbackIT(boolean localIndex, String transactionProvider) { this.localIndex = localIndex; + this.tableDDLOptions = " TRANSACTION_PROVIDER='" + transactionProvider + "'"; } - @Parameters(name="MutableRollbackIT_localIndex={0}") // name is used by failsafe as file name in reports - public static Collection<Boolean> data() { - return Arrays.asList(new Boolean[] { false, true}); + @Parameters(name="MutableRollbackIT_localIndex={0},transactionProvider={1}") // name is used by failsafe as file name in reports + public static Collection<Object[]> data() { + return TestUtil.filterTxParamData(Arrays.asList(new Object[][] { + { false, "TEPHRA"}, { true, "TEPHRA"}, + { false, "OMID"}, + }),1); } private static Connection getConnection() throws SQLException { @@ -77,8 +79,8 @@ public class MutableRollbackIT extends ParallelStatsDisabledIT { conn.setAutoCommit(false); try { Statement stmt = conn.createStatement(); - stmt.execute("CREATE TABLE " + fullTableName1 + "(k VARCHAR PRIMARY KEY, v1 VARCHAR, v2 VARCHAR)"); - stmt.execute("CREATE TABLE " + fullTableName2 + "(k VARCHAR PRIMARY KEY, v1 VARCHAR, v2 VARCHAR) IMMUTABLE_ROWS=true"); + stmt.execute("CREATE TABLE " + fullTableName1 + "(k VARCHAR PRIMARY KEY, v1 VARCHAR, v2 VARCHAR)" + tableDDLOptions); + stmt.execute("CREATE TABLE " + fullTableName2 + "(k VARCHAR PRIMARY KEY, v1 VARCHAR, v2 VARCHAR) IMMUTABLE_ROWS=true,"+tableDDLOptions); stmt.execute("CREATE "+(localIndex? " LOCAL " : "")+"INDEX " + indexName1 + " ON " + fullTableName1 + " (v1) INCLUDE(v2)"); stmt.execute("CREATE "+(localIndex? " LOCAL " : "")+"INDEX " + indexName2 + " ON " + fullTableName2 + " (v1) INCLUDE(v2)"); @@ -210,8 +212,8 @@ public class MutableRollbackIT extends ParallelStatsDisabledIT { conn.setAutoCommit(false); try { Statement stmt = conn.createStatement(); - stmt.execute("CREATE TABLE " + fullTableName1 + "(k VARCHAR PRIMARY KEY, v1 VARCHAR, v2 VARCHAR)"); - stmt.execute("CREATE TABLE " + fullTableName2 + "(k VARCHAR PRIMARY KEY, v1 VARCHAR, v2 VARCHAR) IMMUTABLE_ROWS=true"); + stmt.execute("CREATE TABLE " + fullTableName1 + "(k VARCHAR PRIMARY KEY, v1 VARCHAR, v2 VARCHAR)"+tableDDLOptions); + stmt.execute("CREATE TABLE " + fullTableName2 + "(k VARCHAR PRIMARY KEY, v1 VARCHAR, v2 VARCHAR) IMMUTABLE_ROWS=true,"+tableDDLOptions); stmt.execute("CREATE "+(localIndex? " LOCAL " : "")+"INDEX " + indexName1 + " ON " + fullTableName1 + " (v1, k)"); stmt.execute("CREATE "+(localIndex? " LOCAL " : "")+"INDEX " + indexName2 + " ON " + fullTableName2 + " (v1, k)"); @@ -294,11 +296,6 @@ public class MutableRollbackIT extends ParallelStatsDisabledIT { conn.rollback(); assertDataAndIndexRows(stmt, fullTableName1, fullTableName2, indexName1); - PhoenixConnection phoenixConn = conn.unwrap(PhoenixConnection.class); - if(localIndex) { - dropTable(phoenixConn.getQueryServices().getAdmin(), conn, fullTableName1); - dropTable(phoenixConn.getQueryServices().getAdmin(), conn, fullTableName2); - } } finally { conn.close(); } @@ -347,7 +344,7 @@ public class MutableRollbackIT extends ParallelStatsDisabledIT { conn.setAutoCommit(false); try { Statement stmt = conn.createStatement(); - stmt.execute("CREATE TABLE " + fullTableName1 + "(k VARCHAR PRIMARY KEY, v1 VARCHAR, v2 VARCHAR)"); + stmt.execute("CREATE TABLE " + fullTableName1 + "(k VARCHAR PRIMARY KEY, v1 VARCHAR, v2 VARCHAR)"+tableDDLOptions); stmt.execute("CREATE "+(localIndex? " LOCAL " : "")+"INDEX " + indexName1 + " ON " + fullTableName1 + " (v1, k)"); stmt.executeUpdate("upsert into " + fullTableName1 + " values('x', 'yyyy', 'a')"); @@ -435,8 +432,6 @@ public class MutableRollbackIT extends ParallelStatsDisabledIT { assertEquals("x", rs.getString(1)); assertEquals("yyyy", rs.getString(2)); assertFalse(rs.next()); - PhoenixConnection phoenixConn = conn.unwrap(PhoenixConnection.class); - if(localIndex) dropTable(phoenixConn.getQueryServices().getAdmin(), conn, fullTableName1); } finally { conn.close(); } @@ -451,7 +446,7 @@ public class MutableRollbackIT extends ParallelStatsDisabledIT { conn.setAutoCommit(false); try { Statement stmt = conn.createStatement(); - stmt.execute("CREATE TABLE " + fullTableName1 + "(k VARCHAR PRIMARY KEY, v1 VARCHAR, v2 VARCHAR)"); + stmt.execute("CREATE TABLE " + fullTableName1 + "(k VARCHAR PRIMARY KEY, v1 VARCHAR, v2 VARCHAR)"+tableDDLOptions); stmt.execute("CREATE "+(localIndex? " LOCAL " : "")+"INDEX " + indexName1 + " ON " + fullTableName1 + " (v1)"); stmt.executeUpdate("upsert into " + fullTableName1 + " values('x', 'a', 'a')"); conn.commit(); @@ -501,19 +496,8 @@ public class MutableRollbackIT extends ParallelStatsDisabledIT { assertEquals("x", rs.getString(1)); assertEquals("a", rs.getString(2)); assertFalse(rs.next()); - PhoenixConnection phoenixConn = conn.unwrap(PhoenixConnection.class); - if(localIndex) dropTable(phoenixConn.getQueryServices().getAdmin(), conn, fullTableName1); } finally { conn.close(); } } - - private void dropTable(HBaseAdmin admin, Connection conn, String tableName) throws SQLException, IOException { - conn.createStatement().execute("DROP TABLE IF EXISTS "+ tableName); - if(admin.tableExists(tableName)) { - admin.disableTable(TableName.valueOf(tableName)); - admin.deleteTable(TableName.valueOf(tableName)); - } - } - }
http://git-wip-us.apache.org/repos/asf/phoenix/blob/8aff8959/phoenix-core/src/it/java/org/apache/phoenix/end2end/index/txn/RollbackIT.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/it/java/org/apache/phoenix/end2end/index/txn/RollbackIT.java b/phoenix-core/src/it/java/org/apache/phoenix/end2end/index/txn/RollbackIT.java index 8cafcfc..ff9f14b 100644 --- a/phoenix-core/src/it/java/org/apache/phoenix/end2end/index/txn/RollbackIT.java +++ b/phoenix-core/src/it/java/org/apache/phoenix/end2end/index/txn/RollbackIT.java @@ -45,11 +45,16 @@ import org.junit.runners.Parameterized.Parameters; public class RollbackIT extends ParallelStatsDisabledIT { private final boolean localIndex; - private final boolean mutable; + private final String tableDDLOptions; - public RollbackIT(boolean localIndex, boolean mutable) { + public RollbackIT(boolean localIndex, boolean mutable, String transactionProvider) { this.localIndex = localIndex; - this.mutable = mutable; + StringBuilder optionBuilder = new StringBuilder(); + optionBuilder.append(" TRANSACTION_PROVIDER='" + transactionProvider + "'"); + if (!mutable) { + optionBuilder.append(",IMMUTABLE_ROWS=true"); + } + this.tableDDLOptions = optionBuilder.toString(); } private static Connection getConnection() throws SQLException { @@ -59,12 +64,13 @@ public class RollbackIT extends ParallelStatsDisabledIT { return conn; } - @Parameters(name="RollbackIT_localIndex={0},mutable={1}") // name is used by failsafe as file name in reports - public static Collection<Boolean[]> data() { - return Arrays.asList(new Boolean[][] { - { false, false }, { false, true }, - { true, false }, { true, true } - }); + @Parameters(name="RollbackIT_localIndex={0},mutable={1},transactionProvider={2}") // name is used by failsafe as file name in reports + public static Collection<Object[]> data() { + return TestUtil.filterTxParamData(Arrays.asList(new Object[][] { + { false, false, "TEPHRA" }, { false, true, "TEPHRA" }, + { true, false, "TEPHRA" }, { true, true, "TEPHRA" }, + { false, false, "OMID" }, { false, true, "OMID" }, + }),2); } @Test @@ -76,7 +82,7 @@ public class RollbackIT extends ParallelStatsDisabledIT { conn.setAutoCommit(false); try { Statement stmt = conn.createStatement(); - stmt.execute("CREATE TABLE " + fullTableName + "(k VARCHAR PRIMARY KEY, v1 VARCHAR, v2 VARCHAR)"+(!mutable? " IMMUTABLE_ROWS=true" : "")); + stmt.execute("CREATE TABLE " + fullTableName + "(k VARCHAR PRIMARY KEY, v1 VARCHAR, v2 VARCHAR)"+tableDDLOptions); stmt.execute("CREATE "+(localIndex? "LOCAL " : "")+"INDEX " + indexName + " ON " + fullTableName + " (v1) INCLUDE(v2)"); stmt.executeUpdate("upsert into " + fullTableName + " values('x', 'y', 'a')"); @@ -120,7 +126,7 @@ public class RollbackIT extends ParallelStatsDisabledIT { String fullTableName = SchemaUtil.getTableName(TestUtil.DEFAULT_SCHEMA_NAME, tableName); try { Statement stmt = conn.createStatement(); - stmt.execute("CREATE TABLE " + fullTableName + "(k VARCHAR, v1 VARCHAR, v2 VARCHAR, CONSTRAINT pk PRIMARY KEY (v1, v2))"+(!mutable? " IMMUTABLE_ROWS=true" : "")); + stmt.execute("CREATE TABLE " + fullTableName + "(k VARCHAR, v1 VARCHAR, v2 VARCHAR, CONSTRAINT pk PRIMARY KEY (v1, v2))"+tableDDLOptions); stmt.execute("CREATE "+(localIndex? "LOCAL " : "")+"INDEX " + indexName + " ON " + fullTableName + "(v1, k)"); stmt.executeUpdate("upsert into " + fullTableName + " values('x', 'y', 'a')"); http://git-wip-us.apache.org/repos/asf/phoenix/blob/8aff8959/phoenix-core/src/it/java/org/apache/phoenix/end2end/index/txn/TxWriteFailureIT.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/it/java/org/apache/phoenix/end2end/index/txn/TxWriteFailureIT.java b/phoenix-core/src/it/java/org/apache/phoenix/end2end/index/txn/TxWriteFailureIT.java index ec60151..bc4f8a3 100644 --- a/phoenix-core/src/it/java/org/apache/phoenix/end2end/index/txn/TxWriteFailureIT.java +++ b/phoenix-core/src/it/java/org/apache/phoenix/end2end/index/txn/TxWriteFailureIT.java @@ -48,6 +48,7 @@ import org.apache.phoenix.util.PropertiesUtil; import org.apache.phoenix.util.QueryUtil; import org.apache.phoenix.util.ReadOnlyProps; import org.apache.phoenix.util.SchemaUtil; +import org.apache.phoenix.util.TestUtil; import org.junit.Before; import org.junit.BeforeClass; import org.junit.Test; @@ -68,11 +69,16 @@ public class TxWriteFailureIT extends BaseUniqueNamesOwnClusterIT { private static final String ROW_TO_FAIL = "fail"; private final boolean localIndex; - private final boolean mutable; + private final String tableDDLOptions; - public TxWriteFailureIT(boolean localIndex, boolean mutable) { + public TxWriteFailureIT(boolean localIndex, boolean mutable, String transactionProvider) { this.localIndex = localIndex; - this.mutable = mutable; + StringBuilder optionBuilder = new StringBuilder(); + optionBuilder.append(" TRANSACTION_PROVIDER='" + transactionProvider + "'"); + if (!mutable) { + optionBuilder.append(",IMMUTABLE_ROWS=true"); + } + this.tableDDLOptions = optionBuilder.toString(); } @BeforeClass @@ -87,11 +93,12 @@ public class TxWriteFailureIT extends BaseUniqueNamesOwnClusterIT { setUpTestDriver(new ReadOnlyProps(serverProps.entrySet().iterator()), new ReadOnlyProps(clientProps.entrySet().iterator())); } - @Parameters(name="TxWriteFailureIT_localIndex={0},mutable={1}") // name is used by failsafe as file name in reports - public static Collection<Boolean[]> data() { - return Arrays.asList(new Boolean[][] { - { false, false }, { false, true }, { true, false }, { true, true } - }); + @Parameters(name="TxWriteFailureIT_localIndex={0},mutable={1},transactionProvider={2}") // name is used by failsafe as file name in reports + public static Collection<Object[]> data() { + return TestUtil.filterTxParamData(Arrays.asList(new Object[][] { + { false, false, "TEPHRA" }, { false, true, "TEPHRA" }, { true, false, "TEPHRA" }, { true, true, "TEPHRA" }, + { false, false, "OMID" }, { false, true, "OMID" }, + }), 2); } @Before @@ -120,7 +127,7 @@ public class TxWriteFailureIT extends BaseUniqueNamesOwnClusterIT { Connection conn = driver.connect(url, props); conn.setAutoCommit(false); conn.createStatement().execute( - "CREATE TABLE " + dataTableFullName + " (k VARCHAR PRIMARY KEY, v1 VARCHAR)"+(!mutable? " IMMUTABLE_ROWS=true" : "")); + "CREATE TABLE " + dataTableFullName + " (k VARCHAR PRIMARY KEY, v1 VARCHAR)"+tableDDLOptions); conn.createStatement().execute( "CREATE "+(localIndex? "LOCAL " : "")+"INDEX " + indexName + " ON " + dataTableFullName + " (v1)"); http://git-wip-us.apache.org/repos/asf/phoenix/blob/8aff8959/phoenix-core/src/it/java/org/apache/phoenix/execute/PartialCommitIT.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/it/java/org/apache/phoenix/execute/PartialCommitIT.java b/phoenix-core/src/it/java/org/apache/phoenix/execute/PartialCommitIT.java index feb0ce4..c6dc312 100644 --- a/phoenix-core/src/it/java/org/apache/phoenix/execute/PartialCommitIT.java +++ b/phoenix-core/src/it/java/org/apache/phoenix/execute/PartialCommitIT.java @@ -18,7 +18,6 @@ package org.apache.phoenix.execute; import static com.google.common.collect.Lists.newArrayList; -import static com.google.common.collect.Sets.newHashSet; import static java.util.Collections.singletonList; import static org.apache.phoenix.monitoring.GlobalClientMetrics.GLOBAL_MUTATION_BATCH_FAILED_COUNT; import static org.apache.phoenix.monitoring.MetricType.MUTATION_BATCH_FAILED_SIZE; @@ -37,11 +36,9 @@ import java.util.Collection; import java.util.Comparator; import java.util.List; import java.util.Map; -import java.util.Properties; import org.apache.hadoop.hbase.DoNotRetryIOException; import org.apache.hadoop.hbase.HBaseIOException; -import org.apache.hadoop.hbase.HBaseTestingUtility; import org.apache.hadoop.hbase.client.Delete; import org.apache.hadoop.hbase.client.Durability; import org.apache.hadoop.hbase.client.Mutation; @@ -51,7 +48,7 @@ import org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment; import org.apache.hadoop.hbase.coprocessor.SimpleRegionObserver; import org.apache.hadoop.hbase.regionserver.wal.WALEdit; import org.apache.hadoop.hbase.util.Bytes; -import org.apache.phoenix.end2end.BaseOwnClusterIT; +import org.apache.phoenix.end2end.BaseUniqueNamesOwnClusterIT; import org.apache.phoenix.execute.MutationState.MultiRowMutationState; import org.apache.phoenix.hbase.index.Indexer; import org.apache.phoenix.jdbc.PhoenixConnection; @@ -61,9 +58,7 @@ import org.apache.phoenix.query.QueryServices; import org.apache.phoenix.schema.TableRef; import org.apache.phoenix.util.PhoenixRuntime; import org.apache.phoenix.util.ReadOnlyProps; -import org.apache.phoenix.util.SchemaUtil; -import org.junit.After; -import org.junit.AfterClass; +import org.apache.phoenix.util.TestUtil; import org.junit.Before; import org.junit.BeforeClass; import org.junit.Test; @@ -72,27 +67,23 @@ import org.junit.runners.Parameterized; import org.junit.runners.Parameterized.Parameters; import com.google.common.base.Preconditions; +import com.google.common.collect.Lists; import com.google.common.collect.Maps; @RunWith(Parameterized.class) // Needs to extend BaseOwnClusterIT due to installation of FailingRegionObserver coprocessor -public class PartialCommitIT extends BaseOwnClusterIT { +public class PartialCommitIT extends BaseUniqueNamesOwnClusterIT { - private final String A_SUCESS_TABLE; - private final String B_FAILURE_TABLE; - private final String C_SUCESS_TABLE; - private final String UPSERT_TO_FAIL; - private final String UPSERT_SELECT_TO_FAIL; - private final String DELETE_TO_FAIL; + private final String aSuccessTable; + private final String bFailureTable; + private final String cSuccessTable; + private final String upsertToFail; + private final String upsertSelectToFail; + private final String deleteToFail; private static final String TABLE_NAME_TO_FAIL = "B_FAILURE_TABLE"; private static final byte[] ROW_TO_FAIL_UPSERT_BYTES = Bytes.toBytes("fail me upsert"); private static final byte[] ROW_TO_FAIL_DELETE_BYTES = Bytes.toBytes("fail me delete"); - private static final HBaseTestingUtility TEST_UTIL = new HBaseTestingUtility(); - @Override - @After - public void cleanUpAfterTest() throws Exception {} - @BeforeClass public static void doSetup() throws Exception { Map<String, String> serverProps = Maps.newHashMapWithExpectedSize(3); @@ -103,144 +94,135 @@ public class PartialCommitIT extends BaseOwnClusterIT { clientProps.put(QueryServices.TRANSACTIONS_ENABLED, "true"); clientProps.put(QueryServices.COLLECT_REQUEST_LEVEL_METRICS, String.valueOf(true)); setUpTestDriver(new ReadOnlyProps(serverProps.entrySet().iterator()), new ReadOnlyProps(clientProps.entrySet().iterator())); - createTablesWithABitOfData(); - } - - @Parameters(name="PartialCommitIT_transactional={0}") // name is used by failsafe as file name in reports - public static Collection<Boolean> data() { - return Arrays.asList(false, true); } private final boolean transactional; + private final String transactionProvider; - public PartialCommitIT(boolean transactional) { - this.transactional = transactional; - if (transactional) { - A_SUCESS_TABLE = "A_SUCCESS_TABLE_TXN"; - B_FAILURE_TABLE = TABLE_NAME_TO_FAIL+"_TXN"; - C_SUCESS_TABLE = "C_SUCCESS_TABLE_TXN"; - } - else { - A_SUCESS_TABLE = "A_SUCCESS_TABLE"; - B_FAILURE_TABLE = TABLE_NAME_TO_FAIL; - C_SUCESS_TABLE = "C_SUCCESS_TABLE"; - } - UPSERT_TO_FAIL = "upsert into " + B_FAILURE_TABLE + " values ('" + Bytes.toString(ROW_TO_FAIL_UPSERT_BYTES) + "', 'boom!')"; - UPSERT_SELECT_TO_FAIL = "upsert into " + B_FAILURE_TABLE + " select k, c from a_success_table"; - DELETE_TO_FAIL = "delete from " + B_FAILURE_TABLE + " where k='" + Bytes.toString(ROW_TO_FAIL_DELETE_BYTES) + "'"; + @Parameters(name="PartialCommitIT_transactionProvider={0}") + public static Collection<Object[]> data() { + return TestUtil.filterTxParamData(Arrays.asList(new Object[][] { + {"TEPHRA"},{"OMID"}}),0); + } + + public PartialCommitIT(String transactionProvider) { + this.transactionProvider = transactionProvider; + this.transactional = transactionProvider != null; + aSuccessTable = generateUniqueName(); + bFailureTable = TABLE_NAME_TO_FAIL + generateUniqueName(); + cSuccessTable = generateUniqueName(); + upsertToFail = "upsert into " + bFailureTable + " values ('" + Bytes.toString(ROW_TO_FAIL_UPSERT_BYTES) + "', 'boom!')"; + upsertSelectToFail = "upsert into " + bFailureTable + " select k, c from " + aSuccessTable; + deleteToFail = "delete from " + bFailureTable + " where k='" + Bytes.toString(ROW_TO_FAIL_DELETE_BYTES) + "'"; } - private static void createTablesWithABitOfData() throws Exception { - try (Connection con = driver.connect(url, new Properties())) { + private void createTables() throws Exception { + try (Connection con = DriverManager.getConnection(getUrl())) { Statement sta = con.createStatement(); - sta.execute("create table a_success_table (k varchar primary key, c varchar)"); - sta.execute("create table b_failure_table (k varchar primary key, c varchar)"); - sta.execute("create table c_success_table (k varchar primary key, c varchar)"); - sta.execute("create table a_success_table_txn (k varchar primary key, c varchar) TRANSACTIONAL=true"); - sta.execute("create table b_failure_table_txn (k varchar primary key, c varchar) TRANSACTIONAL=true"); - sta.execute("create table c_success_table_txn (k varchar primary key, c varchar) TRANSACTIONAL=true"); - con.commit(); + sta.execute("create table " + aSuccessTable + " (k varchar primary key, c varchar)" + (transactional ? (" TRANSACTIONAL=true,TRANSACTION_PROVIDER='" + transactionProvider + "'") : "")); + sta.execute("create table " + bFailureTable + " (k varchar primary key, c varchar)" + (transactional ? (" TRANSACTIONAL=true,TRANSACTION_PROVIDER='" + transactionProvider + "'") : "")); + sta.execute("create table " + cSuccessTable + " (k varchar primary key, c varchar)" + (transactional ? (" TRANSACTIONAL=true,TRANSACTION_PROVIDER='" + transactionProvider + "'") : "")); } - - try (Connection con = driver.connect(url, new Properties())) { + } + + private void populateTables() throws Exception { + try (Connection con = DriverManager.getConnection(getUrl())) { con.setAutoCommit(false); Statement sta = con.createStatement(); - for (String table : newHashSet("a_success_table", "b_failure_table", "c_success_table", - "a_success_table_txn", "b_failure_table_txn", "c_success_table_txn")) { - sta.execute("upsert into " + table + " values ('z', 'z')"); - sta.execute("upsert into " + table + " values ('zz', 'zz')"); - sta.execute("upsert into " + table + " values ('zzz', 'zzz')"); + List<String> tableNames = Lists.newArrayList(aSuccessTable, bFailureTable, cSuccessTable); + for (String tableName : tableNames) { + sta.execute("upsert into " + tableName + " values ('z', 'z')"); + sta.execute("upsert into " + tableName + " values ('zz', 'zz')"); + sta.execute("upsert into " + tableName + " values ('zzz', 'zzz')"); } con.commit(); } } - @AfterClass - public static void teardownCluster() throws Exception { - TEST_UTIL.shutdownMiniCluster(); - } - @Before - public void resetGlobalMetrics() { + public void resetGlobalMetrics() throws Exception { + createTables(); + populateTables(); for (GlobalMetric m : PhoenixRuntime.getGlobalPhoenixClientMetrics()) { m.reset(); } } @Test - public void testNoFailure() { - testPartialCommit(singletonList("upsert into " + A_SUCESS_TABLE + " values ('testNoFailure', 'a')"), new int[0], false, singletonList("select count(*) from " + A_SUCESS_TABLE + " where k='testNoFailure'"), + public void testNoFailure() throws SQLException { + testPartialCommit(singletonList("upsert into " + aSuccessTable + " values ('testNoFailure', 'a')"), new int[0], false, singletonList("select count(*) from " + aSuccessTable + " where k='testNoFailure'"), singletonList(new Integer(1))); } @Test - public void testUpsertFailure() { - testPartialCommit(newArrayList("upsert into " + A_SUCESS_TABLE + " values ('testUpsertFailure1', 'a')", - UPSERT_TO_FAIL, - "upsert into " + A_SUCESS_TABLE + " values ('testUpsertFailure2', 'b')"), + public void testUpsertFailure() throws SQLException { + testPartialCommit(newArrayList("upsert into " + aSuccessTable + " values ('testUpsertFailure1', 'a')", + upsertToFail, + "upsert into " + aSuccessTable + " values ('testUpsertFailure2', 'b')"), transactional ? new int[] {0,1,2} : new int[]{1}, true, - newArrayList("select count(*) from " + A_SUCESS_TABLE + " where k like 'testUpsertFailure_'", - "select count(*) from " + B_FAILURE_TABLE + " where k = '" + Bytes.toString(ROW_TO_FAIL_UPSERT_BYTES) + "'"), + newArrayList("select count(*) from " + aSuccessTable + " where k like 'testUpsertFailure_'", + "select count(*) from " + bFailureTable + " where k = '" + Bytes.toString(ROW_TO_FAIL_UPSERT_BYTES) + "'"), transactional ? newArrayList(new Integer(0), new Integer(0)) : newArrayList(new Integer(2), new Integer(0))); } @Test public void testUpsertSelectFailure() throws SQLException { - try (Connection con = driver.connect(url, new Properties())) { - con.createStatement().execute("upsert into " + A_SUCESS_TABLE + " values ('" + Bytes.toString(ROW_TO_FAIL_UPSERT_BYTES) + "', 'boom!')"); + try (Connection con = DriverManager.getConnection(getUrl())) { + con.createStatement().execute("upsert into " + aSuccessTable + " values ('" + Bytes.toString(ROW_TO_FAIL_UPSERT_BYTES) + "', 'boom!')"); con.commit(); } - testPartialCommit(newArrayList("upsert into " + A_SUCESS_TABLE + " values ('testUpsertSelectFailure', 'a')", - UPSERT_SELECT_TO_FAIL), + testPartialCommit(newArrayList("upsert into " + aSuccessTable + " values ('testUpsertSelectFailure', 'a')", + upsertSelectToFail), transactional ? new int[] {0,1} : new int[]{1}, true, - newArrayList("select count(*) from " + A_SUCESS_TABLE + " where k in ('testUpsertSelectFailure', '" + Bytes.toString(ROW_TO_FAIL_UPSERT_BYTES) + "')", - "select count(*) from " + B_FAILURE_TABLE + " where k = '" + Bytes.toString(ROW_TO_FAIL_UPSERT_BYTES) + "'"), + newArrayList("select count(*) from " + aSuccessTable + " where k in ('testUpsertSelectFailure', '" + Bytes.toString(ROW_TO_FAIL_UPSERT_BYTES) + "')", + "select count(*) from " + bFailureTable + " where k = '" + Bytes.toString(ROW_TO_FAIL_UPSERT_BYTES) + "'"), transactional ? newArrayList(new Integer(1) /* from commit above */, new Integer(0)) : newArrayList(new Integer(2), new Integer(0))); } @Test - public void testDeleteFailure() { - testPartialCommit(newArrayList("upsert into " + A_SUCESS_TABLE + " values ('testDeleteFailure1', 'a')", - DELETE_TO_FAIL, - "upsert into " + A_SUCESS_TABLE + " values ('testDeleteFailure2', 'b')"), + public void testDeleteFailure() throws SQLException { + testPartialCommit(newArrayList("upsert into " + aSuccessTable + " values ('testDeleteFailure1', 'a')", + deleteToFail, + "upsert into " + aSuccessTable + " values ('testDeleteFailure2', 'b')"), transactional ? new int[] {0,1,2} : new int[]{1}, true, - newArrayList("select count(*) from " + A_SUCESS_TABLE + " where k like 'testDeleteFailure_'", - "select count(*) from " + B_FAILURE_TABLE + " where k = 'z'"), + newArrayList("select count(*) from " + aSuccessTable + " where k like 'testDeleteFailure_'", + "select count(*) from " + bFailureTable + " where k = 'z'"), transactional ? newArrayList(new Integer(0), new Integer(1) /* original row */) : newArrayList(new Integer(2), new Integer(1))); } /** * {@link MutationState} keeps mutations ordered lexicographically by table name. + * @throws SQLException */ @Test - public void testOrderOfMutationsIsPredicatable() { - testPartialCommit(newArrayList("upsert into " + C_SUCESS_TABLE + " values ('testOrderOfMutationsIsPredicatable', 'c')", // will fail because c_success_table is after b_failure_table by table sort order - UPSERT_TO_FAIL, - "upsert into " + A_SUCESS_TABLE + " values ('testOrderOfMutationsIsPredicatable', 'a')"), // will succeed because a_success_table is before b_failure_table by table sort order + public void testOrderOfMutationsIsPredicatable() throws SQLException { + testPartialCommit(newArrayList("upsert into " + cSuccessTable + " values ('testOrderOfMutationsIsPredicatable', 'c')", // will fail because c_success_table is after b_failure_table by table sort order + upsertToFail, + "upsert into " + aSuccessTable + " values ('testOrderOfMutationsIsPredicatable', 'a')"), // will succeed because a_success_table is before b_failure_table by table sort order transactional ? new int[] {0,1,2} : new int[]{0,1}, true, - newArrayList("select count(*) from " + C_SUCESS_TABLE + " where k='testOrderOfMutationsIsPredicatable'", - "select count(*) from " + A_SUCESS_TABLE + " where k='testOrderOfMutationsIsPredicatable'", - "select count(*) from " + B_FAILURE_TABLE + " where k = '" + Bytes.toString(ROW_TO_FAIL_UPSERT_BYTES) + "'"), + newArrayList("select count(*) from " + cSuccessTable + " where k='testOrderOfMutationsIsPredicatable'", + "select count(*) from " + aSuccessTable + " where k='testOrderOfMutationsIsPredicatable'", + "select count(*) from " + bFailureTable + " where k = '" + Bytes.toString(ROW_TO_FAIL_UPSERT_BYTES) + "'"), transactional ? newArrayList(new Integer(0), new Integer(0), new Integer(0)) : newArrayList(new Integer(0), new Integer(1), new Integer(0))); } @Test - public void testStatementOrderMaintainedInConnection() { - testPartialCommit(newArrayList("upsert into " + A_SUCESS_TABLE + " values ('testStatementOrderMaintainedInConnection', 'a')", - "upsert into " + A_SUCESS_TABLE + " select k, c from " + C_SUCESS_TABLE, - DELETE_TO_FAIL, - "select * from " + A_SUCESS_TABLE + "", - UPSERT_TO_FAIL), + public void testStatementOrderMaintainedInConnection() throws SQLException { + testPartialCommit(newArrayList("upsert into " + aSuccessTable + " values ('testStatementOrderMaintainedInConnection', 'a')", + "upsert into " + aSuccessTable + " select k, c from " + cSuccessTable, + deleteToFail, + "select * from " + aSuccessTable + "", + upsertToFail), transactional ? new int[] {0,1,2,4} : new int[]{2,4}, true, - newArrayList("select count(*) from " + A_SUCESS_TABLE + " where k='testStatementOrderMaintainedInConnection' or k like 'z%'", // rows left: zz, zzz, checkThatAllStatementTypesMaintainOrderInConnection - "select count(*) from " + B_FAILURE_TABLE + " where k = '" + Bytes.toString(ROW_TO_FAIL_UPSERT_BYTES) + "'", - "select count(*) from " + B_FAILURE_TABLE + " where k = 'z'"), + newArrayList("select count(*) from " + aSuccessTable + " where k='testStatementOrderMaintainedInConnection' or k like 'z%'", // rows left: zz, zzz, checkThatAllStatementTypesMaintainOrderInConnection + "select count(*) from " + bFailureTable + " where k = '" + Bytes.toString(ROW_TO_FAIL_UPSERT_BYTES) + "'", + "select count(*) from " + bFailureTable + " where k = 'z'"), transactional ? newArrayList(new Integer(3) /* original rows */, new Integer(0), new Integer(1) /* original row */) : newArrayList(new Integer(4), new Integer(0), new Integer(1))); } private void testPartialCommit(List<String> statements, int[] expectedUncommittedStatementIndexes, boolean willFail, List<String> countStatementsForVerification, - List<Integer> expectedCountsForVerification) { + List<Integer> expectedCountsForVerification) throws SQLException { Preconditions.checkArgument(countStatementsForVerification.size() == expectedCountsForVerification.size()); try (Connection con = getConnectionWithTableOrderPreservingMutationState()) { @@ -264,7 +246,7 @@ public class PartialCommitIT extends BaseOwnClusterIT { int[] uncommittedStatementIndexes = ((CommitException)sqle).getUncommittedStatementIndexes(); assertArrayEquals(expectedUncommittedStatementIndexes, uncommittedStatementIndexes); Map<String, Map<MetricType, Long>> mutationWriteMetrics = PhoenixRuntime.getWriteMetricInfoForMutationsSinceLastReset(con); - assertEquals(expectedUncommittedStatementIndexes.length, mutationWriteMetrics.get(B_FAILURE_TABLE).get(MUTATION_BATCH_FAILED_SIZE).intValue()); + assertEquals(expectedUncommittedStatementIndexes.length, mutationWriteMetrics.get(bFailureTable).get(MUTATION_BATCH_FAILED_SIZE).intValue()); assertEquals(expectedUncommittedStatementIndexes.length, GLOBAL_MUTATION_BATCH_FAILED_COUNT.getMetric().getTotalSum()); } @@ -278,22 +260,20 @@ public class PartialCommitIT extends BaseOwnClusterIT { } assertEquals(expectedCountsForVerification.get(i).intValue(), rs.getInt(1)); } - } catch (SQLException e) { - fail(e.toString()); } } private PhoenixConnection getConnectionWithTableOrderPreservingMutationState() throws SQLException { - Connection con = driver.connect(url, new Properties()); - PhoenixConnection phxCon = new PhoenixConnection(con.unwrap(PhoenixConnection.class)); - final Map<TableRef, MultiRowMutationState> mutations = Maps.newTreeMap(new TableRefComparator()); - // passing a null mutation state forces the connection.newMutationState() to be used to create the MutationState - return new PhoenixConnection(phxCon, (MutationState)null) { - @Override - protected MutationState newMutationState(int maxSize, int maxSizeBytes) { - return new MutationState(maxSize, maxSizeBytes, this, mutations, false, null); + try (PhoenixConnection con = DriverManager.getConnection(getUrl()).unwrap(PhoenixConnection.class)) { + final Map<TableRef, MultiRowMutationState> mutations = Maps.newTreeMap(new TableRefComparator()); + // passing a null mutation state forces the connection.newMutationState() to be used to create the MutationState + return new PhoenixConnection(con, (MutationState)null) { + @Override + protected MutationState newMutationState(int maxSize, int maxSizeBytes) { + return new MutationState(maxSize, maxSizeBytes, this, mutations, false, null); + }; }; - }; + } } public static class FailingRegionObserver extends SimpleRegionObserver { http://git-wip-us.apache.org/repos/asf/phoenix/blob/8aff8959/phoenix-core/src/it/java/org/apache/phoenix/rpc/UpdateCacheIT.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/it/java/org/apache/phoenix/rpc/UpdateCacheIT.java b/phoenix-core/src/it/java/org/apache/phoenix/rpc/UpdateCacheIT.java index 2211d58..0ddbed3 100644 --- a/phoenix-core/src/it/java/org/apache/phoenix/rpc/UpdateCacheIT.java +++ b/phoenix-core/src/it/java/org/apache/phoenix/rpc/UpdateCacheIT.java @@ -19,7 +19,6 @@ package org.apache.phoenix.rpc; import static org.apache.phoenix.util.TestUtil.INDEX_DATA_SCHEMA; import static org.apache.phoenix.util.TestUtil.TEST_PROPERTIES; -import static org.apache.phoenix.util.TestUtil.TRANSACTIONAL_DATA_TABLE; import static org.junit.Assert.assertFalse; import static org.mockito.Matchers.anyLong; import static org.mockito.Matchers.eq; @@ -43,6 +42,7 @@ import org.apache.phoenix.schema.MetaDataClient; import org.apache.phoenix.schema.PName; import org.apache.phoenix.schema.PTable; import org.apache.phoenix.schema.types.PVarchar; +import org.apache.phoenix.transaction.TransactionFactory; import org.apache.phoenix.util.PropertiesUtil; import org.apache.phoenix.util.SchemaUtil; import org.apache.phoenix.util.TestUtil; @@ -50,7 +50,7 @@ import org.junit.Test; import org.mockito.Mockito; /** - * Verifies the number of rpcs calls from {@link MetaDataClient} updateCache() + * Verifies the number of RPC calls from {@link MetaDataClient} updateCache() * for transactional and non-transactional tables. */ public class UpdateCacheIT extends ParallelStatsDisabledIT { @@ -67,10 +67,15 @@ public class UpdateCacheIT extends ParallelStatsDisabledIT { @Test public void testUpdateCacheForTxnTable() throws Exception { - String fullTableName = INDEX_DATA_SCHEMA + QueryConstants.NAME_SEPARATOR + TRANSACTIONAL_DATA_TABLE; - Connection conn = DriverManager.getConnection(getUrl(), PropertiesUtil.deepCopy(TEST_PROPERTIES)); - conn.createStatement().execute("create table " + fullTableName + TestUtil.TEST_TABLE_SCHEMA + "TRANSACTIONAL=true"); - helpTestUpdateCache(fullTableName, new int[] {1, 1}); + for (TransactionFactory.Provider provider : TransactionFactory.Provider.values()) { + if (provider.runTests()) { + String tableName = generateUniqueName(); + String fullTableName = INDEX_DATA_SCHEMA + QueryConstants.NAME_SEPARATOR + tableName; + Connection conn = DriverManager.getConnection(getUrl(), PropertiesUtil.deepCopy(TEST_PROPERTIES)); + conn.createStatement().execute("create table " + fullTableName + TestUtil.TEST_TABLE_SCHEMA + "TRANSACTIONAL=true,TRANSACTION_PROVIDER='" + provider + "'"); + helpTestUpdateCache(fullTableName, new int[] {1, 1}); + } + } } @Test http://git-wip-us.apache.org/repos/asf/phoenix/blob/8aff8959/phoenix-core/src/it/java/org/apache/phoenix/schema/stats/StatsCollectorIT.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/it/java/org/apache/phoenix/schema/stats/StatsCollectorIT.java b/phoenix-core/src/it/java/org/apache/phoenix/schema/stats/StatsCollectorIT.java index 6eb9411..8fd6d75 100644 --- a/phoenix-core/src/it/java/org/apache/phoenix/schema/stats/StatsCollectorIT.java +++ b/phoenix-core/src/it/java/org/apache/phoenix/schema/stats/StatsCollectorIT.java @@ -87,27 +87,19 @@ public abstract class StatsCollectorIT extends BaseUniqueNamesOwnClusterIT { private final boolean mutable; private static final int defaultGuidePostWidth = 20; - protected StatsCollectorIT(boolean mutable, boolean transactional, boolean userTableNamespaceMapped, boolean columnEncoded) { + protected StatsCollectorIT(boolean mutable, String transactionProvider, boolean userTableNamespaceMapped, boolean columnEncoded) { StringBuilder sb = new StringBuilder(); - if (transactional) { - sb.append("TRANSACTIONAL=true"); - } - if (!columnEncoded) { - if (sb.length()>0) { - sb.append(","); - } - sb.append("COLUMN_ENCODED_BYTES=0"); + if (columnEncoded) { + sb.append("COLUMN_ENCODED_BYTES=4"); } else { - if (sb.length()>0) { - sb.append(","); - } - sb.append("COLUMN_ENCODED_BYTES=4"); + sb.append("COLUMN_ENCODED_BYTES=0"); + } + + if (transactionProvider != null) { + sb.append(",TRANSACTIONAL=true, TRANSACTION_PROVIDER='" + transactionProvider + "'"); } if (!mutable) { - if (sb.length()>0) { - sb.append(","); - } - sb.append("IMMUTABLE_ROWS=true"); + sb.append(",IMMUTABLE_ROWS=true"); if (!columnEncoded) { sb.append(",IMMUTABLE_STORAGE_SCHEME="+PTableImpl.ImmutableStorageScheme.ONE_CELL_PER_COLUMN); } http://git-wip-us.apache.org/repos/asf/phoenix/blob/8aff8959/phoenix-core/src/it/java/org/apache/phoenix/tx/FlappingTransactionIT.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/it/java/org/apache/phoenix/tx/FlappingTransactionIT.java b/phoenix-core/src/it/java/org/apache/phoenix/tx/FlappingTransactionIT.java index 0323c32..5ba8dd9 100644 --- a/phoenix-core/src/it/java/org/apache/phoenix/tx/FlappingTransactionIT.java +++ b/phoenix-core/src/it/java/org/apache/phoenix/tx/FlappingTransactionIT.java @@ -19,7 +19,6 @@ package org.apache.phoenix.tx; import static org.apache.phoenix.util.TestUtil.INDEX_DATA_SCHEMA; import static org.apache.phoenix.util.TestUtil.TEST_PROPERTIES; -import static org.apache.phoenix.util.TestUtil.createTransactionalTable; import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertFalse; import static org.junit.Assert.assertTrue; @@ -31,22 +30,29 @@ import java.sql.PreparedStatement; import java.sql.ResultSet; import java.sql.SQLException; import java.sql.Statement; +import java.util.Arrays; +import java.util.Collection; import java.util.Properties; import org.apache.hadoop.hbase.client.Get; import org.apache.hadoop.hbase.client.HTableInterface; import org.apache.hadoop.hbase.client.Put; import org.apache.hadoop.hbase.client.Result; +import org.apache.hadoop.hbase.client.Table; import org.apache.hadoop.hbase.util.Bytes; import org.apache.phoenix.end2end.ParallelStatsDisabledIT; import org.apache.phoenix.exception.SQLExceptionCode; import org.apache.phoenix.jdbc.PhoenixConnection; import org.apache.phoenix.query.QueryConstants; import org.apache.phoenix.transaction.PhoenixTransactionContext; +import org.apache.phoenix.transaction.PhoenixTransactionProvider; import org.apache.phoenix.transaction.TransactionFactory; import org.apache.phoenix.util.PropertiesUtil; import org.apache.phoenix.util.TestUtil; import org.junit.Test; +import org.junit.runner.RunWith; +import org.junit.runners.Parameterized; +import org.junit.runners.Parameterized.Parameters; /** * @@ -54,7 +60,21 @@ import org.junit.Test; * TODO: review with Tephra community * */ +@RunWith(Parameterized.class) public class FlappingTransactionIT extends ParallelStatsDisabledIT { + private final String txProvider; + + public FlappingTransactionIT(String provider) { + txProvider = provider; + } + + @Parameters(name="FlappingTransactionIT_transactionProvider={0}") // name is used by failsafe as file name in reports + public static Collection<Object[]> data() { + return TestUtil.filterTxParamData(Arrays.asList(new Object[][] { + {"TEPHRA"}, {"OMID"} + }),0); + } + @Test public void testDelete() throws Exception { String transTableName = generateUniqueName(); @@ -63,7 +83,7 @@ public class FlappingTransactionIT extends ParallelStatsDisabledIT { try (Connection conn = DriverManager.getConnection(getUrl()); Connection conn1 = DriverManager.getConnection(getUrl()); Connection conn2 = DriverManager.getConnection(getUrl())) { - TestUtil.createTransactionalTable(conn, fullTableName); + TestUtil.createTransactionalTable(conn, fullTableName, "TRANSACTION_PROVIDER='"+txProvider+"'"); conn1.setAutoCommit(false); ResultSet rs = conn1.createStatement().executeQuery(selectSQL); assertFalse(rs.next()); @@ -103,7 +123,7 @@ public class FlappingTransactionIT extends ParallelStatsDisabledIT { try (Connection conn = DriverManager.getConnection(getUrl()); Connection conn1 = DriverManager.getConnection(getUrl()); Connection conn2 = DriverManager.getConnection(getUrl())) { - createTransactionalTable(conn, fullTableName); + TestUtil.createTransactionalTable(conn, fullTableName, "TRANSACTION_PROVIDER='"+txProvider+"'"); conn1.setAutoCommit(false); conn2.setAutoCommit(true); ResultSet rs = conn1.createStatement().executeQuery(selectSQL); @@ -136,8 +156,11 @@ public class FlappingTransactionIT extends ParallelStatsDisabledIT { rs = conn2.createStatement().executeQuery("SELECT count(*) FROM " + fullTableName + " WHERE int_col1 = 1"); assertTrue(rs.next()); assertEquals(0, rs.getInt(1)); + // this succeeds rs = conn2.createStatement().executeQuery("SELECT int_col1 FROM " + fullTableName + " WHERE int_col1 = 1"); + // this fails rs = conn2.createStatement().executeQuery("SELECT * FROM " + fullTableName + " WHERE int_col1 = 1"); rs = conn2.createStatement().executeQuery("SELECT * FROM " + fullTableName + " WHERE int_col1 = 1"); - assertFalse(rs.next()); + boolean hasNext = rs.next(); + assertFalse(hasNext); conn1.commit(); @@ -158,7 +181,7 @@ public class FlappingTransactionIT extends ParallelStatsDisabledIT { try (Connection conn = DriverManager.getConnection(getUrl()); Connection conn1 = DriverManager.getConnection(getUrl()); Connection conn2 = DriverManager.getConnection(getUrl())) { - createTransactionalTable(conn, fullTableName); + TestUtil.createTransactionalTable(conn, fullTableName, "TRANSACTION_PROVIDER='"+txProvider+"'"); conn1.setAutoCommit(false); conn2.setAutoCommit(true); ResultSet rs = conn1.createStatement().executeQuery(selectSQL); @@ -212,7 +235,7 @@ public class FlappingTransactionIT extends ParallelStatsDisabledIT { PhoenixConnection pconn = conn.unwrap(PhoenixConnection.class); Statement stmt = conn.createStatement(); - stmt.execute("CREATE TABLE " + fullTableName + "(K VARCHAR PRIMARY KEY, V1 VARCHAR, V2 VARCHAR) TRANSACTIONAL=true"); + stmt.execute("CREATE TABLE " + fullTableName + "(K VARCHAR PRIMARY KEY, V1 VARCHAR, V2 VARCHAR) TRANSACTIONAL=true,TRANSACTION_PROVIDER='"+txProvider+"'"); HTableInterface htable = pconn.getQueryServices().getTable(Bytes.toBytes(fullTableName)); stmt.executeUpdate("upsert into " + fullTableName + " values('x', 'a', 'a')"); conn.commit(); @@ -223,12 +246,14 @@ public class FlappingTransactionIT extends ParallelStatsDisabledIT { assertEquals(1,rs.getInt(1)); } - PhoenixTransactionContext txContext = - TransactionFactory.getTransactionProvider(TransactionFactory.Provider.TEPHRA).getTransactionContext(pconn); - HTableInterface txTable = - txContext.getTransactionalTable(htable, false); - txContext.begin(); + PhoenixTransactionProvider provider = TransactionFactory.Provider.valueOf(txProvider).getTransactionProvider(); + PhoenixTransactionContext txContext = provider.getTransactionContext(pconn); + // FIXME: must begin txn before getting transactional table + // Either set txn on all existing OmidTransactionTable or throw exception + // when attempting to get OmidTransactionTable if a txn is not in progress. + txContext.begin(); + Table txTable = txContext.getTransactionalTable(htable, false); // Use HBase APIs to add a new row Put put = new Put(Bytes.toBytes("z")); @@ -275,12 +300,10 @@ public class FlappingTransactionIT extends ParallelStatsDisabledIT { } // Repeat the same as above, but this time abort the transaction - txContext = - TransactionFactory.getTransactionProvider(TransactionFactory.Provider.TEPHRA).getTransactionContext(pconn); - txTable = - txContext.getTransactionalTable(htable, false); - + txContext = provider.getTransactionContext(pconn); txContext.begin(); + txTable = txContext.getTransactionalTable(htable, false); + // Use HBase APIs to add a new row put = new Put(Bytes.toBytes("j")); http://git-wip-us.apache.org/repos/asf/phoenix/blob/8aff8959/phoenix-core/src/it/java/org/apache/phoenix/tx/ParameterizedTransactionIT.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/it/java/org/apache/phoenix/tx/ParameterizedTransactionIT.java b/phoenix-core/src/it/java/org/apache/phoenix/tx/ParameterizedTransactionIT.java index ec1cf3a..f94ec17 100644 --- a/phoenix-core/src/it/java/org/apache/phoenix/tx/ParameterizedTransactionIT.java +++ b/phoenix-core/src/it/java/org/apache/phoenix/tx/ParameterizedTransactionIT.java @@ -41,6 +41,7 @@ import org.apache.hadoop.hbase.TableName; import org.apache.hadoop.hbase.client.HBaseAdmin; import org.apache.hadoop.hbase.client.HTableInterface; import org.apache.hadoop.hbase.client.Put; +import org.apache.hadoop.hbase.coprocessor.RegionObserver; import org.apache.hadoop.hbase.util.Bytes; import org.apache.phoenix.coprocessor.TephraTransactionalProcessor; import org.apache.phoenix.end2end.ParallelStatsDisabledIT; @@ -54,6 +55,9 @@ import org.apache.phoenix.schema.PTableImpl; import org.apache.phoenix.schema.PTableKey; import org.apache.phoenix.schema.types.PInteger; import org.apache.phoenix.transaction.PhoenixTransactionContext; +import org.apache.phoenix.transaction.PhoenixTransactionProvider; +import org.apache.phoenix.transaction.PhoenixTransactionProvider.Feature; +import org.apache.phoenix.transaction.TransactionFactory; import org.apache.phoenix.util.ByteUtil; import org.apache.phoenix.util.PropertiesUtil; import org.apache.phoenix.util.TestUtil; @@ -68,29 +72,39 @@ import com.google.common.collect.Lists; public class ParameterizedTransactionIT extends ParallelStatsDisabledIT { private final String tableDDLOptions; + private final String tableDDLOptionsWithoutProvider; + private final PhoenixTransactionProvider transactionProvider; - public ParameterizedTransactionIT(boolean mutable, boolean columnEncoded) { + public ParameterizedTransactionIT(Boolean mutable, Boolean columnEncoded, String transactionProvider) { StringBuilder optionBuilder = new StringBuilder(); + optionBuilder.append("TRANSACTION_PROVIDER='"+transactionProvider+"',"); + this.transactionProvider = TransactionFactory.Provider.valueOf(transactionProvider).getTransactionProvider(); + StringBuilder optionBuilder2 = new StringBuilder(); if (!columnEncoded) { - optionBuilder.append("COLUMN_ENCODED_BYTES=0"); + optionBuilder2.append("COLUMN_ENCODED_BYTES=0,"); } if (!mutable) { - if (optionBuilder.length() > 0) { - optionBuilder.append(","); - } - optionBuilder.append("IMMUTABLE_ROWS=true"); + optionBuilder2.append("IMMUTABLE_ROWS=true,"); if (!columnEncoded) { - optionBuilder.append(",IMMUTABLE_STORAGE_SCHEME="+PTableImpl.ImmutableStorageScheme.ONE_CELL_PER_COLUMN); + optionBuilder2.append("IMMUTABLE_STORAGE_SCHEME="+PTableImpl.ImmutableStorageScheme.ONE_CELL_PER_COLUMN +","); } } + if (optionBuilder2.length() > 0) { + optionBuilder2.setLength(optionBuilder2.length()-1); + optionBuilder.append(optionBuilder2); + } else { + optionBuilder.setLength(optionBuilder.length()-1); + } this.tableDDLOptions = optionBuilder.toString(); + this.tableDDLOptionsWithoutProvider = optionBuilder2.toString(); } - @Parameters(name="TransactionIT_mutable={0},columnEncoded={1}") // name is used by failsafe as file name in reports - public static Collection<Boolean[]> data() { - return Arrays.asList(new Boolean[][] { - {false, false }, {false, true }, {true, false }, { true, true }, - }); + @Parameters(name="ParameterizedTransactionIT_mutable={0},columnEncoded={1},transactionProvider={2}") // name is used by failsafe as file name in reports + public static Collection<Object[]> data() { + return TestUtil.filterTxParamData(Arrays.asList(new Object[][] { + {false, false, "TEPHRA" }, {false, true, "TEPHRA" }, {true, false, "TEPHRA" }, { true, true, "TEPHRA" }, + {false, false, "OMID" }, {true, false, "OMID" }, + }), 2); } @Test @@ -134,7 +148,8 @@ public class ParameterizedTransactionIT extends ParallelStatsDisabledIT { String fullTableName = INDEX_DATA_SCHEMA + QueryConstants.NAME_SEPARATOR + transTableName; String selectSql = "SELECT * FROM "+fullTableName; try (Connection conn = DriverManager.getConnection(getUrl())) { - conn.createStatement().execute("create table " + fullTableName + TestUtil.TEST_TABLE_SCHEMA + tableDDLOptions + (tableDDLOptions.length() > 0 ? "," : "") + "TRANSACTIONAL=true"); + String ddl = "create table " + fullTableName + TestUtil.TEST_TABLE_SCHEMA + tableDDLOptions + (tableDDLOptions.length() > 0 ? "," : "") + "TRANSACTIONAL=true"; + conn.createStatement().execute(ddl); conn.setAutoCommit(false); ResultSet rs = conn.createStatement().executeQuery(selectSql); assertFalse(rs.next()); @@ -263,7 +278,7 @@ public class ParameterizedTransactionIT extends ParallelStatsDisabledIT { String nonTxTableName = generateUniqueName(); Connection conn = DriverManager.getConnection(getUrl()); - conn.createStatement().execute("CREATE TABLE " + nonTxTableName + "(k INTEGER PRIMARY KEY, v VARCHAR)" + tableDDLOptions); + conn.createStatement().execute("CREATE TABLE " + nonTxTableName + "(k INTEGER PRIMARY KEY, v VARCHAR)" + tableDDLOptionsWithoutProvider); conn.createStatement().execute("UPSERT INTO " + nonTxTableName + " VALUES (1)"); conn.createStatement().execute("UPSERT INTO " + nonTxTableName + " VALUES (2, 'a')"); conn.createStatement().execute("UPSERT INTO " + nonTxTableName + " VALUES (3, 'b')"); @@ -279,7 +294,19 @@ public class ParameterizedTransactionIT extends ParallelStatsDisabledIT { } htable.put(puts); - conn.createStatement().execute("ALTER TABLE " + nonTxTableName + " SET TRANSACTIONAL=true"); + try { + conn.createStatement().execute("ALTER TABLE " + nonTxTableName + " SET TRANSACTIONAL=true,TRANSACTION_PROVIDER='" + transactionProvider + "'"); + if (transactionProvider.isUnsupported(Feature.ALTER_NONTX_TO_TX)) { + fail(); + } + } catch (SQLException e) { + if (transactionProvider.isUnsupported(Feature.ALTER_NONTX_TO_TX)) { + assertEquals(SQLExceptionCode.CANNOT_ALTER_TABLE_FROM_NON_TXN_TO_TXNL.getErrorCode(), e.getErrorCode()); + return; + } else { + throw e; + } + } htable = conn.unwrap(PhoenixConnection.class).getQueryServices().getTable(Bytes.toBytes( nonTxTableName)); assertTrue(htable.getTableDescriptor().getCoprocessors().contains(TephraTransactionalProcessor.class.getName())); @@ -328,7 +355,7 @@ public class ParameterizedTransactionIT extends ParallelStatsDisabledIT { Connection conn = DriverManager.getConnection(getUrl()); // Put table in SYSTEM schema to prevent attempts to update the cache after we disable SYSTEM.CATALOG - conn.createStatement().execute("CREATE TABLE \"SYSTEM\"." + nonTxTableName + "(k INTEGER PRIMARY KEY, v VARCHAR)" + tableDDLOptions); + conn.createStatement().execute("CREATE TABLE \"SYSTEM\"." + nonTxTableName + "(k INTEGER PRIMARY KEY, v VARCHAR)" + tableDDLOptionsWithoutProvider); conn.createStatement().execute("UPSERT INTO \"SYSTEM\"." + nonTxTableName + " VALUES (1)"); conn.commit(); // Reset empty column value to an empty value like it is pre-transactions @@ -343,10 +370,14 @@ public class ParameterizedTransactionIT extends ParallelStatsDisabledIT { // This will succeed initially in updating the HBase metadata, but then will fail when // the SYSTEM.CATALOG table is attempted to be updated, exercising the code to restore // the coprocessors back to the non transactional ones. - conn.createStatement().execute("ALTER TABLE \"SYSTEM\"." + nonTxTableName + " SET TRANSACTIONAL=true"); + conn.createStatement().execute("ALTER TABLE \"SYSTEM\"." + nonTxTableName + " SET TRANSACTIONAL=true,TRANSACTION_PROVIDER='" + transactionProvider + "'"); fail(); } catch (SQLException e) { - assertTrue(e.getMessage().contains(PhoenixDatabaseMetaData.SYSTEM_CATALOG_NAME + " is disabled")); + if (transactionProvider.isUnsupported(Feature.ALTER_NONTX_TO_TX)) { + assertEquals(SQLExceptionCode.CANNOT_ALTER_TABLE_FROM_NON_TXN_TO_TXNL.getErrorCode(), e.getErrorCode()); + } else { + assertTrue(e.getMessage().contains(PhoenixDatabaseMetaData.SYSTEM_CATALOG_NAME + " is disabled")); + } } finally { admin.enableTable(PhoenixDatabaseMetaData.SYSTEM_CATALOG_NAME); admin.close(); @@ -358,7 +389,8 @@ public class ParameterizedTransactionIT extends ParallelStatsDisabledIT { assertFalse(rs.next()); htable = conn.unwrap(PhoenixConnection.class).getQueryServices().getTable(Bytes.toBytes("SYSTEM." + nonTxTableName)); - assertFalse(htable.getTableDescriptor().getCoprocessors().contains(TephraTransactionalProcessor.class.getName())); + Class<? extends RegionObserver> clazz = transactionProvider.getCoprocessor(); + assertFalse(htable.getTableDescriptor().getCoprocessors().contains(clazz.getName())); assertEquals(1,conn.unwrap(PhoenixConnection.class).getQueryServices(). getTableDescriptor(Bytes.toBytes("SYSTEM." + nonTxTableName)). getFamily(QueryConstants.DEFAULT_COLUMN_FAMILY_BYTES).getMaxVersions()); @@ -376,7 +408,8 @@ public class ParameterizedTransactionIT extends ParallelStatsDisabledIT { PTable table = pconn.getTable(new PTableKey(null, t1)); HTableInterface htable = pconn.getQueryServices().getTable(Bytes.toBytes(t1)); assertTrue(table.isTransactional()); - assertTrue(htable.getTableDescriptor().getCoprocessors().contains(TephraTransactionalProcessor.class.getName())); + Class<? extends RegionObserver> clazz = transactionProvider.getCoprocessor(); + assertTrue(htable.getTableDescriptor().getCoprocessors().contains(clazz.getName())); try { ddl = "ALTER TABLE " + t1 + " SET transactional=false"; @@ -390,8 +423,19 @@ public class ParameterizedTransactionIT extends ParallelStatsDisabledIT { HTableDescriptor desc = new HTableDescriptor(TableName.valueOf(t2)); desc.addFamily(new HColumnDescriptor(QueryConstants.DEFAULT_COLUMN_FAMILY_BYTES)); admin.createTable(desc); - ddl = "CREATE TABLE " + t2 + " (k varchar primary key) transactional=true"; - conn.createStatement().execute(ddl); + try { + ddl = "CREATE TABLE " + t2 + " (k varchar primary key) transactional=true,transaction_provider='" + transactionProvider + "'"; + conn.createStatement().execute(ddl); + if (transactionProvider.isUnsupported(Feature.ALTER_NONTX_TO_TX)) { + fail(); + } + } catch (SQLException e) { + if (transactionProvider.isUnsupported(Feature.ALTER_NONTX_TO_TX)) { + assertEquals(SQLExceptionCode.CANNOT_ALTER_TABLE_FROM_NON_TXN_TO_TXNL.getErrorCode(), e.getErrorCode()); + return; + } + throw e; + } HTableDescriptor htableDescriptor = admin.getTableDescriptor(TableName.valueOf(t2)); String str = htableDescriptor.getValue(PhoenixTransactionContext.READ_NON_TX_DATA); @@ -405,12 +449,12 @@ public class ParameterizedTransactionIT extends ParallelStatsDisabledIT { } catch (SQLException e) { assertEquals(SQLExceptionCode.TX_MAY_NOT_SWITCH_TO_NON_TX.getErrorCode(), e.getErrorCode()); } - ddl += " transactional=true"; + ddl += " transactional=true,transaction_provider='" + transactionProvider + "'"; conn.createStatement().execute(ddl); table = pconn.getTable(new PTableKey(null, t1)); htable = pconn.getQueryServices().getTable(Bytes.toBytes(t1)); assertTrue(table.isTransactional()); - assertTrue(htable.getTableDescriptor().getCoprocessors().contains(TephraTransactionalProcessor.class.getName())); + assertTrue(htable.getTableDescriptor().getCoprocessors().contains(clazz.getName())); } @Test http://git-wip-us.apache.org/repos/asf/phoenix/blob/8aff8959/phoenix-core/src/it/java/org/apache/phoenix/tx/TransactionIT.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/it/java/org/apache/phoenix/tx/TransactionIT.java b/phoenix-core/src/it/java/org/apache/phoenix/tx/TransactionIT.java index 12c3b7a..b1866db 100644 --- a/phoenix-core/src/it/java/org/apache/phoenix/tx/TransactionIT.java +++ b/phoenix-core/src/it/java/org/apache/phoenix/tx/TransactionIT.java @@ -35,13 +35,22 @@ import java.sql.SQLException; import java.sql.Statement; import java.util.Arrays; import java.util.Collection; +import java.util.Iterator; +import java.util.List; import java.util.Properties; +import org.apache.hadoop.hbase.DoNotRetryIOException; +import org.apache.hadoop.hbase.HBaseIOException; import org.apache.hadoop.hbase.HColumnDescriptor; import org.apache.hadoop.hbase.HTableDescriptor; import org.apache.hadoop.hbase.TableName; import org.apache.hadoop.hbase.TableNotFoundException; import org.apache.hadoop.hbase.client.Admin; +import org.apache.hadoop.hbase.client.Mutation; +import org.apache.hadoop.hbase.coprocessor.ObserverContext; +import org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment; +import org.apache.hadoop.hbase.coprocessor.SimpleRegionObserver; +import org.apache.hadoop.hbase.regionserver.MiniBatchOperationInProgress; import org.apache.hadoop.hbase.util.Bytes; import org.apache.phoenix.end2end.ParallelStatsDisabledIT; import org.apache.phoenix.exception.SQLExceptionCode; @@ -52,6 +61,8 @@ import org.apache.phoenix.query.QueryServicesOptions; import org.apache.phoenix.schema.PTable; import org.apache.phoenix.schema.PTableKey; import org.apache.phoenix.transaction.PhoenixTransactionContext; +import org.apache.phoenix.transaction.PhoenixTransactionProvider; +import org.apache.phoenix.transaction.PhoenixTransactionProvider.Feature; import org.apache.phoenix.transaction.TransactionFactory; import org.apache.phoenix.util.EnvironmentEdgeManager; import org.apache.phoenix.util.PhoenixRuntime; @@ -64,6 +75,8 @@ import org.junit.runner.RunWith; import org.junit.runners.Parameterized; import org.junit.runners.Parameterized.Parameters; +import com.google.common.collect.Lists; + @RunWith(Parameterized.class) public class TransactionIT extends ParallelStatsDisabledIT { private final String txProvider; @@ -75,12 +88,117 @@ public class TransactionIT extends ParallelStatsDisabledIT { } @Parameters(name="TransactionIT_provider={0}") // name is used by failsafe as file name in reports - public static Collection<String[]> data() { - return Arrays.asList(new String[][] { - {"TEPHRA"/*,"OMID"*/}}); + public static Collection<Object[]> data() { + return TestUtil.filterTxParamData(Arrays.asList(new Object[][] { + {"TEPHRA"},{"OMID"}}),0); } @Test + public void testFailureToRollbackAfterDelete() throws Exception { + Properties props = PropertiesUtil.deepCopy(TEST_PROPERTIES); + try (Connection conn = DriverManager.getConnection(getUrl(), props)) { + String tableName = generateUniqueName(); + conn.createStatement().execute( + "CREATE TABLE " + tableName + " (k VARCHAR NOT NULL PRIMARY KEY) TRANSACTIONAL=true,TRANSACTION_PROVIDER='" + txProvider + "'"); + conn.createStatement().execute("UPSERT INTO " + tableName + " VALUES('a')"); + conn.createStatement().execute("UPSERT INTO " + tableName + " VALUES('b')"); + conn.commit(); + // Delete a in new transaction + conn.createStatement().execute("DELETE FROM " + tableName + " WHERE k='a'"); + // Forces data to be written to HBase, but not yet committed + conn.createStatement().executeQuery("SELECT * FROM " + tableName).next(); + // Upsert another row so commit below will fail the write (and fail subsequent attempt t o abort) + conn.createStatement().execute("UPSERT INTO " + tableName + " VALUES('c')"); + TestUtil.addCoprocessor(conn, tableName, WriteFailingRegionObserver.class); + try { + conn.commit(); + fail(); + } catch (SQLException e) { + } + // Delete of a shouldn't be visible since commit failed, so all rows should be present + ResultSet rs = conn.createStatement().executeQuery("SELECT * FROM " + tableName); + assertTrue(rs.next()); + assertEquals("a", rs.getString(1)); + assertTrue(rs.next()); + assertEquals("b", rs.getString(1)); + assertFalse(rs.next()); + } + } + + public static class WriteFailingRegionObserver extends SimpleRegionObserver { + @Override + public void preBatchMutate(ObserverContext<RegionCoprocessorEnvironment> c, MiniBatchOperationInProgress<Mutation> miniBatchOp) throws HBaseIOException { + throw new DoNotRetryIOException(); + } + } + + @Test + public void testWithMixOfTxProviders() throws Exception { + // No sense in running the test with every providers, so just run it with the default one + if (!TransactionFactory.Provider.valueOf(txProvider).equals(TransactionFactory.Provider.getDefault())) { + return; + } + List<String> tableNames = Lists.newArrayList(); + Properties props = PropertiesUtil.deepCopy(TEST_PROPERTIES); + try (Connection conn = DriverManager.getConnection(getUrl(), props)) { + for (TransactionFactory.Provider provider : TransactionFactory.Provider.values()) { + if (provider.runTests()) { + String tableName = generateUniqueName(); + tableNames.add(tableName); + conn.createStatement().execute( + "CREATE TABLE " + tableName + " (k VARCHAR NOT NULL PRIMARY KEY, v1 VARCHAR) TRANSACTIONAL=true,TRANSACTION_PROVIDER='" + provider + "'"); + } + } + if (tableNames.size() < 2) { + return; + } + Iterator<String> iterator = tableNames.iterator(); + String tableName1 = iterator.next(); + conn.createStatement().execute("UPSERT INTO " + tableName1 + " VALUES('a')"); + String tableName2 = iterator.next(); + try { + conn.createStatement().execute("UPSERT INTO " + tableName2 + " VALUES('a')"); + fail(); + } catch (SQLException e) { + assertEquals(SQLExceptionCode.CANNOT_MIX_TXN_PROVIDERS.getErrorCode(), e.getErrorCode()); + } + + conn.rollback(); + conn.setAutoCommit(true); + for (String tableName : tableNames) { + conn.createStatement().execute("UPSERT INTO " + tableName + " VALUES('a')"); + } + for (String tableName : tableNames) { + ResultSet rs = conn.createStatement().executeQuery("SELECT * FROM " + tableName); + assertTrue(rs.next()); + assertEquals("a", rs.getString(1)); + assertFalse(rs.next()); + } + } + } + + @Test + public void testPreventLocalIndexCreation() throws Exception { + Properties props = PropertiesUtil.deepCopy(TEST_PROPERTIES); + try (Connection conn = DriverManager.getConnection(getUrl(), props)) { + for (TransactionFactory.Provider provider : TransactionFactory.Provider.values()) { + if (provider.runTests() && provider.getTransactionProvider().isUnsupported(PhoenixTransactionProvider.Feature.ALLOW_LOCAL_INDEX)) { + String tableName = generateUniqueName(); + conn.createStatement().execute( + "CREATE TABLE " + tableName + " (k VARCHAR NOT NULL PRIMARY KEY, v1 VARCHAR) TRANSACTIONAL=true,TRANSACTION_PROVIDER='" + provider + "'"); + String indexName = generateUniqueName(); + try { + conn.createStatement().execute("CREATE LOCAL INDEX " + indexName + "_IDX ON " + tableName + " (v1) INCLUDE(v2)"); + fail(); + } catch (SQLException e) { + assertEquals(SQLExceptionCode.CANNOT_CREATE_LOCAL_INDEX_FOR_TXN_TABLE.getErrorCode(), e.getErrorCode()); + } + } + } + } + } + + @Test public void testQueryWithSCN() throws Exception { String tableName = generateUniqueName(); Properties props = PropertiesUtil.deepCopy(TEST_PROPERTIES); @@ -102,7 +220,7 @@ public class TransactionIT extends ParallelStatsDisabledIT { } @Test - public void testReCreateTxnTableAfterDroppingExistingNonTxnTable() throws SQLException { + public void testReCreateTxnTableAfterDroppingExistingNonTxnTable() throws Exception { String tableName = generateUniqueName(); Properties props = PropertiesUtil.deepCopy(TEST_PROPERTIES); Connection conn = DriverManager.getConnection(getUrl(), props); @@ -110,6 +228,10 @@ public class TransactionIT extends ParallelStatsDisabledIT { Statement stmt = conn.createStatement(); stmt.execute("CREATE TABLE " + tableName + "(k VARCHAR PRIMARY KEY, v1 VARCHAR, v2 VARCHAR)"); stmt.execute("DROP TABLE " + tableName); + // Must drop metadata as Omid does not allow creating a transactional table from a non transactional one + Admin admin = conn.unwrap(PhoenixConnection.class).getQueryServices().getAdmin(); + admin.disableTable(TableName.valueOf(tableName)); + admin.deleteTable(TableName.valueOf(tableName)); stmt.execute("CREATE TABLE " + tableName + "(k VARCHAR PRIMARY KEY, v1 VARCHAR, v2 VARCHAR) TRANSACTIONAL=true," + tableDDLOptions); stmt.execute("CREATE INDEX " + tableName + "_IDX ON " + tableName + " (v1) INCLUDE(v2)"); assertTrue(conn.unwrap(PhoenixConnection.class).getTable(new PTableKey(null, tableName)).isTransactional()); @@ -210,11 +332,11 @@ public class TransactionIT extends ParallelStatsDisabledIT { try { conn.createStatement().execute("ALTER TABLE " + nonTxTableName + "1 SET TRANSACTIONAL=true," + tableDDLOptions); - if (TransactionFactory.Provider.OMID.name().equals(txProvider)) { - fail("Omid shouldn't allow converting a non transactional table to be transactional"); + if (TransactionFactory.Provider.valueOf(txProvider).getTransactionProvider().isUnsupported(Feature.ALTER_NONTX_TO_TX)) { + fail(txProvider + " should not allow converting a non transactional table to be transactional"); } } catch (SQLException e) { // Should fail for Omid, but not Tephra - if (TransactionFactory.Provider.TEPHRA.name().equals(txProvider)) { + if (!TransactionFactory.Provider.valueOf(txProvider).getTransactionProvider().isUnsupported(Feature.ALTER_NONTX_TO_TX)) { throw e; } assertEquals(SQLExceptionCode.CANNOT_ALTER_TABLE_FROM_NON_TXN_TO_TXNL.getErrorCode(), e.getErrorCode()); @@ -247,7 +369,7 @@ public class TransactionIT extends ParallelStatsDisabledIT { } conn.createStatement().execute("CREATE TABLE " + nonTxTableName + "2(k INTEGER PRIMARY KEY, a.v VARCHAR, b.v VARCHAR, c.v VARCHAR)"); - conn.createStatement().execute("ALTER TABLE " + nonTxTableName + "2 SET TRANSACTIONAL=true, VERSIONS=10"); + conn.createStatement().execute("ALTER TABLE " + nonTxTableName + "2 SET TRANSACTIONAL=true, VERSIONS=10, " + tableDDLOptions); desc = conn.unwrap(PhoenixConnection.class).getQueryServices().getTableDescriptor(Bytes.toBytes( nonTxTableName + "2")); for (HColumnDescriptor colDesc : desc.getFamilies()) { assertEquals(10, colDesc.getMaxVersions()); @@ -376,6 +498,7 @@ public class TransactionIT extends ParallelStatsDisabledIT { } } + private static void assertTTL(Admin admin, String tableName, int ttl) throws TableNotFoundException, IOException { HTableDescriptor tableDesc = admin.getTableDescriptor(TableName.valueOf(tableName)); for (HColumnDescriptor colDesc : tableDesc.getFamilies()) { @@ -388,16 +511,39 @@ public class TransactionIT extends ParallelStatsDisabledIT { public void testSetTTL() throws Exception { Properties props = PropertiesUtil.deepCopy(TEST_PROPERTIES); TransactionFactory.Provider txProvider = TransactionFactory.Provider.valueOf(this.txProvider); - try (Connection conn = DriverManager.getConnection(getUrl(), props); Admin admin = conn.unwrap(PhoenixConnection.class).getQueryServices().getAdmin()) { + try (Connection conn = DriverManager.getConnection(getUrl(), props); + Admin admin = conn.unwrap(PhoenixConnection.class).getQueryServices().getAdmin()) { String tableName = generateUniqueName(); - conn.createStatement().execute("CREATE TABLE " + tableName + - "(K VARCHAR PRIMARY KEY) TRANSACTIONAL=true,TRANSACTION_PROVIDER='" + txProvider + "',TTL=100"); - assertTTL(admin, tableName, 100); + try { + conn.createStatement().execute("CREATE TABLE " + tableName + + "(K VARCHAR PRIMARY KEY) TRANSACTIONAL=true,TRANSACTION_PROVIDER='" + txProvider + "',TTL=100"); + if (txProvider.getTransactionProvider().isUnsupported(Feature.SET_TTL)) { + fail(); + } + assertTTL(admin, tableName, 100); + } catch (SQLException e) { + if (txProvider.getTransactionProvider().isUnsupported(Feature.SET_TTL)) { + assertEquals(SQLExceptionCode.TTL_UNSUPPORTED_FOR_TXN_TABLE.getErrorCode(), e.getErrorCode()); + } else { + throw e; + } + } tableName = generateUniqueName(); conn.createStatement().execute("CREATE TABLE " + tableName + "(K VARCHAR PRIMARY KEY) TRANSACTIONAL=true,TRANSACTION_PROVIDER='" + txProvider + "'"); - conn.createStatement().execute("ALTER TABLE " + tableName + " SET TTL=" + 200); - assertTTL(admin, tableName, 200); + try { + conn.createStatement().execute("ALTER TABLE " + tableName + " SET TTL=" + 200); + if (txProvider.getTransactionProvider().isUnsupported(Feature.SET_TTL)) { + fail(); + } + assertTTL(admin, tableName, 200); + } catch (SQLException e) { + if (txProvider.getTransactionProvider().isUnsupported(Feature.SET_TTL)) { + assertEquals(SQLExceptionCode.TTL_UNSUPPORTED_FOR_TXN_TABLE.getErrorCode(), e.getErrorCode()); + } else { + throw e; + } + } } } } http://git-wip-us.apache.org/repos/asf/phoenix/blob/8aff8959/phoenix-core/src/it/java/org/apache/phoenix/tx/TxCheckpointIT.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/it/java/org/apache/phoenix/tx/TxCheckpointIT.java b/phoenix-core/src/it/java/org/apache/phoenix/tx/TxCheckpointIT.java index 42e2102..f8946e5 100644 --- a/phoenix-core/src/it/java/org/apache/phoenix/tx/TxCheckpointIT.java +++ b/phoenix-core/src/it/java/org/apache/phoenix/tx/TxCheckpointIT.java @@ -38,9 +38,9 @@ import org.apache.phoenix.jdbc.PhoenixConnection; import org.apache.phoenix.query.QueryServices; import org.apache.phoenix.schema.PTableImpl; import org.apache.phoenix.transaction.PhoenixTransactionContext.PhoenixVisibilityLevel; -import org.apache.phoenix.transaction.TransactionFactory; import org.apache.phoenix.util.PropertiesUtil; import org.apache.phoenix.util.SchemaUtil; +import org.apache.phoenix.util.TestUtil; import org.junit.Test; import org.junit.runner.RunWith; import org.junit.runners.Parameterized; @@ -52,18 +52,15 @@ public class TxCheckpointIT extends ParallelStatsDisabledIT { private final boolean localIndex; private final String tableDDLOptions; - public TxCheckpointIT(boolean localIndex, boolean mutable, boolean columnEncoded) { + public TxCheckpointIT(boolean localIndex, boolean mutable, boolean columnEncoded, String transactionProvider) { StringBuilder optionBuilder = new StringBuilder(); + optionBuilder.append("TRANSACTION_PROVIDER='" + transactionProvider + "'"); this.localIndex = localIndex; if (!columnEncoded) { - if (optionBuilder.length()!=0) - optionBuilder.append(","); - optionBuilder.append("COLUMN_ENCODED_BYTES=0"); + optionBuilder.append(",COLUMN_ENCODED_BYTES=0"); } if (!mutable) { - if (optionBuilder.length()!=0) - optionBuilder.append(","); - optionBuilder.append("IMMUTABLE_ROWS=true"); + optionBuilder.append(",IMMUTABLE_ROWS=true"); if (!columnEncoded) { optionBuilder.append(",IMMUTABLE_STORAGE_SCHEME="+PTableImpl.ImmutableStorageScheme.ONE_CELL_PER_COLUMN); } @@ -81,12 +78,13 @@ public class TxCheckpointIT extends ParallelStatsDisabledIT { return conn; } - @Parameters(name="TxCheckpointIT_localIndex={0},mutable={1},columnEncoded={2}") // name is used by failsafe as file name in reports - public static Collection<Boolean[]> data() { - return Arrays.asList(new Boolean[][] { - { false, false, false }, { false, false, true }, { false, true, false }, { false, true, true }, - { true, false, false }, { true, false, true }, { true, true, false }, { true, true, true } - }); + @Parameters(name="TxCheckpointIT_localIndex={0},mutable={1},columnEncoded={2},transactionProvider={3}") // name is used by failsafe as file name in reports + public static Collection<Object[]> data() { + return TestUtil.filterTxParamData(Arrays.asList(new Object[][] { + { false, false, false, "TEPHRA" }, { false, false, true, "TEPHRA" }, { false, true, false, "TEPHRA" }, { false, true, true, "TEPHRA" }, + { true, false, false, "TEPHRA" }, { true, false, true, "TEPHRA" }, { true, true, false, "TEPHRA" }, { true, true, true, "TEPHRA" }, + { false, false, false, "OMID" }, { false, true, false, "OMID" }, + }),3); } @Test @@ -133,7 +131,6 @@ public class TxCheckpointIT extends ParallelStatsDisabledIT { } private void testRollbackOfUncommittedDelete(String indexDDL, String fullTableName) throws Exception { - Properties props = PropertiesUtil.deepCopy(TEST_PROPERTIES); Connection conn = getConnection(); conn.setAutoCommit(false); try { @@ -179,7 +176,15 @@ public class TxCheckpointIT extends ParallelStatsDisabledIT { assertEquals("a2", rs.getString(3)); assertFalse(rs.next()); - //assert row is delete in index table + // assert row is deleted in data table + rs = stmt.executeQuery("select k, v1, v2 from " + fullTableName + " ORDER BY k"); + assertTrue(rs.next()); + assertEquals("x2", rs.getString(1)); + assertEquals("y2", rs.getString(2)); + assertEquals("a2", rs.getString(3)); + assertFalse(rs.next()); + + // assert row is deleted in index table rs = stmt.executeQuery("select k, v1, v2 from " + fullTableName + " ORDER BY v1"); assertTrue(rs.next()); assertEquals("x2", rs.getString(1)); @@ -189,7 +194,7 @@ public class TxCheckpointIT extends ParallelStatsDisabledIT { conn.rollback(); - //assert two rows in data table + // assert two rows in data table rs = stmt.executeQuery("select k, v1, v2 from " + fullTableName + " ORDER BY k"); assertTrue(rs.next()); assertEquals("x1", rs.getString(1)); @@ -201,7 +206,7 @@ public class TxCheckpointIT extends ParallelStatsDisabledIT { assertEquals("a2", rs.getString(3)); assertFalse(rs.next()); - //assert two rows in index table + // assert two rows in index table rs = stmt.executeQuery("select k, v1, v2 from " + fullTableName + " ORDER BY v1"); assertTrue(rs.next()); assertEquals("x1", rs.getString(1)); // fails here @@ -266,7 +271,7 @@ public class TxCheckpointIT extends ParallelStatsDisabledIT { ResultSet rs; MutationState state = conn.unwrap(PhoenixConnection.class) .getMutationState(); - state.startTransaction(TransactionFactory.Provider.TEPHRA); + conn.createStatement().executeQuery("select 1 from " + fullTableName + " LIMIT 1").next(); long wp = state.getWritePointer(); conn.createStatement().execute( "upsert into " + fullTableName + " select max(id)+1, 'a4', 'b4' from " + fullTableName + ""); @@ -330,7 +335,8 @@ public class TxCheckpointIT extends ParallelStatsDisabledIT { conn.commit(); MutationState state = conn.unwrap(PhoenixConnection.class).getMutationState(); - state.startTransaction(TransactionFactory.Provider.TEPHRA); + // Start a new transaction + stmt.executeQuery("select 1 from " + fullTableName + "1 LIMIT 1").next(); long wp = state.getWritePointer(); conn.createStatement().execute("delete from " + fullTableName + "1 where id1=fk1b AND fk1b=id1"); assertEquals(PhoenixVisibilityLevel.SNAPSHOT, state.getVisibilityLevel()); http://git-wip-us.apache.org/repos/asf/phoenix/blob/8aff8959/phoenix-core/src/main/java/org/apache/phoenix/compile/DeleteCompiler.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/compile/DeleteCompiler.java b/phoenix-core/src/main/java/org/apache/phoenix/compile/DeleteCompiler.java index c2dfeab..583085e 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/compile/DeleteCompiler.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/compile/DeleteCompiler.java @@ -86,6 +86,7 @@ import org.apache.phoenix.schema.SortOrder; import org.apache.phoenix.schema.TableRef; import org.apache.phoenix.schema.tuple.Tuple; import org.apache.phoenix.schema.types.PLong; +import org.apache.phoenix.transaction.PhoenixTransactionProvider.Feature; import org.apache.phoenix.util.ByteUtil; import org.apache.phoenix.util.IndexUtil; import org.apache.phoenix.util.MetaDataUtil; @@ -974,7 +975,9 @@ public class DeleteCompiler { private static boolean isMaintainedOnClient(PTable table) { // Test for not being local (rather than being GLOBAL) so that this doesn't fail // when tested with our projected table. - return table.getIndexType() != IndexType.LOCAL && (table.isImmutableRows() || table.isTransactional()); + return (table.getIndexType() != IndexType.LOCAL && (table.isTransactional() || table.isImmutableRows())) || + (table.getIndexType() == IndexType.LOCAL && (table.isTransactional() && + table.getTransactionProvider().getTransactionProvider().isUnsupported(Feature.MAINTAIN_LOCAL_INDEX_ON_SERVER) ) ); } } http://git-wip-us.apache.org/repos/asf/phoenix/blob/8aff8959/phoenix-core/src/main/java/org/apache/phoenix/compile/PostLocalIndexDDLCompiler.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/compile/PostLocalIndexDDLCompiler.java b/phoenix-core/src/main/java/org/apache/phoenix/compile/PostLocalIndexDDLCompiler.java index f34c5a3..ba6d10c 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/compile/PostLocalIndexDDLCompiler.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/compile/PostLocalIndexDDLCompiler.java @@ -89,6 +89,9 @@ public class PostLocalIndexDDLCompiler { scan.addColumn(columnRef.getFamily(), columnRef.getQualifier()); } } + if (dataTable.isTransactional()) { + scan.setAttribute(BaseScannerRegionObserver.TX_STATE, connection.getMutationState().encodeTransaction()); + } // Go through MutationPlan abstraction so that we can create local indexes // with a connectionless connection (which makes testing easier).
