[ 
https://issues.apache.org/jira/browse/PHOENIX-1674?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15013844#comment-15013844
 ] 

ASF GitHub Bot commented on PHOENIX-1674:
-----------------------------------------

Github user JamesRTaylor commented on a diff in the pull request:

    https://github.com/apache/phoenix/pull/129#discussion_r45366989
  
    --- Diff: 
phoenix-core/src/main/java/org/apache/phoenix/execute/MutationState.java ---
    @@ -90,44 +108,217 @@
      */
     public class MutationState implements SQLCloseable {
         private static final Logger logger = 
LoggerFactory.getLogger(MutationState.class);
    -
    +    private static final TransactionCodec CODEC = new TransactionCodec();
    +    
         private PhoenixConnection connection;
         private final long maxSize;
    -    private final ImmutableBytesPtr tempPtr = new ImmutableBytesPtr();
         private final Map<TableRef, Map<ImmutableBytesPtr,RowMutationState>> 
mutations;
    +    private final List<TransactionAware> txAwares;
    +    private final TransactionContext txContext;
    +    private final Set<String> uncommittedPhysicalNames = 
Sets.newHashSetWithExpectedSize(10);
    +    
    +    private Transaction tx;
         private long sizeOffset;
         private int numRows = 0;
    +    private boolean txStarted = false;
    +    
         private final MutationMetricQueue mutationMetricQueue;
         private ReadMetricQueue readMetricQueue;
         
    -    MutationState(long maxSize, PhoenixConnection connection,
    -            Map<TableRef, Map<ImmutableBytesPtr, RowMutationState>> 
mutations) {
    -        this.maxSize = maxSize;
    -        this.connection = connection;
    -        this.mutations = mutations;
    -        boolean isMetricsEnabled = 
connection.isRequestLevelMetricsEnabled();
    -        this.mutationMetricQueue = isMetricsEnabled ? new 
MutationMetricQueue()
    -                : NoOpMutationMetricsQueue.NO_OP_MUTATION_METRICS_QUEUE;
    -    }
    -
         public MutationState(long maxSize, PhoenixConnection connection) {
    -        this(maxSize,connection,0);
    +        this(maxSize,connection, null);
    +    }
    +    
    +    public MutationState(MutationState mutationState) {
    +        this(mutationState.maxSize, mutationState.connection, 
mutationState.getTransaction());
         }
         
         public MutationState(long maxSize, PhoenixConnection connection, long 
sizeOffset) {
    -        this(maxSize, connection, Maps.<TableRef, 
Map<ImmutableBytesPtr,RowMutationState>>newHashMapWithExpectedSize(connection.getMutateBatchSize()));
    +        this(maxSize, connection, null, sizeOffset);
    +    }
    +    
    +    private MutationState(long maxSize, PhoenixConnection connection, 
Transaction tx) {
    +        this(maxSize,connection, tx, 0);
    +    }
    +    
    +    private MutationState(long maxSize, PhoenixConnection connection, 
Transaction tx, long sizeOffset) {
    +           this(maxSize, connection, Maps.<TableRef, 
Map<ImmutableBytesPtr,RowMutationState>>newHashMapWithExpectedSize(connection.getMutateBatchSize()),
 tx);
             this.sizeOffset = sizeOffset;
         }
         
    +   MutationState(long maxSize, PhoenixConnection connection,
    +                   Map<TableRef, Map<ImmutableBytesPtr, RowMutationState>> 
mutations,
    +                   Transaction tx) {
    +           this.maxSize = maxSize;
    +           this.connection = connection;
    +           this.mutations = mutations;
    +           boolean isMetricsEnabled = 
connection.isRequestLevelMetricsEnabled();
    +           this.mutationMetricQueue = isMetricsEnabled ? new 
MutationMetricQueue()
    +                           : 
NoOpMutationMetricsQueue.NO_OP_MUTATION_METRICS_QUEUE;
    +           this.tx = tx;
    +           if (tx == null) {
    +                   this.txAwares = Collections.emptyList();
    +                   TransactionSystemClient txServiceClient = 
this.connection
    +                                   
.getQueryServices().getTransactionSystemClient();
    +                   this.txContext = new 
TransactionContext(txServiceClient);
    +           } else {
    +                   txAwares = Lists.newArrayList();
    +                   txContext = null;
    +           }
    +   }
    +
         public MutationState(TableRef table, 
Map<ImmutableBytesPtr,RowMutationState> mutations, long sizeOffset, long 
maxSize, PhoenixConnection connection) {
    -        this(maxSize, connection, sizeOffset);
    +        this(maxSize, connection, null, sizeOffset);
             this.mutations.put(table, mutations);
             this.numRows = mutations.size();
    +        this.tx = connection.getMutationState().getTransaction();
             throwIfTooBig();
         }
         
    +    public boolean checkpoint(MutationPlan plan) throws SQLException {
    +        Transaction currentTx = getTransaction();
    +        if (getTransaction() == null || plan.getTargetRef() == null || 
plan.getTargetRef().getTable() == null || 
!plan.getTargetRef().getTable().isTransactional()) {
    +            return false;
    +        }
    +        Set<TableRef> sources = plan.getSourceRefs();
    +        if (sources.isEmpty()) {
    +            return false;
    +        }
    +        // For a DELETE statement, we're always querying the table being 
deleted from. This isn't
    +        // a problem, but it potentially could be if there are other 
references to the same table
    +        // nested in the DELETE statement (as a sub query or join, for 
example).
    +        TableRef ignoreForExcludeCurrent = plan.getOperation() == 
Operation.DELETE && sources.size() == 1 ? plan.getTargetRef() : null;
    +        boolean excludeCurrent = false;
    +        String targetPhysicalName = 
plan.getTargetRef().getTable().getPhysicalName().getString();
    +        for (TableRef source : sources) {
    +            if (source.getTable().isTransactional() && 
!source.equals(ignoreForExcludeCurrent)) {
    +                String sourcePhysicalName = 
source.getTable().getPhysicalName().getString();
    +                if (targetPhysicalName.equals(sourcePhysicalName)) {
    +                    excludeCurrent = true;
    +                    break;
    +                }
    +            }
    +        }
    +        // If we're querying the same table we're updating, we must 
exclude our writes to
    +        // it from being visible.
    +        if (excludeCurrent) {
    +            // If any source tables have uncommitted data prior to last 
checkpoint,
    +            // then we must create a new checkpoint.
    +            boolean hasUncommittedData = false;
    +            for (TableRef source : sources) {
    +                String sourcePhysicalName = 
source.getTable().getPhysicalName().getString();
    +                if (source.getTable().isTransactional() && 
uncommittedPhysicalNames.contains(sourcePhysicalName)) {
    +                    hasUncommittedData = true;
    +                    break;
    +                }
    +            }
    +            if (hasUncommittedData) {
    +                try {
    +                   if (txContext == null) {
    +                           currentTx = tx = 
connection.getQueryServices().getTransactionSystemClient().checkpoint(currentTx);
    +                   }  else {
    +                           txContext.checkpoint();
    +                           currentTx = tx = 
txContext.getCurrentTransaction();
    +                   }
    +                    // Since we've checkpointed, we can clear out 
uncommitted set, since a statement run afterwards
    +                    // should see all this data.
    +                    uncommittedPhysicalNames.clear();
    +                } catch (TransactionFailureException e) {
    +                    throw new SQLException(e);
    +                           } 
    +            }
    +            // Since we're querying our own table while mutating it, we 
must not
    +            // see our current mutations, otherwise we can get erroneous 
results (for DELETE)
    +            // or get into an infinite loop (for UPSERT SELECT).
    +            
currentTx.setVisibility(VisibilityLevel.SNAPSHOT_EXCLUDE_CURRENT);
    +            return true;
    +        }
    +        return false;
    +    }
    +    
    +    private void addTransactionParticipant(TransactionAware txAware) 
