Github user JamesRTaylor commented on a diff in the pull request:

    https://github.com/apache/phoenix/pull/129#discussion_r45366989
  
    --- Diff: 
phoenix-core/src/main/java/org/apache/phoenix/execute/MutationState.java ---
    @@ -90,44 +108,217 @@
      */
     public class MutationState implements SQLCloseable {
         private static final Logger logger = 
LoggerFactory.getLogger(MutationState.class);
    -
    +    private static final TransactionCodec CODEC = new TransactionCodec();
    +    
         private PhoenixConnection connection;
         private final long maxSize;
    -    private final ImmutableBytesPtr tempPtr = new ImmutableBytesPtr();
         private final Map<TableRef, Map<ImmutableBytesPtr,RowMutationState>> 
mutations;
    +    private final List<TransactionAware> txAwares;
    +    private final TransactionContext txContext;
    +    private final Set<String> uncommittedPhysicalNames = 
Sets.newHashSetWithExpectedSize(10);
    +    
    +    private Transaction tx;
         private long sizeOffset;
         private int numRows = 0;
    +    private boolean txStarted = false;
    +    
         private final MutationMetricQueue mutationMetricQueue;
         private ReadMetricQueue readMetricQueue;
         
    -    MutationState(long maxSize, PhoenixConnection connection,
    -            Map<TableRef, Map<ImmutableBytesPtr, RowMutationState>> 
mutations) {
    -        this.maxSize = maxSize;
    -        this.connection = connection;
    -        this.mutations = mutations;
    -        boolean isMetricsEnabled = 
connection.isRequestLevelMetricsEnabled();
    -        this.mutationMetricQueue = isMetricsEnabled ? new 
MutationMetricQueue()
    -                : NoOpMutationMetricsQueue.NO_OP_MUTATION_METRICS_QUEUE;
    -    }
    -
         public MutationState(long maxSize, PhoenixConnection connection) {
    -        this(maxSize,connection,0);
    +        this(maxSize,connection, null);
    +    }
    +    
    +    public MutationState(MutationState mutationState) {
    +        this(mutationState.maxSize, mutationState.connection, 
mutationState.getTransaction());
         }
         
         public MutationState(long maxSize, PhoenixConnection connection, long 
sizeOffset) {
    -        this(maxSize, connection, Maps.<TableRef, 
Map<ImmutableBytesPtr,RowMutationState>>newHashMapWithExpectedSize(connection.getMutateBatchSize()));
    +        this(maxSize, connection, null, sizeOffset);
    +    }
    +    
    +    private MutationState(long maxSize, PhoenixConnection connection, 
Transaction tx) {
    +        this(maxSize,connection, tx, 0);
    +    }
    +    
    +    private MutationState(long maxSize, PhoenixConnection connection, 
Transaction tx, long sizeOffset) {
    +           this(maxSize, connection, Maps.<TableRef, 
Map<ImmutableBytesPtr,RowMutationState>>newHashMapWithExpectedSize(connection.getMutateBatchSize()),
 tx);
             this.sizeOffset = sizeOffset;
         }
         
    +   MutationState(long maxSize, PhoenixConnection connection,
    +                   Map<TableRef, Map<ImmutableBytesPtr, RowMutationState>> 
mutations,
    +                   Transaction tx) {
    +           this.maxSize = maxSize;
    +           this.connection = connection;
    +           this.mutations = mutations;
    +           boolean isMetricsEnabled = 
connection.isRequestLevelMetricsEnabled();
    +           this.mutationMetricQueue = isMetricsEnabled ? new 
MutationMetricQueue()
    +                           : 
NoOpMutationMetricsQueue.NO_OP_MUTATION_METRICS_QUEUE;
    +           this.tx = tx;
    +           if (tx == null) {
    +                   this.txAwares = Collections.emptyList();
    +                   TransactionSystemClient txServiceClient = 
this.connection
    +                                   
.getQueryServices().getTransactionSystemClient();
    +                   this.txContext = new 
TransactionContext(txServiceClient);
    +           } else {
    +                   txAwares = Lists.newArrayList();
    +                   txContext = null;
    +           }
    +   }
    +
         public MutationState(TableRef table, 
Map<ImmutableBytesPtr,RowMutationState> mutations, long sizeOffset, long 
maxSize, PhoenixConnection connection) {
    -        this(maxSize, connection, sizeOffset);
    +        this(maxSize, connection, null, sizeOffset);
             this.mutations.put(table, mutations);
             this.numRows = mutations.size();
    +        this.tx = connection.getMutationState().getTransaction();
             throwIfTooBig();
         }
         
    +    public boolean checkpoint(MutationPlan plan) throws SQLException {
    +        Transaction currentTx = getTransaction();
    +        if (getTransaction() == null || plan.getTargetRef() == null || 
plan.getTargetRef().getTable() == null || 
!plan.getTargetRef().getTable().isTransactional()) {
    +            return false;
    +        }
    +        Set<TableRef> sources = plan.getSourceRefs();
    +        if (sources.isEmpty()) {
    +            return false;
    +        }
    +        // For a DELETE statement, we're always querying the table being 
deleted from. This isn't
    +        // a problem, but it potentially could be if there are other 
references to the same table
    +        // nested in the DELETE statement (as a sub query or join, for 
example).
    +        TableRef ignoreForExcludeCurrent = plan.getOperation() == 
Operation.DELETE && sources.size() == 1 ? plan.getTargetRef() : null;
    +        boolean excludeCurrent = false;
    +        String targetPhysicalName = 
plan.getTargetRef().getTable().getPhysicalName().getString();
    +        for (TableRef source : sources) {
    +            if (source.getTable().isTransactional() && 
!source.equals(ignoreForExcludeCurrent)) {
    +                String sourcePhysicalName = 
source.getTable().getPhysicalName().getString();
    +                if (targetPhysicalName.equals(sourcePhysicalName)) {
    +                    excludeCurrent = true;
    +                    break;
    +                }
    +            }
    +        }
    +        // If we're querying the same table we're updating, we must 
exclude our writes to
    +        // it from being visible.
    +        if (excludeCurrent) {
    +            // If any source tables have uncommitted data prior to last 
checkpoint,
    +            // then we must create a new checkpoint.
    +            boolean hasUncommittedData = false;
    +            for (TableRef source : sources) {
    +                String sourcePhysicalName = 
source.getTable().getPhysicalName().getString();
    +                if (source.getTable().isTransactional() && 
uncommittedPhysicalNames.contains(sourcePhysicalName)) {
    +                    hasUncommittedData = true;
    +                    break;
    +                }
    +            }
    +            if (hasUncommittedData) {
    +                try {
    +                   if (txContext == null) {
    +                           currentTx = tx = 
connection.getQueryServices().getTransactionSystemClient().checkpoint(currentTx);
    +                   }  else {
    +                           txContext.checkpoint();
    +                           currentTx = tx = 
txContext.getCurrentTransaction();
    +                   }
    +                    // Since we've checkpointed, we can clear out 
uncommitted set, since a statement run afterwards
    +                    // should see all this data.
    +                    uncommittedPhysicalNames.clear();
    +                } catch (TransactionFailureException e) {
    +                    throw new SQLException(e);
    +                           } 
    +            }
    +            // Since we're querying our own table while mutating it, we 
must exclude
    +            // see our current mutations, otherwise we can get erroneous 
results (for DELETE)
    +            // or get into an infinite loop (for UPSERT SELECT).
    +            
currentTx.setVisibility(VisibilityLevel.SNAPSHOT_EXCLUDE_CURRENT);
    +            return true;
    +        }
    +        return false;
    +    }
    +    
    +    private void addTransactionParticipant(TransactionAware txAware) 
