Repository: phoenix Updated Branches: refs/heads/txn 324b566f4 -> 1baa1b6b0
Transactions over mutable indexes mostly working Project: http://git-wip-us.apache.org/repos/asf/phoenix/repo Commit: http://git-wip-us.apache.org/repos/asf/phoenix/commit/1baa1b6b Tree: http://git-wip-us.apache.org/repos/asf/phoenix/tree/1baa1b6b Diff: http://git-wip-us.apache.org/repos/asf/phoenix/diff/1baa1b6b Branch: refs/heads/txn Commit: 1baa1b6b0c85ed06d4892648975d7e51998d75a3 Parents: 324b566 Author: James Taylor <[email protected]> Authored: Fri Apr 17 01:28:12 2015 -0700 Committer: James Taylor <[email protected]> Committed: Fri Apr 17 01:28:12 2015 -0700 ---------------------------------------------------------------------- .../apache/phoenix/compile/DeleteCompiler.java | 2 +- .../apache/phoenix/compile/UpsertCompiler.java | 4 +- .../apache/phoenix/execute/BaseQueryPlan.java | 2 +- .../apache/phoenix/execute/MutationState.java | 140 +++++++++++++++++-- .../apache/phoenix/index/PhoenixIndexCodec.java | 8 ++ .../index/PhoenixTransactionalIndexer.java | 68 ++++----- .../phoenix/iterate/TableResultIterator.java | 8 +- .../apache/phoenix/jdbc/PhoenixConnection.java | 90 +++--------- .../apache/phoenix/jdbc/PhoenixStatement.java | 6 +- .../query/ConnectionQueryServicesImpl.java | 3 +- 10 files changed, 199 insertions(+), 132 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/phoenix/blob/1baa1b6b/phoenix-core/src/main/java/org/apache/phoenix/compile/DeleteCompiler.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/compile/DeleteCompiler.java b/phoenix-core/src/main/java/org/apache/phoenix/compile/DeleteCompiler.java index a0369d5..0778f75 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/compile/DeleteCompiler.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/compile/DeleteCompiler.java @@ -494,7 +494,7 @@ public class DeleteCompiler { ImmutableBytesWritable ptr = context.getTempPtr(); PTable table = tableRef.getTable(); table.getIndexMaintainers(ptr, context.getConnection()); - byte[] txState = table.isTransactional() ? TransactionUtil.encodeTxnState(connection.getTransactionContext().getCurrentTransaction()) : ByteUtil.EMPTY_BYTE_ARRAY; + byte[] txState = table.isTransactional() ? TransactionUtil.encodeTxnState(connection.getMutationState().getTransaction()) : ByteUtil.EMPTY_BYTE_ARRAY; ServerCache cache = null; try { if (ptr.getLength() > 0) { http://git-wip-us.apache.org/repos/asf/phoenix/blob/1baa1b6b/phoenix-core/src/main/java/org/apache/phoenix/compile/UpsertCompiler.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/compile/UpsertCompiler.java b/phoenix-core/src/main/java/org/apache/phoenix/compile/UpsertCompiler.java index e72b634..67d289e 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/compile/UpsertCompiler.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/compile/UpsertCompiler.java @@ -163,7 +163,7 @@ public class UpsertCompiler { if (isAutoCommit && rowCount % batchSize == 0) { MutationState state = new MutationState(tableRef, mutation, 0, maxSize, connection); connection.getMutationState().join(state); - connection.commit(); + connection.getMutationState().send(); mutation.clear(); } } @@ -610,7 +610,7 @@ public class UpsertCompiler { ImmutableBytesWritable ptr = context.getTempPtr(); PTable table = tableRef.getTable(); table.getIndexMaintainers(ptr, context.getConnection()); - byte[] txState = table.isTransactional() ? TransactionUtil.encodeTxnState(connection.getTransactionContext().getCurrentTransaction()) : ByteUtil.EMPTY_BYTE_ARRAY; + byte[] txState = table.isTransactional() ? TransactionUtil.encodeTxnState(connection.getMutationState().getTransaction()) : ByteUtil.EMPTY_BYTE_ARRAY; ServerCache cache = null; try { http://git-wip-us.apache.org/repos/asf/phoenix/blob/1baa1b6b/phoenix-core/src/main/java/org/apache/phoenix/execute/BaseQueryPlan.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/execute/BaseQueryPlan.java b/phoenix-core/src/main/java/org/apache/phoenix/execute/BaseQueryPlan.java index 387f23d..0a3035c 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/execute/BaseQueryPlan.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/execute/BaseQueryPlan.java @@ -269,7 +269,7 @@ public abstract class BaseQueryPlan implements QueryPlan { scan.setAttribute(BaseScannerRegionObserver.LOCAL_INDEX_BUILD, ByteUtil.copyKeyBytesIfNecessary(ptr)); if (dataTable.isTransactional()) { PhoenixConnection conn = context.getConnection(); - scan.setAttribute(BaseScannerRegionObserver.TX_STATE, TransactionUtil.encodeTxnState(conn.getTransactionContext().getCurrentTransaction())); + scan.setAttribute(BaseScannerRegionObserver.TX_STATE, TransactionUtil.encodeTxnState(conn.getMutationState().getTransaction())); } } http://git-wip-us.apache.org/repos/asf/phoenix/blob/1baa1b6b/phoenix-core/src/main/java/org/apache/phoenix/execute/MutationState.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/execute/MutationState.java b/phoenix-core/src/main/java/org/apache/phoenix/execute/MutationState.java index 59bc6ce..a0cf8d2 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/execute/MutationState.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/execute/MutationState.java @@ -28,6 +28,11 @@ import java.util.Iterator; import java.util.List; import java.util.Map; +import co.cask.tephra.Transaction; +import co.cask.tephra.TransactionAware; +import co.cask.tephra.TransactionContext; +import co.cask.tephra.TransactionFailureException; +import co.cask.tephra.TransactionSystemClient; import co.cask.tephra.hbase98.TransactionAwareHTable; import org.apache.hadoop.hbase.HConstants; @@ -88,17 +93,39 @@ public class MutationState implements SQLCloseable { // rows - map from rowkey to columns // columns - map from column to value private final Map<TableRef, Map<ImmutableBytesPtr,Map<PColumn,byte[]>>> mutations = Maps.newHashMapWithExpectedSize(3); // TODO: Sizing? + private final Transaction tx; + private final List<TransactionAware> txAwares; + private final TransactionContext txContext; + private long sizeOffset; private int numRows = 0; + private boolean txStarted = false; public MutationState(int maxSize, PhoenixConnection connection) { - this(maxSize,connection,0); + this(maxSize,connection,null); + } + + public MutationState(int maxSize, PhoenixConnection connection, Transaction tx) { + this(maxSize,connection, tx, 0); } public MutationState(int maxSize, PhoenixConnection connection, long sizeOffset) { + this(maxSize, connection, null, sizeOffset); + } + + public MutationState(int maxSize, PhoenixConnection connection, Transaction tx, long sizeOffset) { this.maxSize = maxSize; this.connection = connection; this.sizeOffset = sizeOffset; + this.tx = tx; + if (tx == null) { + this.txAwares = Collections.emptyList(); + TransactionSystemClient txServiceClient = this.connection.getQueryServices().getTransactionSystemClient(); + this.txContext = new TransactionContext(txServiceClient); + } else { + txAwares = Lists.newArrayList(); + txContext = null; + } } public MutationState(TableRef table, Map<ImmutableBytesPtr,Map<PColumn,byte[]>> mutations, long sizeOffset, long maxSize, PhoenixConnection connection) { @@ -107,13 +134,19 @@ public class MutationState implements SQLCloseable { this.mutations.put(table, mutations); this.sizeOffset = sizeOffset; this.numRows = mutations.size(); + this.txAwares = Lists.newArrayList(); + this.txContext = null; + this.tx = connection.getMutationState().getTransaction(); throwIfTooBig(); } - private MutationState(List<Map.Entry<TableRef, Map<ImmutableBytesPtr,Map<PColumn,byte[]>>>> entries, long sizeOffset, long maxSize, PhoenixConnection connection) { - this.maxSize = maxSize; - this.connection = connection; - this.sizeOffset = sizeOffset; + private MutationState(MutationState state, List<Map.Entry<TableRef, Map<ImmutableBytesPtr,Map<PColumn,byte[]>>>> entries) { + this.maxSize = state.maxSize; + this.connection = state.connection; + this.sizeOffset = state.sizeOffset; + this.tx = state.tx; + this.txAwares = state.txAwares; + this.txContext = state.txContext; for (Map.Entry<TableRef, Map<ImmutableBytesPtr,Map<PColumn,byte[]>>> entry : entries) { numRows += entry.getValue().size(); this.mutations.put(entry.getKey(), entry.getValue()); @@ -121,6 +154,35 @@ public class MutationState implements SQLCloseable { throwIfTooBig(); } + private void addTxParticipant(TransactionAware txAware) throws SQLException { + if (txContext == null) { + txAwares.add(txAware); + assert(tx != null); + txAware.startTx(tx); + } else { + txContext.addTransactionAware(txAware); + } + } + + public Transaction getTransaction() { + return tx != null ? tx : txContext != null ? txContext.getCurrentTransaction() : null; + } + + public void startTransaction() throws SQLException { + if (txContext == null) { + throw new SQLException("No transaction context"); // TODO: error code + } + + try { + if (!txStarted) { + txContext.start(); + txStarted = true; + } + } catch (TransactionFailureException e) { + throw new SQLException(e); // TODO: error code + } + } + private void throwIfTooBig() { if (numRows > maxSize) { // TODO: throw SQLException ? @@ -141,6 +203,15 @@ public class MutationState implements SQLCloseable { if (this == newMutation) { // Doesn't make sense return; } + // TODO: what if new and old have txContext as that's really an error + // Really it's an error if newMutation txContext is not null + if (txContext != null) { + for (TransactionAware txAware : txAwares) { + txContext.addTransactionAware(txAware); + } + } else { + txAwares.addAll(newMutation.txAwares); + } this.sizeOffset += newMutation.sizeOffset; // Merge newMutation with this one, keeping state from newMutation for any overlaps for (Map.Entry<TableRef, Map<ImmutableBytesPtr,Map<PColumn,byte[]>>> entry : newMutation.mutations.entrySet()) { @@ -394,7 +465,10 @@ public class MutationState implements SQLCloseable { if (hasIndexMaintainers && isDataTable) { byte[] attribValue = null; byte[] uuidValue; - byte[] txState = table.isTransactional() ? TransactionUtil.encodeTxnState(connection.getTransactionContext().getCurrentTransaction()) : ByteUtil.EMPTY_BYTE_ARRAY; + byte[] txState = ByteUtil.EMPTY_BYTE_ARRAY; + if (table.isTransactional()) { + txState = TransactionUtil.encodeTxnState(getTransaction()); + } if (IndexMetaDataCacheClient.useIndexMetadataCache(connection, mutations, tempPtr.getLength() + txState.length)) { IndexMetaDataCacheClient client = new IndexMetaDataCacheClient(connection, tableRef); cache = client.addIndexMetadataCache(mutations, tempPtr, txState); @@ -421,15 +495,17 @@ public class MutationState implements SQLCloseable { } } } - + SQLException sqlE = null; HTableInterface hTable = connection.getQueryServices().getTable(htableName); try { - // Don't add immutable indexes (those are the only ones that would participate - // during a commit), as we don't need conflict detection for these. - if (table.isTransactional() && isDataTable) { + if (table.isTransactional()) { TransactionAwareHTable txnAware = TransactionUtil.getTransactionAwareHTable(hTable); - connection.addTxParticipant(txnAware); + // Don't add immutable indexes (those are the only ones that would participate + // during a commit), as we don't need conflict detection for these. + if (isDataTable) { + addTxParticipant(txnAware); + } hTable = txnAware; } logMutationSize(hTable, mutations, connection); @@ -465,7 +541,7 @@ public class MutationState implements SQLCloseable { } // Throw to client with both what was committed so far and what is left to be committed. // That way, client can either undo what was done or try again with what was not done. - sqlE = new CommitException(e, this, new MutationState(committedList, this.sizeOffset, this.maxSize, this.connection)); + sqlE = new CommitException(e, this, new MutationState(this, committedList)); } finally { try { if (cache != null) { @@ -508,4 +584,44 @@ public class MutationState implements SQLCloseable { @Override public void close() throws SQLException { } + + public void rollback() throws SQLException { + clear(); + txAwares.clear(); + if (txContext != null) { + try { + if (txStarted) { + txContext.abort(); + } + } catch (TransactionFailureException e) { + throw new SQLException(e); // TODO: error code + } finally { + txStarted = false; + } + } + } + + public void commit() throws SQLException { + try { + send(); + } finally { + txAwares.clear(); + if (txContext != null) { + try { + if (txStarted) { + txContext.finish(); + } + } catch (TransactionFailureException e) { + try { + txContext.abort(e); + throw TransactionUtil.getSQLException(e); + } catch (TransactionFailureException e1) { + throw TransactionUtil.getSQLException(e); + } + } finally { + txStarted = false; + } + } + } + } } http://git-wip-us.apache.org/repos/asf/phoenix/blob/1baa1b6b/phoenix-core/src/main/java/org/apache/phoenix/index/PhoenixIndexCodec.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/index/PhoenixIndexCodec.java b/phoenix-core/src/main/java/org/apache/phoenix/index/PhoenixIndexCodec.java index 1fe9931..36b849d 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/index/PhoenixIndexCodec.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/index/PhoenixIndexCodec.java @@ -87,6 +87,10 @@ public class PhoenixIndexCodec extends BaseIndexCodec { indexUpdate.setTable(maintainer.getIndexTableName()); Put put = maintainer.buildUpdateMutation(KV_BUILDER, valueGetter, ptr, state.getCurrentTimestamp(), env .getRegion().getStartKey(), env.getRegion().getEndKey()); + if (put == null) { + throw new IllegalStateException("Null put for " + env.getRegion().getRegionInfo().getTable().getNameAsString() + + ": " + Bytes.toStringBinary(ptr.get(), ptr.getOffset(), ptr.getLength())); + } indexUpdate.setUpdate(put); indexUpdates.add(indexUpdate); } @@ -112,6 +116,10 @@ public class PhoenixIndexCodec extends BaseIndexCodec { indexUpdate.setTable(maintainer.getIndexTableName()); Delete delete = maintainer.buildDeleteMutation(KV_BUILDER, valueGetter, ptr, state.getPendingUpdate(), state.getCurrentTimestamp(), env.getRegion().getStartKey(), env.getRegion().getEndKey()); + if (delete == null) { + throw new IllegalStateException("Null put for " + env.getRegion().getRegionInfo().getTable().getNameAsString() + + ": " + Bytes.toStringBinary(ptr.get(), ptr.getOffset(), ptr.getLength())); + } indexUpdate.setUpdate(delete); indexUpdates.add(indexUpdate); } http://git-wip-us.apache.org/repos/asf/phoenix/blob/1baa1b6b/phoenix-core/src/main/java/org/apache/phoenix/index/PhoenixTransactionalIndexer.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/index/PhoenixTransactionalIndexer.java b/phoenix-core/src/main/java/org/apache/phoenix/index/PhoenixTransactionalIndexer.java index f6e6806..adba507 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/index/PhoenixTransactionalIndexer.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/index/PhoenixTransactionalIndexer.java @@ -187,13 +187,13 @@ public class PhoenixTransactionalIndexer extends BaseRegionObserver { if (scanner != null) { Result result; while ((result = scanner.next()) != null) { - TxTableState state = new TxTableState(env, mutableColumns, updateAttributes, tx.getWritePointer(), result); + Mutation m = mutations.remove(new ImmutableBytesPtr(result.getRow())); + TxTableState state = new TxTableState(env, mutableColumns, updateAttributes, tx.getWritePointer(), m, result); Iterable<IndexUpdate> deletes = codec.getIndexDeletes(state, indexMetaData); for (IndexUpdate delete : deletes) { indexUpdates.add(new Pair<Mutation, byte[]>(delete.getUpdate(),delete.getTableName())); } - Mutation m = mutations.get(new ImmutableBytesPtr(result.getRow())); - state.applyMutation(m); + state.applyMutation(); Iterable<IndexUpdate> updates = codec.getIndexUpserts(state, indexMetaData); for (IndexUpdate update : updates) { indexUpdates.add(new Pair<Mutation, byte[]>(update.getUpdate(),update.getTableName())); @@ -202,7 +202,7 @@ public class PhoenixTransactionalIndexer extends BaseRegionObserver { } for (Mutation m : mutations.values()) { TxTableState state = new TxTableState(env, mutableColumns, updateAttributes, tx.getWritePointer(), m); - state.applyMutation(m); + state.applyMutation(); Iterable<IndexUpdate> updates = codec.getIndexUpserts(state, indexMetaData); for (IndexUpdate update : updates) { indexUpdates.add(new Pair<Mutation, byte[]>(update.getUpdate(),update.getTableName())); @@ -217,7 +217,7 @@ public class PhoenixTransactionalIndexer extends BaseRegionObserver { private static class TxTableState implements TableState { - private final byte[] rowKey; + private final Mutation mutation; private final long currentTimestamp; private final RegionCoprocessorEnvironment env; private final Map<String, byte[]> attributes; @@ -225,24 +225,28 @@ public class PhoenixTransactionalIndexer extends BaseRegionObserver { private final Set<ColumnReference> indexedColumns; private final Map<ColumnReference, ImmutableBytesWritable> valueMap; - private TxTableState(RegionCoprocessorEnvironment env, Set<ColumnReference> indexedColumns, Map<String, byte[]> attributes, long currentTimestamp, byte[] rowKey) { + private TxTableState(RegionCoprocessorEnvironment env, Set<ColumnReference> indexedColumns, Map<String, byte[]> attributes, long currentTimestamp, Mutation mutation) { this.env = env; this.currentTimestamp = currentTimestamp; this.indexedColumns = indexedColumns; this.attributes = attributes; - this.rowKey = rowKey; + this.mutation = mutation; int estimatedSize = indexedColumns.size(); this.valueMap = Maps.newHashMapWithExpectedSize(estimatedSize); this.pendingUpdates = Lists.newArrayListWithExpectedSize(estimatedSize); + try { + CellScanner scanner = mutation.cellScanner(); + while (scanner.advance()) { + Cell cell = scanner.current(); + pendingUpdates.add(cell); + } + } catch (IOException e) { + throw new RuntimeException(e); // Impossible + } } - public TxTableState(RegionCoprocessorEnvironment env, Set<ColumnReference> indexedColumns, Map<String, byte[]> attributes, long currentTimestamp, Mutation m) { - this(env, indexedColumns, attributes, currentTimestamp, m.getRow()); - applyMutation(m); - } - - public TxTableState(RegionCoprocessorEnvironment env, Set<ColumnReference> indexedColumns, Map<String, byte[]> attributes, long currentTimestamp, Result r) { - this(env, indexedColumns, attributes, currentTimestamp, r.getRow()); + public TxTableState(RegionCoprocessorEnvironment env, Set<ColumnReference> indexedColumns, Map<String, byte[]> attributes, long currentTimestamp, Mutation m, Result r) { + this(env, indexedColumns, attributes, currentTimestamp, m); for (ColumnReference ref : indexedColumns) { Cell cell = r.getColumnLatestCell(ref.getFamily(), ref.getQualifier()); @@ -271,7 +275,7 @@ public class PhoenixTransactionalIndexer extends BaseRegionObserver { @Override public byte[] getCurrentRowKey() { - return rowKey; + return mutation.getRow(); } @Override @@ -279,32 +283,28 @@ public class PhoenixTransactionalIndexer extends BaseRegionObserver { return Collections.emptyList(); } - public void applyMutation(Mutation m) { - if (m instanceof Delete) { + public void applyMutation() { + if (mutation instanceof Delete) { valueMap.clear(); } else { - CellScanner scanner = m.cellScanner(); - try { - while (scanner.advance()) { - Cell cell = scanner.current(); - if (cell.getTypeByte() == KeyValue.Type.DeleteColumn.getCode()) { - ColumnReference ref = new ColumnReference(cell.getFamilyArray(), cell.getFamilyOffset(), cell.getFamilyLength(), cell.getQualifierArray(), cell.getQualifierOffset(), cell.getQualifierLength()); - valueMap.remove(ref); - } else if (cell.getTypeByte() == KeyValue.Type.DeleteFamily.getCode()) { - for (ColumnReference ref : indexedColumns) { - if (ref.matchesFamily(cell.getFamilyArray(), cell.getFamilyOffset(), cell.getFamilyLength())) { - valueMap.remove(ref); - } + for (Cell cell : pendingUpdates) { + if (cell.getTypeByte() == KeyValue.Type.DeleteColumn.getCode()) { + ColumnReference ref = new ColumnReference(cell.getFamilyArray(), cell.getFamilyOffset(), cell.getFamilyLength(), cell.getQualifierArray(), cell.getQualifierOffset(), cell.getQualifierLength()); + valueMap.remove(ref); + } else if (cell.getTypeByte() == KeyValue.Type.DeleteFamily.getCode()) { + for (ColumnReference ref : indexedColumns) { + if (ref.matchesFamily(cell.getFamilyArray(), cell.getFamilyOffset(), cell.getFamilyLength())) { + valueMap.remove(ref); } - } else { - ColumnReference ref = new ColumnReference(cell.getFamilyArray(), cell.getFamilyOffset(), cell.getFamilyLength(), cell.getQualifierArray(), cell.getQualifierOffset(), cell.getQualifierLength()); + } + } else { + ColumnReference ref = new ColumnReference(cell.getFamilyArray(), cell.getFamilyOffset(), cell.getFamilyLength(), cell.getQualifierArray(), cell.getQualifierOffset(), cell.getQualifierLength()); + if (indexedColumns.contains(ref)) { ImmutableBytesWritable ptr = new ImmutableBytesWritable(); ptr.set(cell.getValueArray(), cell.getValueOffset(), cell.getValueLength()); valueMap.put(ref, ptr); } } - } catch (IOException e) { - throw new RuntimeException(e); // Impossible } } } @@ -328,7 +328,7 @@ public class PhoenixTransactionalIndexer extends BaseRegionObserver { @Override public byte[] getRowKey() { - return rowKey; + return mutation.getRow(); } }; http://git-wip-us.apache.org/repos/asf/phoenix/blob/1baa1b6b/phoenix-core/src/main/java/org/apache/phoenix/iterate/TableResultIterator.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/iterate/TableResultIterator.java b/phoenix-core/src/main/java/org/apache/phoenix/iterate/TableResultIterator.java index 7f5d527..9cece1c 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/iterate/TableResultIterator.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/iterate/TableResultIterator.java @@ -21,6 +21,7 @@ import java.io.IOException; import java.sql.SQLException; import java.util.List; +import co.cask.tephra.Transaction; import co.cask.tephra.hbase98.TransactionAwareHTable; import org.apache.hadoop.hbase.client.HTableInterface; @@ -87,9 +88,10 @@ public class TableResultIterator extends ExplainTable implements ResultIterator PTable table = tableRef.getTable(); HTableInterface htable = context.getConnection().getQueryServices().getTable(table.getPhysicalName().getBytes()); if (table.isTransactional()) { - TransactionAwareHTable txnAware = TransactionUtil.getTransactionAwareHTable(htable); - context.getConnection().addTxParticipant(txnAware); - htable = txnAware; + TransactionAwareHTable txAware = TransactionUtil.getTransactionAwareHTable(htable); + Transaction tx = context.getConnection().getMutationState().getTransaction(); + txAware.startTx(tx); + htable = txAware; } this.htable = htable; if (creationMode == ScannerCreation.IMMEDIATE) { http://git-wip-us.apache.org/repos/asf/phoenix/blob/1baa1b6b/phoenix-core/src/main/java/org/apache/phoenix/jdbc/PhoenixConnection.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/jdbc/PhoenixConnection.java b/phoenix-core/src/main/java/org/apache/phoenix/jdbc/PhoenixConnection.java index 240a599..d513362 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/jdbc/PhoenixConnection.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/jdbc/PhoenixConnection.java @@ -54,10 +54,7 @@ import java.util.concurrent.Executor; import javax.annotation.Nullable; -import co.cask.tephra.TransactionAware; -import co.cask.tephra.TransactionContext; -import co.cask.tephra.TransactionFailureException; -import co.cask.tephra.TransactionSystemClient; +import co.cask.tephra.Transaction; import org.apache.hadoop.hbase.HConstants; import org.apache.phoenix.call.CallRunner; @@ -97,7 +94,6 @@ import org.apache.phoenix.util.PropertiesUtil; import org.apache.phoenix.util.ReadOnlyProps; import org.apache.phoenix.util.SQLCloseable; import org.apache.phoenix.util.SQLCloseables; -import org.apache.phoenix.util.TransactionUtil; import org.cloudera.htrace.Sampler; import org.cloudera.htrace.TraceScope; @@ -139,7 +135,6 @@ public class PhoenixConnection implements Connection, org.apache.phoenix.jdbc.Jd private final String timestampPattern; private TraceScope traceScope = null; - private TransactionContext txContext; private boolean isClosed = false; private Sampler<?> sampler; private boolean readOnly = false; @@ -156,23 +151,22 @@ public class PhoenixConnection implements Connection, org.apache.phoenix.jdbc.Jd } public PhoenixConnection(PhoenixConnection connection) throws SQLException { - this(connection.getQueryServices(), connection.getURL(), connection.getClientInfo(), connection.getMetaDataCache()); + this(connection.getQueryServices(), connection.getURL(), connection.getClientInfo(), connection.getMetaDataCache(), connection.getMutationState().getTransaction()); this.isAutoCommit = connection.isAutoCommit; this.sampler = connection.sampler; } - public PhoenixConnection(PhoenixConnection connection, long scn) throws SQLException { - this(connection.getQueryServices(), connection, scn); - this.sampler = connection.sampler; - } - public PhoenixConnection(ConnectionQueryServices services, PhoenixConnection connection, long scn) throws SQLException { - this(services, connection.getURL(), newPropsWithSCN(scn,connection.getClientInfo()), connection.getMetaDataCache()); + this(services, connection.getURL(), newPropsWithSCN(scn,connection.getClientInfo()), connection.getMetaDataCache(), connection.getMutationState().getTransaction()); this.isAutoCommit = connection.isAutoCommit; this.sampler = connection.sampler; } public PhoenixConnection(ConnectionQueryServices services, String url, Properties info, PMetaData metaData) throws SQLException { + this(services, url, info, metaData, null); + } + + public PhoenixConnection(ConnectionQueryServices services, String url, Properties info, PMetaData metaData, Transaction txn) throws SQLException { this.url = url; // Copy so client cannot change this.info = info == null ? new Properties() : PropertiesUtil.deepCopy(info); @@ -242,7 +236,7 @@ public class PhoenixConnection implements Connection, org.apache.phoenix.jdbc.Jd } }); - this.mutationState = new MutationState(maxSize, this); + this.mutationState = new MutationState(maxSize, this, txn); this.services.addConnection(this); // setup tracing, if its enabled @@ -250,10 +244,6 @@ public class PhoenixConnection implements Connection, org.apache.phoenix.jdbc.Jd this.customTracingAnnotations = getImmutableCustomTracingAnnotations(); } - public TransactionContext getTransactionContext() { - return txContext; - } - private ImmutableMap<String, String> getImmutableCustomTracingAnnotations() { Builder<String, String> result = ImmutableMap.builder(); result.putAll(JDBCUtil.getAnnotations(url, info)); @@ -437,60 +427,15 @@ public class PhoenixConnection implements Connection, org.apache.phoenix.jdbc.Jd } } - public void startTransaction() throws SQLException { - if (txContext == null) { - boolean success = false; - try { - TransactionSystemClient txServiceClient = this.getQueryServices().getTransactionSystemClient(); - this.txContext = new TransactionContext(txServiceClient); - txContext.start(); - success = true; - } catch (TransactionFailureException e) { - throw new SQLException(e); // TODO: error code - } finally { - if (!success) endTransaction(); - } - } - } - - public void addTxParticipant(TransactionAware txnAware) throws SQLException { - if (!isTransactionStarted()) { - startTransaction(); - } - txContext.addTransactionAware(txnAware); - } - - private boolean isTransactionStarted() { - return txContext != null; - } - - private void endTransaction() { - txContext = null; - } - @Override public void commit() throws SQLException { CallRunner.run(new CallRunner.CallableThrowable<Void, SQLException>() { @Override public Void call() throws SQLException { - mutationState.send(); - if (isTransactionStarted()) { - try { - txContext.finish(); - } catch (TransactionFailureException e) { - try { - txContext.abort(e); - throw TransactionUtil.getSQLException(e); - } catch (TransactionFailureException e1) { - throw TransactionUtil.getSQLException(e); - } - } finally { - endTransaction(); - } - } + mutationState.commit(); return null; } - }, Tracing.withTracing(this, "sending mutations")); + }, Tracing.withTracing(this, "committing")); } @Override @@ -690,16 +635,13 @@ public class PhoenixConnection implements Connection, org.apache.phoenix.jdbc.Jd @Override public void rollback() throws SQLException { - mutationState.clear(); - if (isTransactionStarted()) { - try { - txContext.abort(); - } catch (TransactionFailureException e) { - throw new SQLException(e); // TODO: error code - } finally { - endTransaction(); + CallRunner.run(new CallRunner.CallableThrowable<Void, SQLException>() { + @Override + public Void call() throws SQLException { + mutationState.rollback(); + return null; } - } + }, Tracing.withTracing(this, "rolling back")); } @Override http://git-wip-us.apache.org/repos/asf/phoenix/blob/1baa1b6b/phoenix-core/src/main/java/org/apache/phoenix/jdbc/PhoenixStatement.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/jdbc/PhoenixStatement.java b/phoenix-core/src/main/java/org/apache/phoenix/jdbc/PhoenixStatement.java index 3ccc772..fb295d3 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/jdbc/PhoenixStatement.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/jdbc/PhoenixStatement.java @@ -234,9 +234,9 @@ public class PhoenixStatement implements Statement, SQLCloseable, org.apache.pho final long startTime = System.currentTimeMillis(); try { QueryPlan plan = stmt.compilePlan(PhoenixStatement.this, Sequence.ValueOp.RESERVE_SEQUENCE); + startTransaction(plan); plan = connection.getQueryServices().getOptimizer().optimize( PhoenixStatement.this, plan); - startTransaction(plan); // this will create its own trace internally, so we don't wrap this // whole thing in tracing ResultIterator resultIterator = plan.iterator(); @@ -279,10 +279,10 @@ public class PhoenixStatement implements Statement, SQLCloseable, org.apache.pho } } - private void startTransaction(StatementPlan plan) throws SQLException { + public void startTransaction(StatementPlan plan) throws SQLException { for (TableRef ref : plan.getContext().getResolver().getTables()) { if (ref.getTable().isTransactional()) { - connection.startTransaction(); + connection.getMutationState().startTransaction(); break; } } http://git-wip-us.apache.org/repos/asf/phoenix/blob/1baa1b6b/phoenix-core/src/main/java/org/apache/phoenix/query/ConnectionQueryServicesImpl.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/query/ConnectionQueryServicesImpl.java b/phoenix-core/src/main/java/org/apache/phoenix/query/ConnectionQueryServicesImpl.java index 37a28fb..01cf8b2 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/query/ConnectionQueryServicesImpl.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/query/ConnectionQueryServicesImpl.java @@ -137,7 +137,6 @@ import org.apache.phoenix.schema.SequenceKey; import org.apache.phoenix.schema.TableAlreadyExistsException; import org.apache.phoenix.schema.TableNotFoundException; import org.apache.phoenix.schema.TableProperty; -import org.apache.phoenix.schema.TableRef; import org.apache.phoenix.schema.stats.PTableStats; import org.apache.phoenix.schema.stats.StatisticsUtil; import org.apache.phoenix.schema.types.PBoolean; @@ -2070,7 +2069,7 @@ public class ConnectionQueryServicesImpl extends DelegateQueryServices implement @Override public MutationState updateData(MutationPlan plan) throws SQLException { - TableRef currentTable = plan.getContext().getCurrentTable(); + plan.getContext().getStatement().startTransaction(plan); return plan.execute(); }