throws SQLException {
    +        if (txContext == null) {
    +            txAwares.add(txAware);
    +            assert(tx != null);
    +            txAware.startTx(tx);
    +        } else {
    +            txContext.addTransactionAware(txAware);
    +        }
    +    }
    +    
    +    // Though MutationState is not thread safe in general, this method 
should be because it may
    +    // be called by TableResultIterator in a multi-threaded manner. Since 
we do not want to expose
    +    // the Transaction outside of MutationState, this seems reasonable, as 
the member variables
    +    // would not change as these threads are running.
    +    public HTableInterface getHTable(PTable table) throws SQLException {
    +        HTableInterface htable = 
this.getConnection().getQueryServices().getTable(table.getPhysicalName().getBytes());
    +        Transaction currentTx;
    +        if (table.isTransactional() && (currentTx=getTransaction()) != 
null) {
    +            TransactionAwareHTable txAware = 
TransactionUtil.getTransactionAwareHTable(htable, table);
    +            // Using cloned mutationState as we may have started a new 
transaction already
    +            // if auto commit is true and we need to use the original one 
here.
    +            txAware.startTx(currentTx);
    +            htable = txAware;
    +        }
    +        return htable;
    +    }
    +    
    +    public PhoenixConnection getConnection() {
    +           return connection;
    +    }
    +    
    +    // Kept private as the Transaction may change when checkpointed. 
Keeping it private ensures
    +    // no one holds on to a stale copy.
    +    private Transaction getTransaction() {
    +        return tx != null ? tx : txContext != null ? 
txContext.getCurrentTransaction() : null;
    +    }
    +    
    +    public boolean isTransactionStarted() {
    +           return getTransaction() != null;
    +    }
    +    
    +    public long getReadPointer() {
    +           Transaction tx = getTransaction();
    +           return tx == null ? HConstants.LATEST_TIMESTAMP : 
tx.getReadPointer();
    +    }
    +    
    +    // For testing
    +    public long getWritePointer() {
    +        Transaction tx = getTransaction();
    +        return tx == null ? HConstants.LATEST_TIMESTAMP : 
tx.getWritePointer();
    +    }
    +    
    +    // For testing
    +    public VisibilityLevel getVisibilityLevel() {
    +        Transaction tx = getTransaction();
    +        return tx == null ? null : tx.getVisibilityLevel();
    +    }
    +    
    +    public boolean startTransaction() throws SQLException {
    +        if (txContext == null) {
    +            throw new SQLException("No transaction context"); // TODO: 
error code
    --- End diff --
    
    Let's do this TODO. Just create a new SQLExceptionCode for this.


> Snapshot isolation transaction support through Tephra
> -----------------------------------------------------
>
>                 Key: PHOENIX-1674
>                 URL: https://issues.apache.org/jira/browse/PHOENIX-1674
>             Project: Phoenix
>          Issue Type: Improvement
>            Reporter: James Taylor
>              Labels: SFDC
>
> Tephra (http://tephra.io/ and https://github.com/caskdata/tephra) is one 
> option for getting transaction support in Phoenix. Let's use this JIRA to 
> discuss the way in which this could be integrated along with the pros and 
> cons.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)

Reply via email to