Repository: phoenix Updated Branches: refs/heads/txn d3b85048a -> 746c6d8ea
Enhance PartialCommitIT.java Project: http://git-wip-us.apache.org/repos/asf/phoenix/repo Commit: http://git-wip-us.apache.org/repos/asf/phoenix/commit/b198d6b3 Tree: http://git-wip-us.apache.org/repos/asf/phoenix/tree/b198d6b3 Diff: http://git-wip-us.apache.org/repos/asf/phoenix/diff/b198d6b3 Branch: refs/heads/txn Commit: b198d6b39fbee1065ecc682220877bf29c2c6242 Parents: d3b8504 Author: Thomas D'Silva <[email protected]> Authored: Mon Nov 16 12:57:39 2015 -0800 Committer: Thomas D'Silva <[email protected]> Committed: Tue Nov 17 13:52:51 2015 -0800 ---------------------------------------------------------------------- .../apache/phoenix/execute/PartialCommitIT.java | 149 +++++++++++-------- .../apache/phoenix/execute/MutationState.java | 12 +- 2 files changed, 92 insertions(+), 69 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/phoenix/blob/b198d6b3/phoenix-core/src/it/java/org/apache/phoenix/execute/PartialCommitIT.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/it/java/org/apache/phoenix/execute/PartialCommitIT.java b/phoenix-core/src/it/java/org/apache/phoenix/execute/PartialCommitIT.java index 11a6e67..a87761e 100644 --- a/phoenix-core/src/it/java/org/apache/phoenix/execute/PartialCommitIT.java +++ b/phoenix-core/src/it/java/org/apache/phoenix/execute/PartialCommitIT.java @@ -22,8 +22,6 @@ package org.apache.phoenix.execute; import static com.google.common.collect.Lists.newArrayList; import static com.google.common.collect.Sets.newHashSet; import static java.util.Collections.singletonList; -import static org.apache.phoenix.query.BaseTest.initAndRegisterDriver; -import static org.apache.phoenix.query.BaseTest.setUpConfigForMiniCluster; import static org.apache.phoenix.util.PhoenixRuntime.JDBC_PROTOCOL; import static org.apache.phoenix.util.PhoenixRuntime.JDBC_PROTOCOL_SEPARATOR; import static org.apache.phoenix.util.PhoenixRuntime.JDBC_PROTOCOL_TERMINATOR; @@ -39,6 +37,8 @@ import java.sql.Driver; import java.sql.ResultSet; import java.sql.SQLException; import java.sql.Statement; +import java.util.Arrays; +import java.util.Collection; import java.util.Comparator; import java.util.List; import java.util.Map; @@ -50,6 +50,7 @@ import org.apache.hadoop.hbase.HBaseIOException; import org.apache.hadoop.hbase.HBaseTestingUtility; import org.apache.hadoop.hbase.client.Delete; import org.apache.hadoop.hbase.client.Durability; +import org.apache.hadoop.hbase.client.Mutation; import org.apache.hadoop.hbase.client.Put; import org.apache.hadoop.hbase.coprocessor.ObserverContext; import org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment; @@ -57,38 +58,37 @@ import org.apache.hadoop.hbase.coprocessor.RegionObserver; import org.apache.hadoop.hbase.coprocessor.SimpleRegionObserver; import org.apache.hadoop.hbase.regionserver.wal.WALEdit; import org.apache.hadoop.hbase.util.Bytes; -import org.apache.phoenix.end2end.NeedsOwnMiniClusterTest; import org.apache.phoenix.hbase.index.Indexer; import org.apache.phoenix.hbase.index.util.ImmutableBytesPtr; import org.apache.phoenix.jdbc.PhoenixConnection; +import org.apache.phoenix.query.BaseTest; import org.apache.phoenix.query.QueryServices; import org.apache.phoenix.schema.TableRef; -import org.apache.phoenix.util.PhoenixRuntime; import org.apache.phoenix.util.ReadOnlyProps; import org.junit.AfterClass; import org.junit.BeforeClass; import org.junit.Test; -import org.junit.experimental.categories.Category; +import org.junit.runner.RunWith; +import org.junit.runners.Parameterized; +import org.junit.runners.Parameterized.Parameters; import com.google.common.base.Preconditions; import com.google.common.collect.Maps; -@Category(NeedsOwnMiniClusterTest.class) -public class PartialCommitIT { +@RunWith(Parameterized.class) +public class PartialCommitIT extends BaseTest { - private static final String TABLE_NAME_TO_FAIL = "b_failure_table".toUpperCase(); - private static final byte[] ROW_TO_FAIL = Bytes.toBytes("fail me"); - private static final String UPSERT_TO_FAIL = "upsert into " + TABLE_NAME_TO_FAIL + " values ('" + Bytes.toString(ROW_TO_FAIL) + "', 'boom!')"; - private static final String UPSERT_SELECT_TO_FAIL = "upsert into " + TABLE_NAME_TO_FAIL + " select k, c from a_success_table"; - private static final String DELETE_TO_FAIL = "delete from " + TABLE_NAME_TO_FAIL + " where k='z'"; + private final String A_SUCESS_TABLE; + private final String B_FAILURE_TABLE; + private final String C_SUCESS_TABLE; + private final String UPSERT_TO_FAIL; + private final String UPSERT_SELECT_TO_FAIL; + private final String DELETE_TO_FAIL; + private static final String TABLE_NAME_TO_FAIL = "B_FAILURE_TABLE"; + private static final byte[] ROW_TO_FAIL_UPSERT_BYTES = Bytes.toBytes("fail me upsert"); + private static final byte[] ROW_TO_FAIL_DELETE_BYTES = Bytes.toBytes("fail me delete"); private static final HBaseTestingUtility TEST_UTIL = new HBaseTestingUtility(); - private static String url; private static Driver driver; - private static final Properties props = new Properties(); - - static { - props.put(PhoenixRuntime.CURRENT_SCN_ATTRIB, 10); - } @BeforeClass public static void setupCluster() throws Exception { @@ -106,27 +106,49 @@ public class PartialCommitIT { // Must update config before starting server props.put(QueryServices.DROP_METADATA_ATTRIB, Boolean.toString(true)); driver = initAndRegisterDriver(url, new ReadOnlyProps(props.entrySet().iterator())); + clusterInitialized = true; + setupTxManager(); createTablesWithABitOfData(); } + @Parameters(name="transactional = {0}") + public static Collection<Boolean> data() { + return Arrays.asList(false, true); + } + + public PartialCommitIT(boolean transactional) { + if (transactional) { + A_SUCESS_TABLE = "A_SUCCESS_TABLE_TXN"; + B_FAILURE_TABLE = TABLE_NAME_TO_FAIL+"_TXN"; + C_SUCESS_TABLE = "C_SUCCESS_TABLE_TXN"; + } + else { + A_SUCESS_TABLE = "A_SUCCESS_TABLE"; + B_FAILURE_TABLE = TABLE_NAME_TO_FAIL; + C_SUCESS_TABLE = "C_SUCCESS_TABLE"; + } + UPSERT_TO_FAIL = "upsert into " + B_FAILURE_TABLE + " values ('" + Bytes.toString(ROW_TO_FAIL_UPSERT_BYTES) + "', 'boom!')"; + UPSERT_SELECT_TO_FAIL = "upsert into " + B_FAILURE_TABLE + " select k, c from a_success_table"; + DELETE_TO_FAIL = "delete from " + B_FAILURE_TABLE + " where k='" + Bytes.toString(ROW_TO_FAIL_DELETE_BYTES) + "'"; + } + private static void createTablesWithABitOfData() throws Exception { - Properties props = new Properties(); - props.put(PhoenixRuntime.CURRENT_SCN_ATTRIB, 10); - try (Connection con = driver.connect(url, new Properties())) { Statement sta = con.createStatement(); sta.execute("create table a_success_table (k varchar primary key, c varchar)"); sta.execute("create table b_failure_table (k varchar primary key, c varchar)"); sta.execute("create table c_success_table (k varchar primary key, c varchar)"); + sta.execute("create table a_success_table_txn (k varchar primary key, c varchar) TRANSACTIONAL=true"); + sta.execute("create table b_failure_table_txn (k varchar primary key, c varchar) TRANSACTIONAL=true"); + sta.execute("create table c_success_table_txn (k varchar primary key, c varchar) TRANSACTIONAL=true"); con.commit(); } - props.put(PhoenixRuntime.CURRENT_SCN_ATTRIB, 100); - try (Connection con = driver.connect(url, new Properties())) { con.setAutoCommit(false); Statement sta = con.createStatement(); - for (String table : newHashSet("a_success_table", TABLE_NAME_TO_FAIL, "c_success_table")) { + for (String table : newHashSet("a_success_table", "b_failure_table", "c_success_table", + "a_success_table_txn", "b_failure_table_txn", "c_success_table_txn")) { sta.execute("upsert into " + table + " values ('z', 'z')"); sta.execute("upsert into " + table + " values ('zz', 'zz')"); sta.execute("upsert into " + table + " values ('zzz', 'zzz')"); @@ -142,46 +164,44 @@ public class PartialCommitIT { @Test public void testNoFailure() { - testPartialCommit(singletonList("upsert into a_success_table values ('testNoFailure', 'a')"), 0, new int[0], false, - singletonList("select count(*) from a_success_table where k='testNoFailure'"), singletonList(new Integer(1))); + testPartialCommit(singletonList("upsert into " + A_SUCESS_TABLE + " values ('testNoFailure', 'a')"), 0, new int[0], false, + singletonList("select count(*) from " + A_SUCESS_TABLE + " where k='testNoFailure'"), singletonList(new Integer(1))); } @Test public void testUpsertFailure() { - testPartialCommit(newArrayList("upsert into a_success_table values ('testUpsertFailure1', 'a')", + testPartialCommit(newArrayList("upsert into " + A_SUCESS_TABLE + " values ('testUpsertFailure1', 'a')", UPSERT_TO_FAIL, - "upsert into a_success_table values ('testUpsertFailure2', 'b')"), + "upsert into " + A_SUCESS_TABLE + " values ('testUpsertFailure2', 'b')"), 1, new int[]{1}, true, - newArrayList("select count(*) from a_success_table where k like 'testUpsertFailure_'", - "select count(*) from " + TABLE_NAME_TO_FAIL + " where k = '" + Bytes.toString(ROW_TO_FAIL) + "'"), + newArrayList("select count(*) from " + A_SUCESS_TABLE + " where k like 'testUpsertFailure_'", + "select count(*) from " + B_FAILURE_TABLE + " where k = '" + Bytes.toString(ROW_TO_FAIL_UPSERT_BYTES) + "'"), newArrayList(new Integer(2), new Integer(0))); } @Test public void testUpsertSelectFailure() throws SQLException { - props.put(PhoenixRuntime.CURRENT_SCN_ATTRIB, 100); - try (Connection con = driver.connect(url, new Properties())) { - con.createStatement().execute("upsert into a_success_table values ('" + Bytes.toString(ROW_TO_FAIL) + "', 'boom!')"); + con.createStatement().execute("upsert into " + A_SUCESS_TABLE + " values ('" + Bytes.toString(ROW_TO_FAIL_UPSERT_BYTES) + "', 'boom!')"); con.commit(); } - testPartialCommit(newArrayList("upsert into a_success_table values ('testUpsertSelectFailure', 'a')", + testPartialCommit(newArrayList("upsert into " + A_SUCESS_TABLE + " values ('testUpsertSelectFailure', 'a')", UPSERT_SELECT_TO_FAIL), 1, new int[]{1}, true, - newArrayList("select count(*) from a_success_table where k in ('testUpsertSelectFailure', '" + Bytes.toString(ROW_TO_FAIL) + "')", - "select count(*) from " + TABLE_NAME_TO_FAIL + " where k = '" + Bytes.toString(ROW_TO_FAIL) + "'"), + newArrayList("select count(*) from " + A_SUCESS_TABLE + " where k in ('testUpsertSelectFailure', '" + Bytes.toString(ROW_TO_FAIL_UPSERT_BYTES) + "')", + "select count(*) from " + B_FAILURE_TABLE + " where k = '" + Bytes.toString(ROW_TO_FAIL_UPSERT_BYTES) + "'"), newArrayList(new Integer(2), new Integer(0))); } @Test public void testDeleteFailure() { - testPartialCommit(newArrayList("upsert into a_success_table values ('testDeleteFailure1', 'a')", + testPartialCommit(newArrayList("upsert into " + A_SUCESS_TABLE + " values ('testDeleteFailure1', 'a')", DELETE_TO_FAIL, - "upsert into a_success_table values ('testDeleteFailure2', 'b')"), + "upsert into " + A_SUCESS_TABLE + " values ('testDeleteFailure2', 'b')"), 1, new int[]{1}, true, - newArrayList("select count(*) from a_success_table where k like 'testDeleteFailure_'", - "select count(*) from " + TABLE_NAME_TO_FAIL + " where k = 'z'"), + newArrayList("select count(*) from " + A_SUCESS_TABLE + " where k like 'testDeleteFailure_'", + "select count(*) from " + B_FAILURE_TABLE + " where k = 'z'"), newArrayList(new Integer(2), new Integer(1))); } @@ -190,27 +210,27 @@ public class PartialCommitIT { */ @Test public void testOrderOfMutationsIsPredicatable() { - testPartialCommit(newArrayList("upsert into c_success_table values ('testOrderOfMutationsIsPredicatable', 'c')", // will fail because c_success_table is after b_failure_table by table sort order + testPartialCommit(newArrayList("upsert into " + C_SUCESS_TABLE + " values ('testOrderOfMutationsIsPredicatable', 'c')", // will fail because c_success_table is after b_failure_table by table sort order UPSERT_TO_FAIL, - "upsert into a_success_table values ('testOrderOfMutationsIsPredicatable', 'a')"), // will succeed because a_success_table is before b_failure_table by table sort order + "upsert into " + A_SUCESS_TABLE + " values ('testOrderOfMutationsIsPredicatable', 'a')"), // will succeed because a_success_table is before b_failure_table by table sort order 2, new int[]{0,1}, true, - newArrayList("select count(*) from c_success_table where k='testOrderOfMutationsIsPredicatable'", - "select count(*) from a_success_table where k='testOrderOfMutationsIsPredicatable'", - "select count(*) from " + TABLE_NAME_TO_FAIL + " where k = '" + Bytes.toString(ROW_TO_FAIL) + "'"), + newArrayList("select count(*) from " + C_SUCESS_TABLE + " where k='testOrderOfMutationsIsPredicatable'", + "select count(*) from " + A_SUCESS_TABLE + " where k='testOrderOfMutationsIsPredicatable'", + "select count(*) from " + B_FAILURE_TABLE + " where k = '" + Bytes.toString(ROW_TO_FAIL_UPSERT_BYTES) + "'"), newArrayList(new Integer(0), new Integer(1), new Integer(0))); } @Test - public void checkThatAllStatementTypesMaintainOrderInConnection() { - testPartialCommit(newArrayList("upsert into a_success_table values ('k', 'checkThatAllStatementTypesMaintainOrderInConnection')", - "upsert into a_success_table select k, c from c_success_table", + public void testStatementOrderMaintainedInConnection() { + testPartialCommit(newArrayList("upsert into " + A_SUCESS_TABLE + " values ('testStatementOrderMaintainedInConnection', 'a')", + "upsert into " + A_SUCESS_TABLE + " select k, c from " + C_SUCESS_TABLE, DELETE_TO_FAIL, - "select * from a_success_table", + "select * from " + A_SUCESS_TABLE + "", UPSERT_TO_FAIL), 2, new int[]{2,4}, true, - newArrayList("select count(*) from a_success_table where k='testOrderOfMutationsIsPredicatable' or k like 'z%'", // rows left: zz, zzz, checkThatAllStatementTypesMaintainOrderInConnection - "select count(*) from " + TABLE_NAME_TO_FAIL + " where k = '" + ROW_TO_FAIL + "'", - "select count(*) from " + TABLE_NAME_TO_FAIL + " where k = 'z'"), + newArrayList("select count(*) from " + A_SUCESS_TABLE + " where k='testStatementOrderMaintainedInConnection' or k like 'z%'", // rows left: zz, zzz, checkThatAllStatementTypesMaintainOrderInConnection + "select count(*) from " + B_FAILURE_TABLE + " where k = '" + Bytes.toString(ROW_TO_FAIL_UPSERT_BYTES) + "'", + "select count(*) from " + B_FAILURE_TABLE + " where k = 'z'"), newArrayList(new Integer(4), new Integer(0), new Integer(1))); } @@ -241,6 +261,12 @@ public class PartialCommitIT { assertArrayEquals(expectedUncommittedStatementIndexes, uncommittedStatementIndexes); } + ResultSet rs1 = sta.executeQuery("select * from A_SUCCESS_TABLE where k='checkThatAllStatementTypesMaintainOrderInConnection' or k like 'z%'"); + while (rs1.next()) { + System.out.println(rs1.getString(1)); + System.out.println(rs1.getString(2)); + } + // verify data in HBase for (int i = 0; i < countStatementsForVerification.size(); i++) { String countStatement = countStatementsForVerification.get(i); @@ -259,7 +285,7 @@ public class PartialCommitIT { Connection con = driver.connect(url, new Properties()); PhoenixConnection phxCon = new PhoenixConnection(con.unwrap(PhoenixConnection.class)); final Map<TableRef,Map<ImmutableBytesPtr,MutationState.RowMutationState>> mutations = Maps.newTreeMap(new TableRefComparator()); - // passing null mutation state forces the connection.newMutationState() to be used to create the MutationState + // passing a null mutation staate forces the connection.newMutationState() to be used to create the MutationState return new PhoenixConnection(phxCon, null) { @Override protected MutationState newMutationState(int maxSize) { @@ -272,7 +298,7 @@ public class PartialCommitIT { @Override public void prePut(ObserverContext<RegionCoprocessorEnvironment> c, Put put, WALEdit edit, final Durability durability) throws HBaseIOException { - if (shouldFailUpsert(c, put)) { + if (shouldFail(c, put)) { // throwing anything other than instances of IOException result // in this coprocessor being unloaded // DoNotRetryIOException tells HBase not to retry this mutation @@ -284,7 +310,7 @@ public class PartialCommitIT { @Override public void preDelete(ObserverContext<RegionCoprocessorEnvironment> c, Delete delete, WALEdit edit, Durability durability) throws IOException { - if (shouldFailDelete(c, delete)) { + if (shouldFail(c, delete)) { // throwing anything other than instances of IOException result // in this coprocessor being unloaded // DoNotRetryIOException tells HBase not to retry this mutation @@ -293,18 +319,13 @@ public class PartialCommitIT { } } - private boolean shouldFailUpsert(ObserverContext<RegionCoprocessorEnvironment> c, Put put) { + private boolean shouldFail(ObserverContext<RegionCoprocessorEnvironment> c, Mutation m) { String tableName = c.getEnvironment().getRegion().getRegionInfo().getTable().getNameAsString(); - return TABLE_NAME_TO_FAIL.equals(tableName) && Bytes.equals(ROW_TO_FAIL, put.getRow()); + // deletes on transactional tables are converted to put, so use a single helper method + return tableName.contains(TABLE_NAME_TO_FAIL) && + (Bytes.equals(ROW_TO_FAIL_UPSERT_BYTES, m.getRow()) || Bytes.equals(ROW_TO_FAIL_DELETE_BYTES, m.getRow())); } - private boolean shouldFailDelete(ObserverContext<RegionCoprocessorEnvironment> c, Delete delete) { - String tableName = c.getEnvironment().getRegion().getRegionInfo().getTable().getNameAsString(); - return TABLE_NAME_TO_FAIL.equals(tableName) && - // Phoenix deletes are sent as Mutations with empty values - delete.getFamilyCellMap().firstEntry().getValue().get(0).getValueLength() == 0 && - delete.getFamilyCellMap().firstEntry().getValue().get(0).getQualifierLength() == 0; - } } /** http://git-wip-us.apache.org/repos/asf/phoenix/blob/b198d6b3/phoenix-core/src/main/java/org/apache/phoenix/execute/MutationState.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/execute/MutationState.java b/phoenix-core/src/main/java/org/apache/phoenix/execute/MutationState.java index 8b5bd14..8836249 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/execute/MutationState.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/execute/MutationState.java @@ -745,7 +745,7 @@ public class MutationState implements SQLCloseable { while (mutationsIterator.hasNext()) { Pair<byte[],List<Mutation>> pair = mutationsIterator.next(); byte[] htableName = pair.getFirst(); - List<Mutation> mutations = pair.getSecond(); + List<Mutation> mutationList = pair.getSecond(); //create a span per target table //TODO maybe we can be smarter about the table name to string here? @@ -756,7 +756,7 @@ public class MutationState implements SQLCloseable { do { ServerCache cache = null; if (isDataTable) { - cache = setMetaDataOnMutations(tableRef, mutations, indexMetaDataPtr); + cache = setMetaDataOnMutations(tableRef, mutationList, indexMetaDataPtr); } // If we haven't retried yet, retry for this case only, as it's possible that @@ -785,19 +785,19 @@ public class MutationState implements SQLCloseable { } hTable = txnAware; } - long numMutations = mutations.size(); + long numMutations = mutationList.size(); GLOBAL_MUTATION_BATCH_SIZE.update(numMutations); long startTime = System.currentTimeMillis(); child.addTimelineAnnotation("Attempt " + retryCount);; - hTable.batch(mutations); + hTable.batch(mutationList); child.stop(); child.stop(); shouldRetry = false; long mutationCommitTime = System.currentTimeMillis() - startTime; GLOBAL_MUTATION_COMMIT_TIME.update(mutationCommitTime); - long mutationSizeBytes = calculateMutationSize(mutations); + long mutationSizeBytes = calculateMutationSize(mutationList); MutationMetric mutationsMetric = new MutationMetric(numMutations, mutationSizeBytes, mutationCommitTime); mutationMetricQueue.addMetricsForTable(Bytes.toString(htableName), mutationsMetric); } catch (Exception e) { @@ -840,6 +840,8 @@ public class MutationState implements SQLCloseable { } } if (sqlE != null) { + // clear pending mutations + mutations.clear(); throw sqlE; } }