throws SQLException {
    +        if (txContext == null) {
    +            txAwares.add(txAware);
    +            assert(tx != null);
    +            txAware.startTx(tx);
    +        } else {
    +            txContext.addTransactionAware(txAware);
    +        }
    +    }
    +    
    +    // Though MutationState is not thread safe in general, this method 
should be because it may
    +    // be called by TableResultIterator in a multi-threaded manner. Since 
we do not want to expose
    +    // the Transaction outside of MutationState, this seems reasonable, as 
the member variables
    +    // would not change as these threads are running.
    +    public HTableInterface getHTable(PTable table) throws SQLException {
    +        HTableInterface htable = 
this.getConnection().getQueryServices().getTable(table.getPhysicalName().getBytes());
    +        Transaction currentTx;
    +        if (table.isTransactional() && (currentTx=getTransaction()) != 
null) {
    +            TransactionAwareHTable txAware = 
TransactionUtil.getTransactionAwareHTable(htable, table);
    +            // Using cloned mutationState as we may have started a new 
transaction already
    +            // if auto commit is true and we need to use the original one 
here.
    +            txAware.startTx(currentTx);
    +            htable = txAware;
    +        }
    +        return htable;
    +    }
    +    
    +    public PhoenixConnection getConnection() {
    +           return connection;
    +    }
    +    
    +    // Kept private as the Transaction may change when check pointed. 
Keeping it private ensures
    +    // no one holds on to a stale copy.
    +    private Transaction getTransaction() {
    +        return tx != null ? tx : txContext != null ? 
txContext.getCurrentTransaction() : null;
    +    }
    +    
    +    public boolean isTransactionStarted() {
    +           return getTransaction() != null;
    +    }
    +    
    +    public long getReadPointer() {
    +           Transaction tx = getTransaction();
    +           return tx == null ? HConstants.LATEST_TIMESTAMP : 
tx.getReadPointer();
    +    }
    +    
    +    // For testing
    +    public long getWritePointer() {
    +        Transaction tx = getTransaction();
    +        return tx == null ? HConstants.LATEST_TIMESTAMP : 
tx.getWritePointer();
    +    }
    +    
    +    // For testing
    +    public VisibilityLevel getVisibilityLevel() {
    +        Transaction tx = getTransaction();
    +        return tx == null ? null : tx.getVisibilityLevel();
    +    }
    +    
    +    public boolean startTransaction() throws SQLException {
    +        if (txContext == null) {
    +            throw new SQLException("No transaction context"); // TODO: 
error code
    --- End diff --
    
    Let's do this TODO. Just create a new SQLExceptionCode for this.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---

Reply via email to