[
https://issues.apache.org/jira/browse/PHOENIX-1674?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15013844#comment-15013844
]
ASF GitHub Bot commented on PHOENIX-1674:
-----------------------------------------
GitHub user JamesRTaylor commented on a diff in the pull request:
https://github.com/apache/phoenix/pull/129#discussion_r45366989
--- Diff:
phoenix-core/src/main/java/org/apache/phoenix/execute/MutationState.java ---
@@ -90,44 +108,217 @@
*/
public class MutationState implements SQLCloseable {
private static final Logger logger =
LoggerFactory.getLogger(MutationState.class);
-
+ private static final TransactionCodec CODEC = new TransactionCodec();
+
private PhoenixConnection connection;
private final long maxSize;
- private final ImmutableBytesPtr tempPtr = new ImmutableBytesPtr();
private final Map<TableRef, Map<ImmutableBytesPtr,RowMutationState>>
mutations;
+ private final List<TransactionAware> txAwares;
+ private final TransactionContext txContext;
+ private final Set<String> uncommittedPhysicalNames =
Sets.newHashSetWithExpectedSize(10);
+
+ private Transaction tx;
private long sizeOffset;
private int numRows = 0;
+ private boolean txStarted = false;
+
private final MutationMetricQueue mutationMetricQueue;
private ReadMetricQueue readMetricQueue;
- MutationState(long maxSize, PhoenixConnection connection,
- Map<TableRef, Map<ImmutableBytesPtr, RowMutationState>>
mutations) {
- this.maxSize = maxSize;
- this.connection = connection;
- this.mutations = mutations;
- boolean isMetricsEnabled =
connection.isRequestLevelMetricsEnabled();
- this.mutationMetricQueue = isMetricsEnabled ? new
MutationMetricQueue()
- : NoOpMutationMetricsQueue.NO_OP_MUTATION_METRICS_QUEUE;
- }
-
public MutationState(long maxSize, PhoenixConnection connection) {
- this(maxSize,connection,0);
+ this(maxSize,connection, null);
+ }
+
+ public MutationState(MutationState mutationState) {
+ this(mutationState.maxSize, mutationState.connection,
mutationState.getTransaction());
}
public MutationState(long maxSize, PhoenixConnection connection, long
sizeOffset) {
- this(maxSize, connection, Maps.<TableRef,
Map<ImmutableBytesPtr,RowMutationState>>newHashMapWithExpectedSize(connection.getMutateBatchSize()));
+ this(maxSize, connection, null, sizeOffset);
+ }
+
+ private MutationState(long maxSize, PhoenixConnection connection,
Transaction tx) {
+ this(maxSize,connection, tx, 0);
+ }
+
+ private MutationState(long maxSize, PhoenixConnection connection,
Transaction tx, long sizeOffset) {
+ this(maxSize, connection, Maps.<TableRef,
Map<ImmutableBytesPtr,RowMutationState>>newHashMapWithExpectedSize(connection.getMutateBatchSize()),
tx);
this.sizeOffset = sizeOffset;
}
+ MutationState(long maxSize, PhoenixConnection connection,
+ Map<TableRef, Map<ImmutableBytesPtr, RowMutationState>>
mutations,
+ Transaction tx) {
+ this.maxSize = maxSize;
+ this.connection = connection;
+ this.mutations = mutations;
+ boolean isMetricsEnabled =
connection.isRequestLevelMetricsEnabled();
+ this.mutationMetricQueue = isMetricsEnabled ? new
MutationMetricQueue()
+ :
NoOpMutationMetricsQueue.NO_OP_MUTATION_METRICS_QUEUE;
+ this.tx = tx;
+ if (tx == null) {
+ this.txAwares = Collections.emptyList();
+ TransactionSystemClient txServiceClient =
this.connection
+
.getQueryServices().getTransactionSystemClient();
+ this.txContext = new
TransactionContext(txServiceClient);
+ } else {
+ txAwares = Lists.newArrayList();
+ txContext = null;
+ }
+ }
+
public MutationState(TableRef table,
Map<ImmutableBytesPtr,RowMutationState> mutations, long sizeOffset, long
maxSize, PhoenixConnection connection) {
- this(maxSize, connection, sizeOffset);
+ this(maxSize, connection, null, sizeOffset);
this.mutations.put(table, mutations);
this.numRows = mutations.size();
+ this.tx = connection.getMutationState().getTransaction();
throwIfTooBig();
}
+ public boolean checkpoint(MutationPlan plan) throws SQLException {
+ Transaction currentTx = getTransaction();
+ if (getTransaction() == null || plan.getTargetRef() == null ||
plan.getTargetRef().getTable() == null ||
!plan.getTargetRef().getTable().isTransactional()) {
+ return false;
+ }
+ Set<TableRef> sources = plan.getSourceRefs();
+ if (sources.isEmpty()) {
+ return false;
+ }
+ // For a DELETE statement, we're always querying the table being
deleted from. This isn't
+ // a problem, but it potentially could be if there are other
references to the same table
+ // nested in the DELETE statement (as a sub query or join, for
example).
+ TableRef ignoreForExcludeCurrent = plan.getOperation() ==
Operation.DELETE && sources.size() == 1 ? plan.getTargetRef() : null;
+ boolean excludeCurrent = false;
+ String targetPhysicalName =
plan.getTargetRef().getTable().getPhysicalName().getString();
+ for (TableRef source : sources) {
+ if (source.getTable().isTransactional() &&
!source.equals(ignoreForExcludeCurrent)) {
+ String sourcePhysicalName =
source.getTable().getPhysicalName().getString();
+ if (targetPhysicalName.equals(sourcePhysicalName)) {
+ excludeCurrent = true;
+ break;
+ }
+ }
+ }
+ // If we're querying the same table we're updating, we must
exclude our writes to
+ // it from being visible.
+ if (excludeCurrent) {
+ // If any source tables have uncommitted data prior to last
checkpoint,
+ // then we must create a new checkpoint.
+ boolean hasUncommittedData = false;
+ for (TableRef source : sources) {
+ String sourcePhysicalName =
source.getTable().getPhysicalName().getString();
+ if (source.getTable().isTransactional() &&
uncommittedPhysicalNames.contains(sourcePhysicalName)) {
+ hasUncommittedData = true;
+ break;
+ }
+ }
+ if (hasUncommittedData) {
+ try {
+ if (txContext == null) {
+ currentTx = tx =
connection.getQueryServices().getTransactionSystemClient().checkpoint(currentTx);
+ } else {
+ txContext.checkpoint();
+ currentTx = tx =
txContext.getCurrentTransaction();
+ }
+ // Since we've checkpointed, we can clear out
uncommitted set, since a statement run afterwards
+ // should see all this data.
+ uncommittedPhysicalNames.clear();
+ } catch (TransactionFailureException e) {
+ throw new SQLException(e);
+ }
+ }
+ // Since we're querying our own table while mutating it, we
must exclude
+ // our current mutations from being visible, otherwise we can get erroneous
results (for DELETE)
+ // or get into an infinite loop (for UPSERT SELECT).
+
currentTx.setVisibility(VisibilityLevel.SNAPSHOT_EXCLUDE_CURRENT);
+ return true;
+ }
+ return false;
+ }
+
+ private void addTransactionParticipant(TransactionAware txAware)
throws SQLException {
+ if (txContext == null) {
+ txAwares.add(txAware);
+ assert(tx != null);
+ txAware.startTx(tx);
+ } else {
+ txContext.addTransactionAware(txAware);
+ }
+ }
+
+ // Though MutationState is not thread safe in general, this method
should be because it may
+ // be called by TableResultIterator in a multi-threaded manner. Since
we do not want to expose
+ // the Transaction outside of MutationState, this seems reasonable, as
the member variables
+ // would not change as these threads are running.
+ public HTableInterface getHTable(PTable table) throws SQLException {
+ HTableInterface htable =
this.getConnection().getQueryServices().getTable(table.getPhysicalName().getBytes());
+ Transaction currentTx;
+ if (table.isTransactional() && (currentTx=getTransaction()) !=
null) {
+ TransactionAwareHTable txAware =
TransactionUtil.getTransactionAwareHTable(htable, table);
+ // Using cloned mutationState as we may have started a new
transaction already
+ // if auto commit is true and we need to use the original one
here.
+ txAware.startTx(currentTx);
+ htable = txAware;
+ }
+ return htable;
+ }
+
+ public PhoenixConnection getConnection() {
+ return connection;
+ }
+
+ // Kept private as the Transaction may change when check pointed.
Keeping it private ensures
+ // no one holds on to a stale copy.
+ private Transaction getTransaction() {
+ return tx != null ? tx : txContext != null ?
txContext.getCurrentTransaction() : null;
+ }
+
+ public boolean isTransactionStarted() {
+ return getTransaction() != null;
+ }
+
+ public long getReadPointer() {
+ Transaction tx = getTransaction();
+ return tx == null ? HConstants.LATEST_TIMESTAMP :
tx.getReadPointer();
+ }
+
+ // For testing
+ public long getWritePointer() {
+ Transaction tx = getTransaction();
+ return tx == null ? HConstants.LATEST_TIMESTAMP :
tx.getWritePointer();
+ }
+
+ // For testing
+ public VisibilityLevel getVisibilityLevel() {
+ Transaction tx = getTransaction();
+ return tx == null ? null : tx.getVisibilityLevel();
+ }
+
+ public boolean startTransaction() throws SQLException {
+ if (txContext == null) {
+ throw new SQLException("No transaction context"); // TODO:
error code
--- End diff --
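Let's resolve this TODO now rather than leaving it in: create a new SQLExceptionCode for the missing transaction context and throw that here instead of a bare SQLException with a hard-coded message.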
> Snapshot isolation transaction support through Tephra
> -----------------------------------------------------
>
> Key: PHOENIX-1674
> URL: https://issues.apache.org/jira/browse/PHOENIX-1674
> Project: Phoenix
> Issue Type: Improvement
> Reporter: James Taylor
> Labels: SFDC
>
> Tephra (http://tephra.io/ and https://github.com/caskdata/tephra) is one
> option for getting transaction support in Phoenix. Let's use this JIRA to
> discuss the way in which this could be integrated along with the pros and
> cons.
--
This message was sent by Atlassian JIRA
(v6.3.4#6332)