PHOENIX-2545 Abort transaction if send fails during commit
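The core of this patch is the rewritten MutationState.commit(): rather than letting a failed send() leave the Tephra transaction dangling, the transaction is now always aborted when either send() or TransactionContext.finish() fails, and every failure encountered along the way is chained onto a single SQLException. Below is a minimal, self-contained sketch of that control flow for reviewers; the TxContext interface is a hypothetical stand-in for Tephra's TransactionContext, and all names here are illustrative, not Phoenix API:

    import java.sql.SQLException;

    final class TxCommitFlowSketch {
        /** Hypothetical stand-in for Tephra's TransactionContext. */
        interface TxContext {
            void finish() throws Exception;               // commit the transaction
            void abort(Exception cause) throws Exception; // roll it back
        }

        /** send: flushes mutations to HBase; may throw. */
        static void commit(Runnable send, TxContext txContext) throws SQLException {
            boolean sendSuccessful = false;
            SQLException sqlE = null;
            try {
                send.run();
                sendSuccessful = true;
            } catch (RuntimeException e) {
                sqlE = new SQLException("send failed", e);
            } finally {
                boolean finishSuccessful = false;
                try {
                    if (sendSuccessful) {
                        txContext.finish(); // only attempt to commit if send worked
                        finishSuccessful = true;
                    }
                } catch (Exception e) {
                    sqlE = chain(sqlE, new SQLException("finish failed", e));
                } finally {
                    // Abort whenever send or finish failed, accumulating any abort failure too.
                    if (!finishSuccessful) {
                        try {
                            txContext.abort(null);
                        } catch (Exception e) {
                            sqlE = chain(sqlE, new SQLException("abort failed", e));
                        }
                    }
                    if (sqlE != null) { throw sqlE; }
                }
            }
        }

        private static SQLException chain(SQLException head, SQLException next) {
            if (head == null) { return next; }
            head.setNextException(next); // later failures ride along on the first
            return head;
        }
    }

The nested finally blocks guarantee the first failure is the one thrown, with any later failures reachable via SQLException.getNextException(), which mirrors how the real commit() below reports send, finish, and abort failures together.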
Project: http://git-wip-us.apache.org/repos/asf/phoenix/repo
Commit: http://git-wip-us.apache.org/repos/asf/phoenix/commit/16e5c7bf
Tree: http://git-wip-us.apache.org/repos/asf/phoenix/tree/16e5c7bf
Diff: http://git-wip-us.apache.org/repos/asf/phoenix/diff/16e5c7bf

Branch: refs/heads/4.x-HBase-1.0
Commit: 16e5c7bf02a92c466b1730b6fa36371608d44f01
Parents: 55b7ada
Author: James Taylor <[email protected]>
Authored: Sat Dec 26 23:47:31 2015 -0800
Committer: James Taylor <[email protected]>
Committed: Wed Dec 30 17:57:41 2015 -0800

----------------------------------------------------------------------
 .../apache/phoenix/end2end/CreateTableIT.java   |  28 +++
 .../apache/phoenix/end2end/UpsertSelectIT.java  |   2 +-
 .../apache/phoenix/execute/PartialCommitIT.java |  44 ++--
 .../coprocessor/MetaDataEndpointImpl.java       |  13 +-
 .../apache/phoenix/execute/MutationState.java   | 216 ++++++++++++-------
 .../index/PhoenixTransactionalIndexer.java      |  17 +-
 6 files changed, 207 insertions(+), 113 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/phoenix/blob/16e5c7bf/phoenix-core/src/it/java/org/apache/phoenix/end2end/CreateTableIT.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/it/java/org/apache/phoenix/end2end/CreateTableIT.java b/phoenix-core/src/it/java/org/apache/phoenix/end2end/CreateTableIT.java
index 7c4576c..5ffc354 100644
--- a/phoenix-core/src/it/java/org/apache/phoenix/end2end/CreateTableIT.java
+++ b/phoenix-core/src/it/java/org/apache/phoenix/end2end/CreateTableIT.java
@@ -35,6 +35,7 @@ import org.apache.hadoop.hbase.util.Bytes;
 import org.apache.phoenix.exception.SQLExceptionCode;
 import org.apache.phoenix.jdbc.PhoenixStatement;
 import org.apache.phoenix.query.KeyRange;
+import org.apache.phoenix.schema.NewerTableAlreadyExistsException;
 import org.apache.phoenix.schema.TableAlreadyExistsException;
 import org.apache.phoenix.util.PhoenixRuntime;
 import org.junit.Test;
@@ -407,4 +408,31 @@ public class CreateTableIT extends BaseClientManagedTimeIT {
             assertEquals(SQLExceptionCode.COLUMN_FAMILY_NOT_ALLOWED_FOR_TTL.getErrorCode(),sqle.getErrorCode());
         }
     }
+    
+    @Test
+    public void testAlterDeletedTable() throws Exception {
+        String ddl = "create table T ("
+                + "    K varchar primary key,"
+                + "    V1 varchar)";
+        long ts = nextTimestamp();
+        Properties props = new Properties();
+        props.setProperty(PhoenixRuntime.CURRENT_SCN_ATTRIB, Long.toString(ts));
+        Connection conn = DriverManager.getConnection(getUrl(), props);
+        conn.createStatement().execute(ddl);
+        conn.close();
+        props.setProperty(PhoenixRuntime.CURRENT_SCN_ATTRIB, Long.toString(ts+50));
+        Connection connAt50 = DriverManager.getConnection(getUrl(), props);
+        connAt50.createStatement().execute("DROP TABLE T");
+        connAt50.close();
+        props.setProperty(PhoenixRuntime.CURRENT_SCN_ATTRIB, Long.toString(ts+20));
+        Connection connAt20 = DriverManager.getConnection(getUrl(), props);
+        connAt20.createStatement().execute("UPDATE STATISTICS T"); // Invalidates from cache
+        try {
+            connAt20.createStatement().execute("ALTER TABLE T ADD V2 VARCHAR");
+            fail();
+        } catch (NewerTableAlreadyExistsException e) {
+            
+        }
+        connAt20.close();
+    }
 }

http://git-wip-us.apache.org/repos/asf/phoenix/blob/16e5c7bf/phoenix-core/src/it/java/org/apache/phoenix/end2end/UpsertSelectIT.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/it/java/org/apache/phoenix/end2end/UpsertSelectIT.java b/phoenix-core/src/it/java/org/apache/phoenix/end2end/UpsertSelectIT.java
index 689562a..b5252e0 100644
--- a/phoenix-core/src/it/java/org/apache/phoenix/end2end/UpsertSelectIT.java
+++ b/phoenix-core/src/it/java/org/apache/phoenix/end2end/UpsertSelectIT.java
@@ -1109,7 +1109,7 @@ public class UpsertSelectIT extends BaseClientManagedTimeIT {
         }
         
         // upsert data into base table without specifying the row timestamp column PK2
-        long upsertedTs = 5;
+        long upsertedTs = nextTimestamp();
         try (Connection conn = getConnection(upsertedTs)) {
             // Upsert select in the same table with the row_timestamp column PK2 not specified. This will end up
             // creating a new row whose timestamp is the SCN of the connection. The same SCN will be used

http://git-wip-us.apache.org/repos/asf/phoenix/blob/16e5c7bf/phoenix-core/src/it/java/org/apache/phoenix/execute/PartialCommitIT.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/it/java/org/apache/phoenix/execute/PartialCommitIT.java b/phoenix-core/src/it/java/org/apache/phoenix/execute/PartialCommitIT.java
index 61aae62..8d7ebcb 100644
--- a/phoenix-core/src/it/java/org/apache/phoenix/execute/PartialCommitIT.java
+++ b/phoenix-core/src/it/java/org/apache/phoenix/execute/PartialCommitIT.java
@@ -100,7 +100,10 @@ public class PartialCommitIT extends BaseOwnClusterIT {
         return Arrays.asList(false, true);
     }
     
+    private final boolean transactional;
+    
     public PartialCommitIT(boolean transactional) {
+        this.transactional = transactional;
         if (transactional) {
             A_SUCESS_TABLE = "A_SUCCESS_TABLE_TXN";
             B_FAILURE_TABLE = TABLE_NAME_TO_FAIL+"_TXN";
@@ -148,8 +151,8 @@ public class PartialCommitIT extends BaseOwnClusterIT {
     
     @Test
     public void testNoFailure() {
-        testPartialCommit(singletonList("upsert into " + A_SUCESS_TABLE + " values ('testNoFailure', 'a')"), 0, new int[0], false,
-                          singletonList("select count(*) from " + A_SUCESS_TABLE + " where k='testNoFailure'"), singletonList(new Integer(1)));
+        testPartialCommit(singletonList("upsert into " + A_SUCESS_TABLE + " values ('testNoFailure', 'a')"), new int[0], false, singletonList("select count(*) from " + A_SUCESS_TABLE + " where k='testNoFailure'"),
+                          singletonList(new Integer(1)));
     }
     
     @Test
@@ -157,10 +160,10 @@ public class PartialCommitIT extends BaseOwnClusterIT {
         testPartialCommit(newArrayList("upsert into " + A_SUCESS_TABLE + " values ('testUpsertFailure1', 'a')", 
                                        UPSERT_TO_FAIL, 
                                        "upsert into " + A_SUCESS_TABLE + " values ('testUpsertFailure2', 'b')"), 
-                                       1, new int[]{1}, true,
+                                       transactional ? new int[] {0,1,2} : new int[]{1}, true,
                           newArrayList("select count(*) from " + A_SUCESS_TABLE + " where k like 'testUpsertFailure_'",
-                                       "select count(*) from " + B_FAILURE_TABLE + " where k = '" + Bytes.toString(ROW_TO_FAIL_UPSERT_BYTES) + "'"), 
-                          newArrayList(new Integer(2), new Integer(0)));
+                                       "select count(*) from " + B_FAILURE_TABLE + " where k = '" + Bytes.toString(ROW_TO_FAIL_UPSERT_BYTES) + "'"),
+                          transactional ? newArrayList(new Integer(0), new Integer(0)) : newArrayList(new Integer(2), new Integer(0)));
     }
     
     @Test
@@ -172,10 +175,10 @@ public class PartialCommitIT extends BaseOwnClusterIT {
         testPartialCommit(newArrayList("upsert into " + A_SUCESS_TABLE + " values ('testUpsertSelectFailure', 'a')", 
                                        UPSERT_SELECT_TO_FAIL), 
-                                       1, new int[]{1}, true,
+                                       transactional ? new int[] {0,1} : new int[]{1}, true,
                           newArrayList("select count(*) from " + A_SUCESS_TABLE + " where k in ('testUpsertSelectFailure', '" + Bytes.toString(ROW_TO_FAIL_UPSERT_BYTES) + "')",
-                                       "select count(*) from " + B_FAILURE_TABLE + " where k = '" + Bytes.toString(ROW_TO_FAIL_UPSERT_BYTES) + "'"), 
-                          newArrayList(new Integer(2), new Integer(0)));
+                                       "select count(*) from " + B_FAILURE_TABLE + " where k = '" + Bytes.toString(ROW_TO_FAIL_UPSERT_BYTES) + "'"),
+                          transactional ? newArrayList(new Integer(1) /* from commit above */, new Integer(0)) : newArrayList(new Integer(2), new Integer(0)));
     }
     
     @Test
@@ -183,10 +186,10 @@ public class PartialCommitIT extends BaseOwnClusterIT {
         testPartialCommit(newArrayList("upsert into " + A_SUCESS_TABLE + " values ('testDeleteFailure1', 'a')", 
                                        DELETE_TO_FAIL,
                                        "upsert into " + A_SUCESS_TABLE + " values ('testDeleteFailure2', 'b')"), 
-                                       1, new int[]{1}, true,
+                                       transactional ? new int[] {0,1,2} : new int[]{1}, true,
                           newArrayList("select count(*) from " + A_SUCESS_TABLE + " where k like 'testDeleteFailure_'",
-                                       "select count(*) from " + B_FAILURE_TABLE + " where k = 'z'"), 
-                          newArrayList(new Integer(2), new Integer(1)));
+                                       "select count(*) from " + B_FAILURE_TABLE + " where k = 'z'"),
+                          transactional ? newArrayList(new Integer(0), new Integer(1) /* original row */) : newArrayList(new Integer(2), new Integer(1)));
     }
     
     /**
@@ -197,11 +200,11 @@ public class PartialCommitIT extends BaseOwnClusterIT {
         testPartialCommit(newArrayList("upsert into " + C_SUCESS_TABLE + " values ('testOrderOfMutationsIsPredicatable', 'c')", // will fail because c_success_table is after b_failure_table by table sort order
                                        UPSERT_TO_FAIL, 
                                        "upsert into " + A_SUCESS_TABLE + " values ('testOrderOfMutationsIsPredicatable', 'a')"), // will succeed because a_success_table is before b_failure_table by table sort order
-                                       2, new int[]{0,1}, true,
+                                       transactional ? new int[] {0,1,2} : new int[]{0,1}, true,
                           newArrayList("select count(*) from " + C_SUCESS_TABLE + " where k='testOrderOfMutationsIsPredicatable'", 
                                        "select count(*) from " + A_SUCESS_TABLE + " where k='testOrderOfMutationsIsPredicatable'",
-                                       "select count(*) from " + B_FAILURE_TABLE + " where k = '" + Bytes.toString(ROW_TO_FAIL_UPSERT_BYTES) + "'"), 
-                          newArrayList(new Integer(0), new Integer(1), new Integer(0)));
+                                       "select count(*) from " + B_FAILURE_TABLE + " where k = '" + Bytes.toString(ROW_TO_FAIL_UPSERT_BYTES) + "'"),
+                          transactional ? newArrayList(new Integer(0), new Integer(0), new Integer(0)) : newArrayList(new Integer(0), new Integer(1), new Integer(0)));
     }
     
     @Test
@@ -211,15 +214,15 @@ public class PartialCommitIT extends BaseOwnClusterIT {
                                        DELETE_TO_FAIL,
                                        "select * from " + A_SUCESS_TABLE + "",
                                        UPSERT_TO_FAIL),
-                                       2, new int[]{2,4}, true,
+                                       transactional ? new int[] {0,1,2,4} : new int[]{2,4}, true,
                           newArrayList("select count(*) from " + A_SUCESS_TABLE + " where k='testStatementOrderMaintainedInConnection' or k like 'z%'", // rows left: zz, zzz, checkThatAllStatementTypesMaintainOrderInConnection
                                        "select count(*) from " + B_FAILURE_TABLE + " where k = '" + Bytes.toString(ROW_TO_FAIL_UPSERT_BYTES) + "'",
-                                       "select count(*) from " + B_FAILURE_TABLE + " where k = 'z'"), 
-                          newArrayList(new Integer(4), new Integer(0), new Integer(1)));
+                                       "select count(*) from " + B_FAILURE_TABLE + " where k = 'z'"),
+                          transactional ? newArrayList(new Integer(3) /* original rows */, new Integer(0), new Integer(1) /* original row */) : newArrayList(new Integer(4), new Integer(0), new Integer(1)));
     }
     
-    private void testPartialCommit(List<String> statements, int failureCount, int[] expectedUncommittedStatementIndexes, boolean willFail,
-            List<String> countStatementsForVerification, List<Integer> expectedCountsForVerification) {
+    private void testPartialCommit(List<String> statements, int[] expectedUncommittedStatementIndexes, boolean willFail, List<String> countStatementsForVerification,
+            List<Integer> expectedCountsForVerification) {
         Preconditions.checkArgument(countStatementsForVerification.size() == expectedCountsForVerification.size());
         
         try (Connection con = getConnectionWithTableOrderPreservingMutationState()) {
@@ -241,7 +244,6 @@ public class PartialCommitIT extends BaseOwnClusterIT {
             }
             assertEquals(CommitException.class, sqle.getClass());
             int[] uncommittedStatementIndexes = ((CommitException)sqle).getUncommittedStatementIndexes();
-            assertEquals(failureCount, uncommittedStatementIndexes.length);
             assertArrayEquals(expectedUncommittedStatementIndexes, uncommittedStatementIndexes);
         }
         
@@ -267,7 +269,7 @@ public class PartialCommitIT extends BaseOwnClusterIT {
         return new PhoenixConnection(phxCon, null) {
             @Override
             protected MutationState newMutationState(int maxSize) {
-                return new MutationState(maxSize, this, mutations, null);
+                return new MutationState(maxSize, this, mutations, null, null);
             };
         };
     }

http://git-wip-us.apache.org/repos/asf/phoenix/blob/16e5c7bf/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/MetaDataEndpointImpl.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/MetaDataEndpointImpl.java b/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/MetaDataEndpointImpl.java
index 2043c66..ea4a7e1 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/MetaDataEndpointImpl.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/MetaDataEndpointImpl.java
@@ -978,13 +978,12 @@ public class MetaDataEndpointImpl extends MetaDataProtocol implements CoprocessorService, Coprocessor {
         scan.setFilter(new FirstKeyOnlyFilter());
         scan.setRaw(true);
         List<Cell> results = Lists.<Cell> newArrayList();
-        try (RegionScanner scanner = region.getScanner(scan);) {
+        try (RegionScanner scanner = region.getScanner(scan)) {
             scanner.next(results);
         }
-        // HBase ignores the time range on a raw scan (HBASE-7362)
-        if (!results.isEmpty() && results.get(0).getTimestamp() > clientTimeStamp) {
-            Cell kv = results.get(0);
-            if (kv.getTypeByte() == Type.Delete.getCode()) {
+        for (Cell kv : results) {
+            KeyValue.Type type = Type.codeToType(kv.getTypeByte());
+            if (type == Type.DeleteFamily) { // Row was deleted
                 Cache<ImmutableBytesPtr, PMetaDataEntity> metaDataCache = GlobalCache.getInstance(this.env).getMetaDataCache();
                 PTable table = newDeletedTableMarker(kv.getTimestamp());
@@ -1621,7 +1620,9 @@ public class MetaDataEndpointImpl extends MetaDataProtocol implements CoprocessorService, Coprocessor {
             && (table = buildTable(key, cacheKey, region, HConstants.LATEST_TIMESTAMP)) == null) {
             // if not found then call newerTableExists and add delete marker for timestamp
             // found
-            if (buildDeletedTable(key, cacheKey, region, clientTimeStamp) != null) {
+            table = buildDeletedTable(key, cacheKey, region, clientTimeStamp);
+            if (table != null) {
+                logger.info("Found newer table deleted as of " + table.getTimeStamp());
                 return new MetaDataMutationResult(MutationCode.NEWER_TABLE_FOUND, EnvironmentEdgeManager.currentTimeMillis(), null);
             }

http://git-wip-us.apache.org/repos/asf/phoenix/blob/16e5c7bf/phoenix-core/src/main/java/org/apache/phoenix/execute/MutationState.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/execute/MutationState.java b/phoenix-core/src/main/java/org/apache/phoenix/execute/MutationState.java
index 52bbeb6..fa5e962 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/execute/MutationState.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/execute/MutationState.java
@@ -25,6 +25,7 @@ import static org.apache.phoenix.monitoring.GlobalClientMetrics.GLOBAL_MUTATION_
 import java.io.IOException;
 import java.sql.SQLException;
 import java.util.Arrays;
+import java.util.Collection;
 import java.util.Collections;
 import java.util.Iterator;
 import java.util.List;
@@ -70,6 +71,7 @@ import org.apache.phoenix.schema.PTable;
 import org.apache.phoenix.schema.PTable.IndexType;
 import org.apache.phoenix.schema.PTableType;
 import org.apache.phoenix.schema.RowKeySchema;
+import org.apache.phoenix.schema.TableNotFoundException;
 import org.apache.phoenix.schema.TableRef;
 import org.apache.phoenix.schema.ValueSchema.Field;
 import org.apache.phoenix.schema.types.PLong;
@@ -109,8 +111,9 @@
 public class MutationState implements SQLCloseable {
     private static final Logger logger = LoggerFactory.getLogger(MutationState.class);
     private static final TransactionCodec CODEC = new TransactionCodec();
+    private static final int[] EMPTY_STATEMENT_INDEX_ARRAY = new int[0];
     
-    private PhoenixConnection connection;
+    private final PhoenixConnection connection;
     private final long maxSize;
     private final Map<TableRef, Map<ImmutableBytesPtr,RowMutationState>> mutations;
     private final List<TransactionAware> txAwares;
@@ -120,35 +123,39 @@ public class MutationState implements SQLCloseable {
     private Transaction tx;
     private long sizeOffset;
     private int numRows = 0;
-    private boolean txStarted = false;
+    private int[] uncommittedStatementIndexes = EMPTY_STATEMENT_INDEX_ARRAY;
     
     private final MutationMetricQueue mutationMetricQueue;
     private ReadMetricQueue readMetricQueue;
     
     public MutationState(long maxSize, PhoenixConnection connection) {
-        this(maxSize,connection, null);
+        this(maxSize,connection, null, null);
+    }
+    
+    public MutationState(long maxSize, PhoenixConnection connection, TransactionContext txContext) {
+        this(maxSize,connection, null, txContext);
     }
     
     public MutationState(MutationState mutationState) {
-        this(mutationState.maxSize, mutationState.connection, mutationState.getTransaction());
+        this(mutationState.maxSize, mutationState.connection, mutationState.getTransaction(), null);
     }
     
     public MutationState(long maxSize, PhoenixConnection connection, long sizeOffset) {
-        this(maxSize, connection, null, sizeOffset);
+        this(maxSize, connection, null, null, sizeOffset);
     }
     
-    private MutationState(long maxSize, PhoenixConnection connection, Transaction tx) {
-        this(maxSize,connection, tx, 0);
+    private MutationState(long maxSize, PhoenixConnection connection, Transaction tx, TransactionContext txContext) {
+        this(maxSize,connection, tx, txContext, 0);
     }
    
-    private MutationState(long maxSize, PhoenixConnection connection, Transaction tx, long sizeOffset) {
-        this(maxSize, connection, Maps.<TableRef, Map<ImmutableBytesPtr,RowMutationState>>newHashMapWithExpectedSize(connection.getMutateBatchSize()), tx);
+    private MutationState(long maxSize, PhoenixConnection connection, Transaction tx, TransactionContext txContext, long sizeOffset) {
+        this(maxSize, connection, Maps.<TableRef, Map<ImmutableBytesPtr,RowMutationState>>newHashMapWithExpectedSize(connection.getMutateBatchSize()), tx, txContext);
         this.sizeOffset = sizeOffset;
     }
    
     MutationState(long maxSize, PhoenixConnection connection, Map<TableRef, Map<ImmutableBytesPtr, RowMutationState>> mutations,
-            Transaction tx) {
+            Transaction tx, TransactionContext txContext) {
         this.maxSize = maxSize;
         this.connection = connection;
         this.mutations = mutations;
@@ -157,26 +164,34 @@ public class MutationState implements SQLCloseable {
             : NoOpMutationMetricsQueue.NO_OP_MUTATION_METRICS_QUEUE;
         this.tx = tx;
         if (tx == null) {
-            this.txAwares = Collections.emptyList();
-            TransactionSystemClient txServiceClient = this.connection
-                    .getQueryServices().getTransactionSystemClient();
-            this.txContext = new TransactionContext(txServiceClient);
+            this.txAwares = Collections.emptyList();
+            if (txContext == null) {
+                TransactionSystemClient txServiceClient = this.connection
+                        .getQueryServices().getTransactionSystemClient();
+                this.txContext = new TransactionContext(txServiceClient);
+            } else {
+                this.txContext = txContext;
+            }
         } else {
             // this code path is only used while running child scans, we can't pass the txContext to child scans
             // as it is not thread safe, so we use the tx member variable
-            txAwares = Lists.newArrayList();
-            txContext = null;
+            this.txAwares = Lists.newArrayList();
+            this.txContext = null;
         }
     }
 
     public MutationState(TableRef table, Map<ImmutableBytesPtr,RowMutationState> mutations, long sizeOffset, long maxSize, PhoenixConnection connection) {
-        this(maxSize, connection, null, sizeOffset);
+        this(maxSize, connection, null, null, sizeOffset);
         this.mutations.put(table, mutations);
         this.numRows = mutations.size();
         this.tx = connection.getMutationState().getTransaction();
         throwIfTooBig();
     }
    
+    public long getMaxSize() {
+        return maxSize;
+    }
+    
     public boolean checkpointIfNeccessary(MutationPlan plan) throws SQLException {
         Transaction currentTx = getTransaction();
         if (getTransaction() == null || plan.getTargetRef() == null || plan.getTargetRef().getTable() == null || !plan.getTargetRef().getTable().isTransactional()) {
@@ -308,9 +323,8 @@ public class MutationState implements SQLCloseable {
         }
         
         try {
-            if (!txStarted) {
+            if (!isTransactionStarted()) {
                 txContext.start();
-                txStarted = true;
                 return true;
             }
         } catch (TransactionFailureException e) {
@@ -320,7 +334,7 @@ public class MutationState implements SQLCloseable {
     }
     
     public static MutationState emptyMutationState(long maxSize, PhoenixConnection connection) {
-        MutationState state = new MutationState(maxSize, connection, Collections.<TableRef, Map<ImmutableBytesPtr,RowMutationState>>emptyMap(), null);
+        MutationState state = new MutationState(maxSize, connection, Collections.<TableRef, Map<ImmutableBytesPtr,RowMutationState>>emptyMap(), null, null);
         state.sizeOffset = 0;
         return state;
     }
@@ -599,18 +613,24 @@ public class MutationState implements SQLCloseable {
         Long scn = connection.getSCN();
         MetaDataClient client = new MetaDataClient(connection);
         long serverTimeStamp = tableRef.getTimeStamp();
-        PTable table = tableRef.getTable();
         // If we're auto committing, we've already validated the schema when we got the ColumnResolver,
         // so no need to do it again here.
         if (!connection.getAutoCommit()) {
+            PTable table = tableRef.getTable();
             MetaDataMutationResult result = client.updateCache(table.getSchemaName().getString(), table.getTableName().getString());
+            PTable resolvedTable = result.getTable();
+            if (resolvedTable == null) {
+                throw new TableNotFoundException(table.getSchemaName().getString(), table.getTableName().getString());
+            }
+            // Always update tableRef table as the one we've cached may be out of date since when we executed
+            // the UPSERT VALUES call and updated in the cache before this.
+            tableRef.setTable(resolvedTable);
             long timestamp = result.getMutationTime();
             if (timestamp != QueryConstants.UNSET_TIMESTAMP) {
                 serverTimeStamp = timestamp;
                 if (result.wasUpdated()) {
                     // TODO: use bitset?
-                    table = result.getTable();
-                    PColumn[] columns = new PColumn[table.getColumns().size()];
+                    PColumn[] columns = new PColumn[resolvedTable.getColumns().size()];
                     for (Map.Entry<ImmutableBytesPtr,RowMutationState> rowEntry : rowKeyToColumnMap.entrySet()) {
                         RowMutationState valueEntry = rowEntry.getValue();
                         if (valueEntry != null) {
@@ -624,10 +644,9 @@ public class MutationState implements SQLCloseable {
                     }
                     for (PColumn column : columns) {
                         if (column != null) {
-                            table.getColumnFamily(column.getFamilyName().getString()).getColumn(column.getName().getString());
+                            resolvedTable.getColumnFamily(column.getFamilyName().getString()).getColumn(column.getName().getString());
                         }
                     }
-                    tableRef.setTable(table);
                 }
             }
         }
@@ -731,25 +750,28 @@ public class MutationState implements SQLCloseable {
             sendAll = true;
         }
 
+        List<TableRef> txTableRefs = Lists.newArrayListWithExpectedSize(mutations.size());
         // add tracing for this operation
         try (TraceScope trace = Tracing.startNewSpan(connection, "Committing mutations to tables")) {
             Span span = trace.getSpan();
             ImmutableBytesWritable indexMetaDataPtr = new ImmutableBytesWritable();
+            boolean isTransactional;
             while (tableRefIterator.hasNext()) {
                 // at this point we are going through mutations for each table
-                TableRef tableRef = tableRefIterator.next();
+                final TableRef tableRef = tableRefIterator.next();
                 Map<ImmutableBytesPtr, RowMutationState> valuesMap = mutations.get(tableRef);
                 if (valuesMap == null || valuesMap.isEmpty()) {
                     continue;
                 }
-                PTable table = tableRef.getTable();
+                // Validate as we go if transactional since we can undo if a problem occurs (which is unlikely)
+                long serverTimestamp = serverTimeStamps == null ? validate(tableRef, valuesMap) : serverTimeStamps[i++];
+                final PTable table = tableRef.getTable();
                 // Track tables to which we've sent uncommitted data
-                if (table.isTransactional()) {
+                if (isTransactional = table.isTransactional()) {
+                    txTableRefs.add(tableRef);
                     uncommittedPhysicalNames.add(table.getPhysicalName().getString());
                 }
                 boolean isDataTable = true;
-                // Validate as we go if transactional since we can undo if a problem occurs (which is unlikely)
-                long serverTimestamp = serverTimeStamps == null ? validate(tableRef, valuesMap) : serverTimeStamps[i++];
                 table.getIndexMaintainers(indexMetaDataPtr, connection);
                 Iterator<Pair<byte[],List<Mutation>>> mutationsIterator = addRowMutations(tableRef, valuesMap, serverTimestamp, false, sendAll);
                 while (mutationsIterator.hasNext()) {
@@ -776,7 +798,7 @@ public class MutationState implements SQLCloseable {
                         SQLException sqlE = null;
                         HTableInterface hTable = connection.getQueryServices().getTable(htableName);
                         try {
-                            if (table.isTransactional()) {
+                            if (isTransactional) {
                                 // If we have indexes, wrap the HTable in a delegate HTable that
                                 // will attach the necessary index meta data in the event of a
                                 // rollback
@@ -830,8 +852,8 @@ public class MutationState implements SQLCloseable {
                                     }
                                     e = inferredE;
                                 }
-                                // Throw to client with both what was committed so far and what is left to be committed.
-                                // That way, client can either undo what was done or try again with what was not done.
+                                // Throw to client an exception that indicates the statements that
+                                // were not committed successfully.
                                 sqlE = new CommitException(e, getUncommittedStatementIndexes());
                             } finally {
                                 try {
@@ -862,17 +884,24 @@ public class MutationState implements SQLCloseable {
                 if (tableRef.getTable().getType() != PTableType.INDEX) {
                     numRows -= valuesMap.size();
                 }
-                // Remove batches as we process them
+                // For transactions, track the statement indexes as we send data
+                // over because our CommitException should include all statements
+                // involved in the transaction since none of them would have been
+                // committed in the event of a failure.
+                if (isTransactional) {
+                    addUncommittedStatementIndexes(valuesMap.values());
+                }
+                // Remove batches as we process them
                 if (sendAll) {
-                    tableRefIterator.remove(); // Iterating through actual map in this case
+                    // Iterating through map key set in this case, so we cannot use
+                    // the remove method without getting a concurrent modification
+                    // exception.
+                    tableRefIterator.remove();
                 } else {
                     mutations.remove(tableRef);
                 }
             }
         }
-        // Note that we cannot assume that *all* mutations have been sent, since we've optimized this
-        // now to only send the mutations for the tables we're querying, hence we've removed the
-        // assertions that we're here before.
     }
 
     public byte[] encodeTransaction() throws SQLException {
@@ -930,19 +959,17 @@ public class MutationState implements SQLCloseable {
         return cache;
     }
     
-    private void clear() throws SQLException {
-        this.mutations.clear();
-        numRows = 0;
+    private void addUncommittedStatementIndexes(Collection<RowMutationState> rowMutations) {
+        for (RowMutationState rowMutationState : rowMutations) {
+            uncommittedStatementIndexes = joinSortedIntArrays(uncommittedStatementIndexes, rowMutationState.getStatementIndexes());
+        }
     }
     
     private int[] getUncommittedStatementIndexes() {
-        int[] result = new int[0];
-        for (Map<ImmutableBytesPtr, RowMutationState> rowMutations : mutations.values()) {
-            for (RowMutationState rowMutationState : rowMutations.values()) {
-                result = joinSortedIntArrays(result, rowMutationState.getStatementIndexes());
-            }
+        for (Map<ImmutableBytesPtr, RowMutationState> rowMutationMap : mutations.values()) {
+            addUncommittedStatementIndexes(rowMutationMap.values());
         }
-        return result;
+        return uncommittedStatementIndexes;
     }
     
     @Override
@@ -950,65 +977,96 @@ public class MutationState implements SQLCloseable {
     }
     
     private void reset() {
-        txStarted = false;
         tx = null;
+        txAwares.clear();
         uncommittedPhysicalNames.clear();
+        this.mutations.clear();
+        numRows = 0;
+        uncommittedStatementIndexes = EMPTY_STATEMENT_INDEX_ARRAY;
    }
    
     public void rollback() throws SQLException {
-        clear();
-        txAwares.clear();
-        if (txContext != null) {
-            try {
-                if (txStarted) {
+        try {
+            if (txContext != null && isTransactionStarted()) {
+                try {
                     txContext.abort();
+                } catch (TransactionFailureException e) {
+                    throw TransactionUtil.getTransactionFailureException(e);
                 }
-            } catch (TransactionFailureException e) {
-                throw new SQLException(e); // TODO: error code
-            } finally {
-                reset();
             }
+        } finally {
+            reset();
         }
     }
    
     public void commit() throws SQLException {
-        boolean sendMutationsFailed=false;
+        boolean sendSuccessful=false;
+        SQLException sqlE = null;
         try {
             send();
-        } catch (Throwable t) {
-            sendMutationsFailed=true;
-            throw t;
+            sendSuccessful=true;
+        } catch (SQLException e) {
+            sqlE = e;
         } finally {
-            txAwares.clear();
-            if (txContext != null) {
-                try {
-                    if (txStarted && !sendMutationsFailed) {
-                        txContext.finish();
-                    }
-                } catch (TransactionFailureException e) {
+            try {
+                if (txContext != null && isTransactionStarted()) {
+                    TransactionFailureException txFailure = null;
+                    boolean finishSuccessful=false;
                     try {
-                        txContext.abort(e);
-                        // abort and throw the original commit failure exception
-                        throw TransactionUtil.getTransactionFailureException(e);
-                    } catch (TransactionFailureException e1) {
-                        // if abort fails and throw the abort failure exception
-                        throw TransactionUtil.getTransactionFailureException(e1);
+                        if (sendSuccessful) {
+                            txContext.finish();
+                            finishSuccessful = true;
+                        }
+                    } catch (TransactionFailureException e) {
+                        txFailure = e;
+                        SQLException nextE = TransactionUtil.getTransactionFailureException(e);
+                        if (sqlE == null) {
+                            sqlE = nextE;
+                        } else {
+                            sqlE.setNextException(nextE);
+                        }
+                    } finally {
+                        // If send fails or finish fails, abort the tx
+                        if (!finishSuccessful) {
+                            try {
+                                txContext.abort(txFailure);
+                            } catch (TransactionFailureException e) {
+                                SQLException nextE = TransactionUtil.getTransactionFailureException(e);
+                                if (sqlE == null) {
+                                    sqlE = nextE;
+                                } else {
+                                    sqlE.setNextException(nextE);
+                                }
+                            }
+                        }
                     }
+                }
+            } finally {
+                try {
+                    reset();
                 } finally {
-                    if (!sendMutationsFailed) {
-                        reset();
-                    }
-                }
+                    if (sqlE != null) {
+                        throw sqlE;
+                    }
+                }
             }
         }
     }
     
     /**
+     * Send to HBase any uncommitted data for transactional tables.
+     * @return true if any data was sent and false otherwise.
+     * @throws SQLException
+     */
+    public boolean sendUncommitted() throws SQLException {
+        return sendUncommitted(mutations.keySet().iterator());
+    }
+    /**
      * Support read-your-own-write semantics by sending uncommitted data to HBase prior to running a
      * query. In this way, they are visible to subsequent reads but are not actually committed until
      * commit is called.
      * @param tableRefs
-     * @return true if at least partially transactional and false otherwise.
+     * @return true if any data was sent and false otherwise.
     * @throws SQLException
     */
    public boolean sendUncommitted(Iterator<TableRef> tableRefs) throws SQLException {

http://git-wip-us.apache.org/repos/asf/phoenix/blob/16e5c7bf/phoenix-core/src/main/java/org/apache/phoenix/index/PhoenixTransactionalIndexer.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/index/PhoenixTransactionalIndexer.java b/phoenix-core/src/main/java/org/apache/phoenix/index/PhoenixTransactionalIndexer.java
index 3fecc18..2925b09 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/index/PhoenixTransactionalIndexer.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/index/PhoenixTransactionalIndexer.java
@@ -71,16 +71,19 @@ import org.apache.phoenix.util.ScanUtil;
 import org.apache.phoenix.util.SchemaUtil;
 import org.apache.phoenix.util.ServerUtil;
 
-import co.cask.tephra.Transaction;
-import co.cask.tephra.Transaction.VisibilityLevel;
-import co.cask.tephra.TxConstants;
-import co.cask.tephra.hbase10.TransactionAwareHTable;
-
 import com.google.common.collect.Lists;
 import com.google.common.collect.Maps;
 import com.google.common.collect.Sets;
 import com.google.common.primitives.Longs;
 
+import co.cask.tephra.Transaction;
+import co.cask.tephra.Transaction;
+import co.cask.tephra.Transaction.VisibilityLevel;
+import co.cask.tephra.Transaction.VisibilityLevel;
+import co.cask.tephra.TxConstants;
+import co.cask.tephra.TxConstants;
+import co.cask.tephra.hbase10.TransactionAwareHTable;
+
 /**
  * Do all the work of managing index updates for a transactional table from a single coprocessor. Since the transaction
  * manager essentially time orders writes through conflict detection, the logic to maintain a secondary index is quite a
@@ -196,7 +199,9 @@ public class PhoenixTransactionalIndexer extends BaseRegionObserver {
         // run a scan to get the current state. We'll need this to delete
         // the existing index rows.
         Transaction tx = indexMetaData.getTransaction();
-        assert(tx != null);
+        if (tx == null) {
+            throw new NullPointerException("Expected to find transaction in metadata for " + env.getRegionInfo().getTable().getNameAsString());
+        }
         List<IndexMaintainer> indexMaintainers = indexMetaData.getIndexMaintainers();
         Set<ColumnReference> mutableColumns = Sets.newHashSetWithExpectedSize(indexMaintainers.size() * 10);
         for (IndexMaintainer indexMaintainer : indexMaintainers) {
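
A note on the last hunk: replacing assert(tx != null) with an explicit NullPointerException matters because Java assertions only run when the JVM is started with -ea, so on a typical production region server the old check silently did nothing. A small illustration of the pattern, with an invented helper name (not Phoenix API):

    // Assertions vanish without -ea; an explicit throw always runs.
    static <T> T expectNonNull(T value, String description) {
        if (value == null) {
            throw new NullPointerException("Expected to find " + description);
        }
        return value;
    }
    // e.g. Transaction tx = expectNonNull(indexMetaData.getTransaction(),
    //          "transaction in metadata for " + tableName);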
