PHOENIX-3338 Move flapping test into test class marked as NotThreadSafe
Project: http://git-wip-us.apache.org/repos/asf/phoenix/repo Commit: http://git-wip-us.apache.org/repos/asf/phoenix/commit/577a6dee Tree: http://git-wip-us.apache.org/repos/asf/phoenix/tree/577a6dee Diff: http://git-wip-us.apache.org/repos/asf/phoenix/diff/577a6dee Branch: refs/heads/4.x-HBase-0.98 Commit: 577a6dee5846c53acb22c4b742e4b780b442df6b Parents: fc01284 Author: James Taylor <jamestay...@apache.org> Authored: Thu Sep 29 17:30:37 2016 -0700 Committer: James Taylor <jamestay...@apache.org> Committed: Mon Oct 3 09:51:06 2016 -0700 ---------------------------------------------------------------------- .../phoenix/tx/NotThreadSafeTransactionIT.java | 138 +++++++++++++++++++ .../org/apache/phoenix/tx/TransactionIT.java | 126 ----------------- 2 files changed, 138 insertions(+), 126 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/phoenix/blob/577a6dee/phoenix-core/src/it/java/org/apache/phoenix/tx/NotThreadSafeTransactionIT.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/it/java/org/apache/phoenix/tx/NotThreadSafeTransactionIT.java b/phoenix-core/src/it/java/org/apache/phoenix/tx/NotThreadSafeTransactionIT.java index b50f424..404bb9e 100644 --- a/phoenix-core/src/it/java/org/apache/phoenix/tx/NotThreadSafeTransactionIT.java +++ b/phoenix-core/src/it/java/org/apache/phoenix/tx/NotThreadSafeTransactionIT.java @@ -18,21 +18,38 @@ package org.apache.phoenix.tx; import static org.apache.phoenix.util.TestUtil.INDEX_DATA_SCHEMA; +import static org.apache.phoenix.util.TestUtil.TEST_PROPERTIES; import static org.apache.phoenix.util.TestUtil.createTransactionalTable; import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertFalse; import static org.junit.Assert.assertTrue; +import static org.junit.Assert.fail; import java.sql.Connection; import java.sql.DriverManager; import java.sql.PreparedStatement; import java.sql.ResultSet; +import java.sql.SQLException; +import java.sql.Statement; +import java.util.Properties; import javax.annotation.concurrent.NotThreadSafe; +import org.apache.hadoop.hbase.client.Get; +import org.apache.hadoop.hbase.client.HTableInterface; +import org.apache.hadoop.hbase.client.Put; +import org.apache.hadoop.hbase.client.Result; +import org.apache.hadoop.hbase.util.Bytes; import org.apache.phoenix.end2end.ParallelStatsDisabledIT; +import org.apache.phoenix.exception.SQLExceptionCode; +import org.apache.phoenix.jdbc.PhoenixConnection; import org.apache.phoenix.query.QueryConstants; +import org.apache.phoenix.util.PropertiesUtil; import org.apache.phoenix.util.TestUtil; +import org.apache.tephra.TransactionContext; +import org.apache.tephra.TransactionSystemClient; +import org.apache.tephra.TxConstants; +import org.apache.tephra.hbase.TransactionAwareHTable; import org.junit.Test; /** @@ -190,4 +207,125 @@ public class NotThreadSafeTransactionIT extends ParallelStatsDisabledIT { } } + @Test + public void testExternalTxContext() throws Exception { + ResultSet rs; + Properties props = PropertiesUtil.deepCopy(TEST_PROPERTIES); + Connection conn = DriverManager.getConnection(getUrl(), props); + conn.setAutoCommit(false); + String fullTableName = generateUniqueName(); + PhoenixConnection pconn = conn.unwrap(PhoenixConnection.class); + + TransactionSystemClient txServiceClient = pconn.getQueryServices().getTransactionSystemClient(); + + Statement stmt = conn.createStatement(); + stmt.execute("CREATE TABLE " + fullTableName + "(K VARCHAR PRIMARY KEY, V1 VARCHAR, V2 VARCHAR) TRANSACTIONAL=true"); + HTableInterface htable = pconn.getQueryServices().getTable(Bytes.toBytes(fullTableName)); + stmt.executeUpdate("upsert into " + fullTableName + " values('x', 'a', 'a')"); + conn.commit(); + + try (Connection newConn = DriverManager.getConnection(getUrl(), props)) { + rs = newConn.createStatement().executeQuery("select count(*) from " + fullTableName); + assertTrue(rs.next()); + assertEquals(1,rs.getInt(1)); + } + + // Use HBase level Tephra APIs to start a new transaction + TransactionAwareHTable txAware = new TransactionAwareHTable(htable, TxConstants.ConflictDetection.ROW); + TransactionContext txContext = new TransactionContext(txServiceClient, txAware); + txContext.start(); + + // Use HBase APIs to add a new row + Put put = new Put(Bytes.toBytes("z")); + put.add(QueryConstants.DEFAULT_COLUMN_FAMILY_BYTES, QueryConstants.EMPTY_COLUMN_BYTES, QueryConstants.EMPTY_COLUMN_VALUE_BYTES); + put.add(QueryConstants.DEFAULT_COLUMN_FAMILY_BYTES, Bytes.toBytes("V1"), Bytes.toBytes("b")); + txAware.put(put); + + // Use Phoenix APIs to add new row (sharing the transaction context) + pconn.setTransactionContext(txContext); + conn.createStatement().executeUpdate("upsert into " + fullTableName + " values('y', 'c', 'c')"); + + // New connection should not see data as it hasn't been committed yet + try (Connection newConn = DriverManager.getConnection(getUrl(), props)) { + rs = newConn.createStatement().executeQuery("select count(*) from " + fullTableName); + assertTrue(rs.next()); + assertEquals(1,rs.getInt(1)); + } + + // Use new connection to create a row with a conflict + Connection connWithConflict = DriverManager.getConnection(getUrl(), props); + connWithConflict.createStatement().execute("upsert into " + fullTableName + " values('z', 'd', 'd')"); + + // Existing connection should see data even though it hasn't been committed yet + rs = conn.createStatement().executeQuery("select count(*) from " + fullTableName); + assertTrue(rs.next()); + assertEquals(3,rs.getInt(1)); + + // Use Tephra APIs directly to finish (i.e. commit) the transaction + txContext.finish(); + + // Confirm that attempt to commit row with conflict fails + try { + connWithConflict.commit(); + fail(); + } catch (SQLException e) { + assertEquals(SQLExceptionCode.TRANSACTION_CONFLICT_EXCEPTION.getErrorCode(), e.getErrorCode()); + } finally { + connWithConflict.close(); + } + + // New connection should now see data as it has been committed + try (Connection newConn = DriverManager.getConnection(getUrl(), props)) { + rs = newConn.createStatement().executeQuery("select count(*) from " + fullTableName); + assertTrue(rs.next()); + assertEquals(3,rs.getInt(1)); + } + + // Repeat the same as above, but this time abort the transaction + txContext = new TransactionContext(txServiceClient, txAware); + txContext.start(); + + // Use HBase APIs to add a new row + put = new Put(Bytes.toBytes("j")); + put.add(QueryConstants.DEFAULT_COLUMN_FAMILY_BYTES, QueryConstants.EMPTY_COLUMN_BYTES, QueryConstants.EMPTY_COLUMN_VALUE_BYTES); + put.add(QueryConstants.DEFAULT_COLUMN_FAMILY_BYTES, Bytes.toBytes("V1"), Bytes.toBytes("e")); + txAware.put(put); + + // Use Phoenix APIs to add new row (sharing the transaction context) + pconn.setTransactionContext(txContext); + conn.createStatement().executeUpdate("upsert into " + fullTableName + " values('k', 'f', 'f')"); + + // Existing connection should see data even though it hasn't been committed yet + rs = conn.createStatement().executeQuery("select count(*) from " + fullTableName); + assertTrue(rs.next()); + assertEquals(5,rs.getInt(1)); + + connWithConflict.createStatement().execute("upsert into " + fullTableName + " values('k', 'g', 'g')"); + rs = connWithConflict.createStatement().executeQuery("select count(*) from " + fullTableName); + assertTrue(rs.next()); + assertEquals(4,rs.getInt(1)); + + // Use Tephra APIs directly to abort (i.e. rollback) the transaction + txContext.abort(); + + rs = conn.createStatement().executeQuery("select count(*) from " + fullTableName); + assertTrue(rs.next()); + assertEquals(3,rs.getInt(1)); + + // Should succeed since conflicting row was aborted + connWithConflict.commit(); + + // New connection should now see data as it has been committed + try (Connection newConn = DriverManager.getConnection(getUrl(), props)) { + rs = newConn.createStatement().executeQuery("select count(*) from " + fullTableName); + assertTrue(rs.next()); + assertEquals(4,rs.getInt(1)); + } + + // Even using HBase APIs directly, we shouldn't find 'j' since a delete marker would have been + // written to hide it. + Result result = htable.get(new Get(Bytes.toBytes("j"))); + assertTrue(result.isEmpty()); + } + } \ No newline at end of file http://git-wip-us.apache.org/repos/asf/phoenix/blob/577a6dee/phoenix-core/src/it/java/org/apache/phoenix/tx/TransactionIT.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/it/java/org/apache/phoenix/tx/TransactionIT.java b/phoenix-core/src/it/java/org/apache/phoenix/tx/TransactionIT.java index 8c3ad7f..2e45d5a 100644 --- a/phoenix-core/src/it/java/org/apache/phoenix/tx/TransactionIT.java +++ b/phoenix-core/src/it/java/org/apache/phoenix/tx/TransactionIT.java @@ -38,11 +38,9 @@ import java.util.Properties; import org.apache.hadoop.hbase.HColumnDescriptor; import org.apache.hadoop.hbase.HTableDescriptor; import org.apache.hadoop.hbase.TableName; -import org.apache.hadoop.hbase.client.Get; import org.apache.hadoop.hbase.client.HBaseAdmin; import org.apache.hadoop.hbase.client.HTableInterface; import org.apache.hadoop.hbase.client.Put; -import org.apache.hadoop.hbase.client.Result; import org.apache.hadoop.hbase.util.Bytes; import org.apache.phoenix.coprocessor.PhoenixTransactionalProcessor; import org.apache.phoenix.end2end.ParallelStatsDisabledIT; @@ -59,10 +57,7 @@ import org.apache.phoenix.util.ByteUtil; import org.apache.phoenix.util.PropertiesUtil; import org.apache.phoenix.util.StringUtil; import org.apache.phoenix.util.TestUtil; -import org.apache.tephra.TransactionContext; -import org.apache.tephra.TransactionSystemClient; import org.apache.tephra.TxConstants; -import org.apache.tephra.hbase.TransactionAwareHTable; import org.junit.Ignore; import org.junit.Test; @@ -562,127 +557,6 @@ public class TransactionIT extends ParallelStatsDisabledIT { } @Test - public void testExternalTxContext() throws Exception { - ResultSet rs; - Properties props = PropertiesUtil.deepCopy(TEST_PROPERTIES); - Connection conn = DriverManager.getConnection(getUrl(), props); - conn.setAutoCommit(false); - String fullTableName = generateUniqueName(); - PhoenixConnection pconn = conn.unwrap(PhoenixConnection.class); - - TransactionSystemClient txServiceClient = pconn.getQueryServices().getTransactionSystemClient(); - - Statement stmt = conn.createStatement(); - stmt.execute("CREATE TABLE " + fullTableName + "(K VARCHAR PRIMARY KEY, V1 VARCHAR, V2 VARCHAR) TRANSACTIONAL=true"); - HTableInterface htable = pconn.getQueryServices().getTable(Bytes.toBytes(fullTableName)); - stmt.executeUpdate("upsert into " + fullTableName + " values('x', 'a', 'a')"); - conn.commit(); - - try (Connection newConn = DriverManager.getConnection(getUrl(), props)) { - rs = newConn.createStatement().executeQuery("select count(*) from " + fullTableName); - assertTrue(rs.next()); - assertEquals(1,rs.getInt(1)); - } - - // Use HBase level Tephra APIs to start a new transaction - TransactionAwareHTable txAware = new TransactionAwareHTable(htable, TxConstants.ConflictDetection.ROW); - TransactionContext txContext = new TransactionContext(txServiceClient, txAware); - txContext.start(); - - // Use HBase APIs to add a new row - Put put = new Put(Bytes.toBytes("z")); - put.add(QueryConstants.DEFAULT_COLUMN_FAMILY_BYTES, QueryConstants.EMPTY_COLUMN_BYTES, QueryConstants.EMPTY_COLUMN_VALUE_BYTES); - put.add(QueryConstants.DEFAULT_COLUMN_FAMILY_BYTES, Bytes.toBytes("V1"), Bytes.toBytes("b")); - txAware.put(put); - - // Use Phoenix APIs to add new row (sharing the transaction context) - pconn.setTransactionContext(txContext); - conn.createStatement().executeUpdate("upsert into " + fullTableName + " values('y', 'c', 'c')"); - - // New connection should not see data as it hasn't been committed yet - try (Connection newConn = DriverManager.getConnection(getUrl(), props)) { - rs = newConn.createStatement().executeQuery("select count(*) from " + fullTableName); - assertTrue(rs.next()); - assertEquals(1,rs.getInt(1)); - } - - // Use new connection to create a row with a conflict - Connection connWithConflict = DriverManager.getConnection(getUrl(), props); - connWithConflict.createStatement().execute("upsert into " + fullTableName + " values('z', 'd', 'd')"); - - // Existing connection should see data even though it hasn't been committed yet - rs = conn.createStatement().executeQuery("select count(*) from " + fullTableName); - assertTrue(rs.next()); - assertEquals(3,rs.getInt(1)); - - // Use Tephra APIs directly to finish (i.e. commit) the transaction - txContext.finish(); - - // Confirm that attempt to commit row with conflict fails - try { - connWithConflict.commit(); - fail(); - } catch (SQLException e) { - assertEquals(SQLExceptionCode.TRANSACTION_CONFLICT_EXCEPTION.getErrorCode(), e.getErrorCode()); - } finally { - connWithConflict.close(); - } - - // New connection should now see data as it has been committed - try (Connection newConn = DriverManager.getConnection(getUrl(), props)) { - rs = newConn.createStatement().executeQuery("select count(*) from " + fullTableName); - assertTrue(rs.next()); - assertEquals(3,rs.getInt(1)); - } - - // Repeat the same as above, but this time abort the transaction - txContext = new TransactionContext(txServiceClient, txAware); - txContext.start(); - - // Use HBase APIs to add a new row - put = new Put(Bytes.toBytes("j")); - put.add(QueryConstants.DEFAULT_COLUMN_FAMILY_BYTES, QueryConstants.EMPTY_COLUMN_BYTES, QueryConstants.EMPTY_COLUMN_VALUE_BYTES); - put.add(QueryConstants.DEFAULT_COLUMN_FAMILY_BYTES, Bytes.toBytes("V1"), Bytes.toBytes("e")); - txAware.put(put); - - // Use Phoenix APIs to add new row (sharing the transaction context) - pconn.setTransactionContext(txContext); - conn.createStatement().executeUpdate("upsert into " + fullTableName + " values('k', 'f', 'f')"); - - // Existing connection should see data even though it hasn't been committed yet - rs = conn.createStatement().executeQuery("select count(*) from " + fullTableName); - assertTrue(rs.next()); - assertEquals(5,rs.getInt(1)); - - connWithConflict.createStatement().execute("upsert into " + fullTableName + " values('k', 'g', 'g')"); - rs = connWithConflict.createStatement().executeQuery("select count(*) from " + fullTableName); - assertTrue(rs.next()); - assertEquals(4,rs.getInt(1)); - - // Use Tephra APIs directly to abort (i.e. rollback) the transaction - txContext.abort(); - - rs = conn.createStatement().executeQuery("select count(*) from " + fullTableName); - assertTrue(rs.next()); - assertEquals(3,rs.getInt(1)); - - // Should succeed since conflicting row was aborted - connWithConflict.commit(); - - // New connection should now see data as it has been committed - try (Connection newConn = DriverManager.getConnection(getUrl(), props)) { - rs = newConn.createStatement().executeQuery("select count(*) from " + fullTableName); - assertTrue(rs.next()); - assertEquals(4,rs.getInt(1)); - } - - // Even using HBase APIs directly, we shouldn't find 'j' since a delete marker would have been - // written to hide it. - Result result = htable.get(new Get(Bytes.toBytes("j"))); - assertTrue(result.isEmpty()); - } - - @Test public void testCheckpointAndRollback() throws Exception { Properties props = PropertiesUtil.deepCopy(TEST_PROPERTIES); Connection conn = DriverManager.getConnection(getUrl(), props);