[ 
https://issues.apache.org/jira/browse/PHOENIX-1674?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15013017#comment-15013017
 ] 

ASF GitHub Bot commented on PHOENIX-1674:
-----------------------------------------

Github user JamesRTaylor commented on a diff in the pull request:

    https://github.com/apache/phoenix/pull/129#discussion_r45307398
  
    --- Diff: 
phoenix-core/src/main/java/org/apache/phoenix/execute/MutationState.java ---
    @@ -410,149 +642,293 @@ private static long 
calculateMutationSize(List<Mutation> mutations) {
             return byteSize;
         }
         
    +    private boolean hasKeyValueColumn(PTable table, PTable index) {
    +        IndexMaintainer maintainer = index.getIndexMaintainer(table, 
connection);
    +        return !maintainer.getAllColumns().isEmpty();
    +    }
    +    
    +    private void divideImmutableIndexes(Iterator<PTable> 
enabledImmutableIndexes, PTable table, List<PTable> rowKeyIndexes, List<PTable> 
keyValueIndexes) {
    +        while (enabledImmutableIndexes.hasNext()) {
    +            PTable index = enabledImmutableIndexes.next();
    +            if (index.getIndexType() != IndexType.LOCAL) {
    +                if (hasKeyValueColumn(table, index)) {
    +                    keyValueIndexes.add(index);
    +                } else {
    +                    rowKeyIndexes.add(index);
    +                }
    +            }
    +        }
    +    }
    +    private class MetaDataAwareHTable extends DelegateHTableInterface {
    +        private final TableRef tableRef;
    +        
    +        private MetaDataAwareHTable(HTableInterface delegate, TableRef 
tableRef) {
    +            super(delegate);
    +            this.tableRef = tableRef;
    +        }
    +        
    +        /**
    +         * Called by Tephra when a transaction is aborted. We have this 
wrapper so that we get an
    +         * opportunity to attach our index meta data to the mutations such 
that we can also undo
    +         * the index mutations.
    +         */
    +        @Override
    +        public void delete(List<Delete> deletes) throws IOException {
    +            try {
    +                PTable table = tableRef.getTable();
    +                List<PTable> indexes = table.getIndexes();
    +                Iterator<PTable> enabledIndexes = 
IndexMaintainer.nonDisabledIndexIterator(indexes.iterator());
    +                if (enabledIndexes.hasNext()) {
    +                    List<PTable> keyValueIndexes = Collections.emptyList();
    +                    ImmutableBytesWritable indexMetaDataPtr = new 
ImmutableBytesWritable();
    +                    boolean attachMetaData = 
table.getIndexMaintainers(indexMetaDataPtr, connection);
    +                    if (table.isImmutableRows()) {
    +                        List<PTable> rowKeyIndexes = 
Lists.newArrayListWithExpectedSize(indexes.size());
    +                        keyValueIndexes = 
Lists.newArrayListWithExpectedSize(indexes.size());
    +                        divideImmutableIndexes(enabledIndexes, table, 
rowKeyIndexes, keyValueIndexes);
    +                        // Generate index deletes for immutable indexes 
that only reference row key
    +                        // columns and submit directly here.
    +                        ImmutableBytesWritable ptr = new 
ImmutableBytesWritable();
    +                        for (PTable index : rowKeyIndexes) {
    +                            List<Delete> indexDeletes = 
IndexUtil.generateDeleteIndexData(table, index, deletes, ptr, 
connection.getKeyValueBuilder(), connection);
    +                            HTableInterface hindex = 
connection.getQueryServices().getTable(index.getPhysicalName().getBytes());
    +                            hindex.delete(indexDeletes);
    +                        }
    +                    }
    +                    
    +                    // If we have mutable indexes, local immutable 
indexes, or global immutable indexes
    +                    // that reference key value columns, setup index meta 
data and attach here. In this
    +                    // case updates to the indexes will be generated on 
the server side.
    +                    // An alternative would be to let Tephra track the row 
keys for the immutable index
    +                    // by adding it as a transaction participant (soon we 
can prevent any conflict
    +                    // detection from occurring) with the downside being 
the additional memory required.
    +                    if (!keyValueIndexes.isEmpty()) {
    +                        attachMetaData = true;
    +                        IndexMaintainer.serializeAdditional(table, 
indexMetaDataPtr, keyValueIndexes, connection);
    +                    }
    +                    if (attachMetaData) {
    +                        setMetaDataOnMutations(tableRef, deletes, 
indexMetaDataPtr);
    +                    }
    +                }
    +                delegate.delete(deletes);
    +            } catch (SQLException e) {
    +                throw new IOException(e);
    +            }
    +        }
    +    }
    +    
         @SuppressWarnings("deprecation")
    -    public void commit() throws SQLException {
    +    private void send(Iterator<TableRef> tableRefIterator) throws 
SQLException {
             int i = 0;
    -        PName tenantId = connection.getTenantId();
    -        long[] serverTimeStamps = validate();
    -        Iterator<Map.Entry<TableRef, 
Map<ImmutableBytesPtr,RowMutationState>>> iterator = 
this.mutations.entrySet().iterator();
    +        long[] serverTimeStamps = null;
    +        boolean sendAll = false;
    +        // Validate up front if not transactional so that we 
    +        if (tableRefIterator == null) {
    +            serverTimeStamps = validateAll();
    +            tableRefIterator = mutations.keySet().iterator();
    +            sendAll = true;
    +        }
    +
             // add tracing for this operation
             try (TraceScope trace = Tracing.startNewSpan(connection, 
"Committing mutations to tables")) {
                 Span span = trace.getSpan();
    -            while (iterator.hasNext()) {
    -                Map.Entry<TableRef, 
Map<ImmutableBytesPtr,RowMutationState>> entry = iterator.next();
    -                // at this point we are going through mutations for each 
table
    -
    -                Map<ImmutableBytesPtr,RowMutationState> valuesMap = 
entry.getValue();
    -                // above is mutations for a table where the first part is 
the row key and the second part is column values.
    -
    -                TableRef tableRef = entry.getKey();
    -                PTable table = tableRef.getTable();
    -                table.getIndexMaintainers(tempPtr, connection);
    -                boolean hasIndexMaintainers = tempPtr.getLength() > 0;
    -                boolean isDataTable = true;
    -                long serverTimestamp = serverTimeStamps[i++];
    -                Iterator<Pair<byte[],List<Mutation>>> mutationsIterator = 
addRowMutations(tableRef, valuesMap, serverTimestamp, false);
    -                // above returns an iterator of pair where the first  
    -                while (mutationsIterator.hasNext()) {
    -                    Pair<byte[],List<Mutation>> pair = 
mutationsIterator.next();
    -                    byte[] htableName = pair.getFirst();
    -                    List<Mutation> mutations = pair.getSecond();
    -
    -                    //create a span per target table
    -                    //TODO maybe we can be smarter about the table name to 
string here?
    -                    Span child = Tracing.child(span,"Writing mutation 
batch for table: "+Bytes.toString(htableName));
    -
    -                    int retryCount = 0;
    -                    boolean shouldRetry = false;
    -                    do {
    -                        ServerCache cache = null;
    -                        if (hasIndexMaintainers && isDataTable) {
    -                            byte[] attribValue = null;
    -                            byte[] uuidValue;
    -                            if 
(IndexMetaDataCacheClient.useIndexMetadataCache(connection, mutations, 
tempPtr.getLength())) {
    -                                IndexMetaDataCacheClient client = new 
IndexMetaDataCacheClient(connection, tableRef);
    -                                cache = 
client.addIndexMetadataCache(mutations, tempPtr);
    -                                child.addTimelineAnnotation("Updated index 
metadata cache");
    -                                uuidValue = cache.getId();
    -                                // If we haven't retried yet, retry for 
this case only, as it's possible that
    -                                // a split will occur after we send the 
index metadata cache to all known
    -                                // region servers.
    -                                shouldRetry = true;
    -                            } else {
    -                                attribValue = 
ByteUtil.copyKeyBytesIfNecessary(tempPtr);
    -                                uuidValue = ServerCacheClient.generateId();
    -                            }
    -                            // Either set the UUID to be able to access 
the index metadata from the cache
    -                            // or set the index metadata directly on the 
Mutation
    -                            for (Mutation mutation : mutations) {
    -                                if (tenantId != null) {
    -                                    byte[] tenantIdBytes = 
ScanUtil.getTenantIdBytes(
    -                                        table.getRowKeySchema(),
    -                                        table.getBucketNum()!=null,
    -                                        tenantId);
    -                                    
mutation.setAttribute(PhoenixRuntime.TENANT_ID_ATTRIB, tenantIdBytes);
    -                                }
    -                                
mutation.setAttribute(PhoenixIndexCodec.INDEX_UUID, uuidValue);
    -                                if (attribValue != null) {
    -                                    
mutation.setAttribute(PhoenixIndexCodec.INDEX_MD, attribValue);
    -                                }
    -                            }
    -                        }
    -
    -                        SQLException sqlE = null;
    -                        HTableInterface hTable = 
connection.getQueryServices().getTable(htableName);
    -                        try {
    -                            long numMutations = mutations.size();
    +           ImmutableBytesWritable indexMetaDataPtr = new 
ImmutableBytesWritable();
    +           while (tableRefIterator.hasNext()) {
    +                   // at this point we are going through mutations for 
each table
    +               TableRef tableRef = tableRefIterator.next();
    +               Map<ImmutableBytesPtr, RowMutationState> valuesMap = 
mutations.get(tableRef);
    +               if (valuesMap == null || valuesMap.isEmpty()) {
    +                   continue;
    +               }
    +               PTable table = tableRef.getTable();
    +               // Track tables to which we've sent uncommitted data
    +               if (table.isTransactional()) {
    +                   
uncommittedPhysicalNames.add(table.getPhysicalName().getString());
    +               }
    +               table.getIndexMaintainers(indexMetaDataPtr, connection);
    +               boolean isDataTable = true;
    +               // Validate as we go if transactional since we can undo if 
a problem occurs (which is unlikely)
    +               long serverTimestamp = serverTimeStamps == null ? 
validate(tableRef, valuesMap) : serverTimeStamps[i++];
    +               Iterator<Pair<byte[],List<Mutation>>> mutationsIterator = 
addRowMutations(tableRef, valuesMap, serverTimestamp, false, sendAll);
    +               while (mutationsIterator.hasNext()) {
    +                   Pair<byte[],List<Mutation>> pair = 
mutationsIterator.next();
    +                   byte[] htableName = pair.getFirst();
    +                   List<Mutation> mutationList = pair.getSecond();
    +                   
    +                   //create a span per target table
    +                   //TODO maybe we can be smarter about the table name to 
string here?
    +                   Span child = Tracing.child(span,"Writing mutation batch 
for table: "+Bytes.toString(htableName));
    +   
    +                   int retryCount = 0;
    +                   boolean shouldRetry = false;
    +                   do {
    +                       ServerCache cache = null;
    +                       if (isDataTable) {
    +                           cache = setMetaDataOnMutations(tableRef, 
mutationList, indexMetaDataPtr);
    +                       }
    +                   
    +                       // If we haven't retried yet, retry for this case 
only, as it's possible that
    +                       // a split will occur after we send the index 
metadata cache to all known
    +                       // region servers.
    +                       shouldRetry = cache != null;
    +                       SQLException sqlE = null;
    +                       HTableInterface hTable = 
connection.getQueryServices().getTable(htableName);
    +                       try {
    +                           if (table.isTransactional()) {
    +                               // If we have indexes, wrap the HTable in a 
delegate HTable that
    +                               // will attach the necessary index meta 
data in the event of a
    +                               // rollback
    +                               if (!table.getIndexes().isEmpty()) {
    +                                   hTable = new 
MetaDataAwareHTable(hTable, tableRef);
    +                               }
    +                               TransactionAwareHTable txnAware = 
TransactionUtil.getTransactionAwareHTable(hTable, table);
    +                               // Don't add immutable indexes (those are 
the only ones that would participate
    +                               // during a commit), as we don't need 
conflict detection for these.
    +                               if (isDataTable) {
    +                                   // Even for immutable, we need to do 
this so that an abort has the state
    +                                   // necessary to generate the rows to 
delete.
    +                                   addTransactionParticipant(txnAware);
    +                               } else {
    +                                   txnAware.startTx(getTransaction());
    +                               }
    +                               hTable = txnAware;
    +                           }
    +                           long numMutations = mutationList.size();
                                 
GLOBAL_MUTATION_BATCH_SIZE.update(numMutations);
                                 
                                 long startTime = System.currentTimeMillis();
    -                            child.addTimelineAnnotation("Attempt " + 
retryCount);
    -                            hTable.batch(mutations);
    -                            child.stop();
    +                            child.addTimelineAnnotation("Attempt " + 
retryCount);;
    +                           hTable.batch(mutationList);
    +                           child.stop();
    +                           child.stop();
                                 shouldRetry = false;
                                 long mutationCommitTime = 
System.currentTimeMillis() - startTime;
                                 
GLOBAL_MUTATION_COMMIT_TIME.update(mutationCommitTime);
                                 
    -                            long mutationSizeBytes = 
calculateMutationSize(mutations);
    +                            long mutationSizeBytes = 
calculateMutationSize(mutationList);
                                 MutationMetric mutationsMetric = new 
MutationMetric(numMutations, mutationSizeBytes, mutationCommitTime);
                                 
mutationMetricQueue.addMetricsForTable(Bytes.toString(htableName), 
mutationsMetric);
    -                        } catch (Exception e) {
    -                            SQLException inferredE = 
ServerUtil.parseServerExceptionOrNull(e);
    -                            if (inferredE != null) {
    -                                if (shouldRetry && retryCount == 0 && 
inferredE.getErrorCode() == 
SQLExceptionCode.INDEX_METADATA_NOT_FOUND.getErrorCode()) {
    -                                    // Swallow this exception once, as 
it's possible that we split after sending the index metadata
    -                                    // and one of the region servers 
doesn't have it. This will cause it to have it the next go around.
    -                                    // If it fails again, we don't retry.
    -                                    String msg = "Swallowing exception and 
retrying after clearing meta cache on connection. " + inferredE;
    -                                    
logger.warn(LogUtil.addCustomAnnotations(msg, connection));
    -                                    
connection.getQueryServices().clearTableRegionCache(htableName);
    +                       } catch (Exception e) {
    +                           SQLException inferredE = 
ServerUtil.parseServerExceptionOrNull(e);
    +                           if (inferredE != null) {
    +                               if (shouldRetry && retryCount == 0 && 
inferredE.getErrorCode() == 
SQLExceptionCode.INDEX_METADATA_NOT_FOUND.getErrorCode()) {
    +                                   // Swallow this exception once, as it's 
possible that we split after sending the index metadata
    +                                   // and one of the region servers 
doesn't have it. This will cause it to have it the next go around.
    +                                   // If it fails again, we don't retry.
    +                                   String msg = "Swallowing exception and 
retrying after clearing meta cache on connection. " + inferredE;
    +                                   
logger.warn(LogUtil.addCustomAnnotations(msg, connection));
    +                                   
connection.getQueryServices().clearTableRegionCache(htableName);
    +   
    +                                   // add a new child span as this one 
failed
    +                                   child.addTimelineAnnotation(msg);
    +                                   child.stop();
    +                                   child = Tracing.child(span,"Failed 
batch, attempting retry");
    +   
    +                                   continue;
    +                               }
    +                               e = inferredE;
    +                           }
    +                           // Throw to client with both what was committed 
so far and what is left to be committed.
    +                           // That way, client can either undo what was 
done or try again with what was not done.
    +                           sqlE = new CommitException(e, 
getUncommittedStatementIndexes());
    +                       } finally {
    +                           try {
    +                               if (cache != null) {
    +                                   cache.close();
    +                               }
    +                           } finally {
    +                               try {
    +                                   hTable.close();
    +                               } 
    +                               catch (IOException e) {
    +                                   if (sqlE != null) {
    +                                       
sqlE.setNextException(ServerUtil.parseServerException(e));
    +                                   } else {
    +                                       sqlE = 
ServerUtil.parseServerException(e);
    +                                   }
    +                               } 
    +                               if (sqlE != null) {
    +                                   // clear pending mutations
    +                                   mutations.clear();
    +                                   throw sqlE;
    +                               }
    +                           }
    +                       }
    +                   } while (shouldRetry && retryCount++ < 1);
    +                   isDataTable = false;
    +               }
    +               if (tableRef.getTable().getType() != PTableType.INDEX) {
    +                   numRows -= valuesMap.size();
    +               }
    +               // Remove batches as we process them
    +               if (sendAll) {
    +                   tableRefIterator.remove(); // Iterating through actual 
map in this case
    +               } else {
    +                   mutations.remove(tableRef);
    +               }
    +           }
    +        }
    +        // Note that we cannot assume that *all* mutations have been sent, 
since we've optimized this
    +        // now to only send the mutations for the tables we're querying, 
hence we've removed the
    +        // assertions that we're here before.
    +    }
     
    -                                    // add a new child span as this one 
failed
    -                                    child.addTimelineAnnotation(msg);
    -                                    child.stop();
    -                                    child = Tracing.child(span,"Failed 
batch, attempting retry");
    +    public byte[] encodeTransaction() throws SQLException {
    +        try {
    +            return CODEC.encode(getTransaction());
    +        } catch (IOException e) {
    +            throw new SQLException(e);
    +        }
    +    }
    +    
    +    public static Transaction decodeTransaction(byte[] txnBytes) throws 
IOException {
    +           return (txnBytes == null || txnBytes.length==0) ? null : 
CODEC.decode(txnBytes);
    +    }
     
    -                                    continue;
    -                                }
    -                                e = inferredE;
    -                            }
    -                            sqlE = new CommitException(e, 
getUncommittedStatementIndexes());
    -                        } finally {
    -                            try {
    -                                hTable.close();
    -                            } catch (IOException e) {
    -                                if (sqlE != null) {
    -                                    
sqlE.setNextException(ServerUtil.parseServerException(e));
    -                                } else {
    -                                    sqlE = 
ServerUtil.parseServerException(e);
    -                                }
    -                            } finally {
    -                                try {
    -                                    if (cache != null) {
    -                                        cache.close();
    -                                    }
    -                                } finally {
    -                                    if (sqlE != null) {
    -                                        throw sqlE;
    -                                    }
    -                                }
    -                            }
    -                        }
    -                    } while (shouldRetry && retryCount++ < 1);
    -                    isDataTable = false;
    -                }
    -                if (tableRef.getTable().getType() != PTableType.INDEX) {
    -                    numRows -= entry.getValue().size();
    +    private ServerCache setMetaDataOnMutations(TableRef tableRef, List<? 
extends Mutation> mutations,
    +            ImmutableBytesWritable indexMetaDataPtr) throws SQLException {
    +        PTable table = tableRef.getTable();
    +        byte[] tenantId = connection.getTenantId() == null ? null : 
connection.getTenantId().getBytes();
    +        ServerCache cache = null;
    +        byte[] attribValue = null;
    +        byte[] uuidValue = null;
    +        byte[] txState = ByteUtil.EMPTY_BYTE_ARRAY;
    +        if (table.isTransactional()) {
    +            txState = encodeTransaction();
    +        }
    +        boolean hasIndexMetaData = indexMetaDataPtr.getLength() > 0;
    +        if (hasIndexMetaData) {
    +            if (IndexMetaDataCacheClient.useIndexMetadataCache(connection, 
mutations, indexMetaDataPtr.getLength() + txState.length)) {
    +                IndexMetaDataCacheClient client = new 
IndexMetaDataCacheClient(connection, tableRef);
    +                cache = client.addIndexMetadataCache(mutations, 
indexMetaDataPtr, txState);
    +                uuidValue = cache.getId();
    +            } else {
    +                attribValue = 
ByteUtil.copyKeyBytesIfNecessary(indexMetaDataPtr);
    +                uuidValue = ServerCacheClient.generateId();
    +            }
    +        } else if (txState.length == 0) {
    +            return null;
    +        }
    +        // Either set the UUID to be able to access the index metadata 
from the cache
    +        // or set the index metadata directly on the Mutation
    +        for (Mutation mutation : mutations) {
    +            if (tenantId != null) {
    +                mutation.setAttribute(PhoenixRuntime.TENANT_ID_ATTRIB, 
tenantId);
    +            }
    +            mutation.setAttribute(PhoenixIndexCodec.INDEX_UUID, uuidValue);
    +            if (attribValue != null) {
    +                mutation.setAttribute(PhoenixIndexCodec.INDEX_MD, 
attribValue);
    +                if (txState.length > 0) {
    +                    
mutation.setAttribute(BaseScannerRegionObserver.TX_STATE, txState);
                     }
    -                iterator.remove(); // Remove batches as we process them
    +            } else if (!hasIndexMetaData && txState.length > 0) {
    +                mutation.setAttribute(BaseScannerRegionObserver.TX_STATE, 
txState);
                 }
             }
    -        assert(numRows==0);
    -        assert(this.mutations.isEmpty());
    +        return cache;
         }
         
    -    public void rollback(PhoenixConnection connection) throws SQLException 
{
    +    public void clear() throws SQLException {
    --- End diff --
    
    Make this clear() method private if possible.


> Snapshot isolation transaction support through Tephra
> -----------------------------------------------------
>
>                 Key: PHOENIX-1674
>                 URL: https://issues.apache.org/jira/browse/PHOENIX-1674
>             Project: Phoenix
>          Issue Type: Improvement
>            Reporter: James Taylor
>              Labels: SFDC
>
> Tephra (http://tephra.io/ and https://github.com/caskdata/tephra) is one 
> option for getting transaction support in Phoenix. Let's use this JIRA to 
> discuss the way in which this could be integrated along with the pros and 
> cons.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)

Reply via email to