Github user JamesRTaylor commented on a diff in the pull request:

    https://github.com/apache/phoenix/pull/129#discussion_r45307398

--- Diff: phoenix-core/src/main/java/org/apache/phoenix/execute/MutationState.java ---
@@ -410,149 +642,293 @@ private static long calculateMutationSize(List<Mutation> mutations) {
         return byteSize;
     }
+    private boolean hasKeyValueColumn(PTable table, PTable index) {
+        IndexMaintainer maintainer = index.getIndexMaintainer(table, connection);
+        return !maintainer.getAllColumns().isEmpty();
+    }
+
+    private void divideImmutableIndexes(Iterator<PTable> enabledImmutableIndexes, PTable table, List<PTable> rowKeyIndexes, List<PTable> keyValueIndexes) {
+        while (enabledImmutableIndexes.hasNext()) {
+            PTable index = enabledImmutableIndexes.next();
+            if (index.getIndexType() != IndexType.LOCAL) {
+                if (hasKeyValueColumn(table, index)) {
+                    keyValueIndexes.add(index);
+                } else {
+                    rowKeyIndexes.add(index);
+                }
+            }
+        }
+    }
+    private class MetaDataAwareHTable extends DelegateHTableInterface {
+        private final TableRef tableRef;
+
+        private MetaDataAwareHTable(HTableInterface delegate, TableRef tableRef) {
+            super(delegate);
+            this.tableRef = tableRef;
+        }
+
+        /**
+         * Called by Tephra when a transaction is aborted. We have this wrapper so that we get an
+         * opportunity to attach our index meta data to the mutations such that we can also undo
+         * the index mutations.
+         */
+        @Override
+        public void delete(List<Delete> deletes) throws IOException {
+            try {
+                PTable table = tableRef.getTable();
+                List<PTable> indexes = table.getIndexes();
+                Iterator<PTable> enabledIndexes = IndexMaintainer.nonDisabledIndexIterator(indexes.iterator());
+                if (enabledIndexes.hasNext()) {
+                    List<PTable> keyValueIndexes = Collections.emptyList();
+                    ImmutableBytesWritable indexMetaDataPtr = new ImmutableBytesWritable();
+                    boolean attachMetaData = table.getIndexMaintainers(indexMetaDataPtr, connection);
+                    if (table.isImmutableRows()) {
+                        List<PTable> rowKeyIndexes = Lists.newArrayListWithExpectedSize(indexes.size());
+                        keyValueIndexes = Lists.newArrayListWithExpectedSize(indexes.size());
+                        divideImmutableIndexes(enabledIndexes, table, rowKeyIndexes, keyValueIndexes);
+                        // Generate index deletes for immutable indexes that only reference row key
+                        // columns and submit directly here.
+                        ImmutableBytesWritable ptr = new ImmutableBytesWritable();
+                        for (PTable index : rowKeyIndexes) {
+                            List<Delete> indexDeletes = IndexUtil.generateDeleteIndexData(table, index, deletes, ptr, connection.getKeyValueBuilder(), connection);
+                            HTableInterface hindex = connection.getQueryServices().getTable(index.getPhysicalName().getBytes());
+                            hindex.delete(indexDeletes);
+                        }
+                    }
+
+                    // If we have mutable indexes, local immutable indexes, or global immutable indexes
+                    // that reference key value columns, setup index meta data and attach here. In this
+                    // case updates to the indexes will be generated on the server side.
+                    // An alternative would be to let Tephra track the row keys for the immutable index
+                    // by adding it as a transaction participant (soon we can prevent any conflict
+                    // detection from occurring) with the downside being the additional memory required.
+                    if (!keyValueIndexes.isEmpty()) {
+                        attachMetaData = true;
+                        IndexMaintainer.serializeAdditional(table, indexMetaDataPtr, keyValueIndexes, connection);
+                    }
+                    if (attachMetaData) {
+                        setMetaDataOnMutations(tableRef, deletes, indexMetaDataPtr);
+                    }
+                }
+                delegate.delete(deletes);
+            } catch (SQLException e) {
+                throw new IOException(e);
+            }
+        }
+    }
+
     @SuppressWarnings("deprecation")
-    public void commit() throws SQLException {
+    private void send(Iterator<TableRef> tableRefIterator) throws SQLException {
         int i = 0;
-        PName tenantId = connection.getTenantId();
-        long[] serverTimeStamps = validate();
-        Iterator<Map.Entry<TableRef, Map<ImmutableBytesPtr,RowMutationState>>> iterator = this.mutations.entrySet().iterator();
+        long[] serverTimeStamps = null;
+        boolean sendAll = false;
+        // Validate up front if not transactional so that we
+        if (tableRefIterator == null) {
+            serverTimeStamps = validateAll();
+            tableRefIterator = mutations.keySet().iterator();
+            sendAll = true;
+        }
+
        // add tracing for this operation
         try (TraceScope trace = Tracing.startNewSpan(connection, "Committing mutations to tables")) {
             Span span = trace.getSpan();
-            while (iterator.hasNext()) {
-                Map.Entry<TableRef, Map<ImmutableBytesPtr,RowMutationState>> entry = iterator.next();
-                // at this point we are going through mutations for each table
-
-                Map<ImmutableBytesPtr,RowMutationState> valuesMap = entry.getValue();
-                // above is mutations for a table where the first part is the row key and the second part is column values.
-
-                TableRef tableRef = entry.getKey();
-                PTable table = tableRef.getTable();
-                table.getIndexMaintainers(tempPtr, connection);
-                boolean hasIndexMaintainers = tempPtr.getLength() > 0;
-                boolean isDataTable = true;
-                long serverTimestamp = serverTimeStamps[i++];
-                Iterator<Pair<byte[],List<Mutation>>> mutationsIterator = addRowMutations(tableRef, valuesMap, serverTimestamp, false);
-                // above returns an iterator of pair where the first
-                while (mutationsIterator.hasNext()) {
-                    Pair<byte[],List<Mutation>> pair = mutationsIterator.next();
-                    byte[] htableName = pair.getFirst();
-                    List<Mutation> mutations = pair.getSecond();
-
-                    //create a span per target table
-                    //TODO maybe we can be smarter about the table name to string here?
-                    Span child = Tracing.child(span,"Writing mutation batch for table: "+Bytes.toString(htableName));
-
-                    int retryCount = 0;
-                    boolean shouldRetry = false;
-                    do {
-                        ServerCache cache = null;
-                        if (hasIndexMaintainers && isDataTable) {
-                            byte[] attribValue = null;
-                            byte[] uuidValue;
-                            if (IndexMetaDataCacheClient.useIndexMetadataCache(connection, mutations, tempPtr.getLength())) {
-                                IndexMetaDataCacheClient client = new IndexMetaDataCacheClient(connection, tableRef);
-                                cache = client.addIndexMetadataCache(mutations, tempPtr);
-                                child.addTimelineAnnotation("Updated index metadata cache");
-                                uuidValue = cache.getId();
-                                // If we haven't retried yet, retry for this case only, as it's possible that
-                                // a split will occur after we send the index metadata cache to all known
-                                // region servers.
-                                shouldRetry = true;
-                            } else {
-                                attribValue = ByteUtil.copyKeyBytesIfNecessary(tempPtr);
-                                uuidValue = ServerCacheClient.generateId();
-                            }
-                            // Either set the UUID to be able to access the index metadata from the cache
-                            // or set the index metadata directly on the Mutation
-                            for (Mutation mutation : mutations) {
-                                if (tenantId != null) {
-                                    byte[] tenantIdBytes = ScanUtil.getTenantIdBytes(
-                                        table.getRowKeySchema(),
-                                        table.getBucketNum()!=null,
-                                        tenantId);
-                                    mutation.setAttribute(PhoenixRuntime.TENANT_ID_ATTRIB, tenantIdBytes);
-                                }
-                                mutation.setAttribute(PhoenixIndexCodec.INDEX_UUID, uuidValue);
-                                if (attribValue != null) {
-                                    mutation.setAttribute(PhoenixIndexCodec.INDEX_MD, attribValue);
-                                }
-                            }
-                        }
-
-                        SQLException sqlE = null;
-                        HTableInterface hTable = connection.getQueryServices().getTable(htableName);
-                        try {
-                            long numMutations = mutations.size();
+            ImmutableBytesWritable indexMetaDataPtr = new ImmutableBytesWritable();
+            while (tableRefIterator.hasNext()) {
+                // at this point we are going through mutations for each table
+                TableRef tableRef = tableRefIterator.next();
+                Map<ImmutableBytesPtr, RowMutationState> valuesMap = mutations.get(tableRef);
+                if (valuesMap == null || valuesMap.isEmpty()) {
+                    continue;
+                }
+                PTable table = tableRef.getTable();
+                // Track tables to which we've sent uncommitted data
+                if (table.isTransactional()) {
+                    uncommittedPhysicalNames.add(table.getPhysicalName().getString());
+                }
+                table.getIndexMaintainers(indexMetaDataPtr, connection);
+                boolean isDataTable = true;
+                // Validate as we go if transactional since we can undo if a problem occurs (which is unlikely)
+                long serverTimestamp = serverTimeStamps == null ? validate(tableRef, valuesMap) : serverTimeStamps[i++];
+                Iterator<Pair<byte[],List<Mutation>>> mutationsIterator = addRowMutations(tableRef, valuesMap, serverTimestamp, false, sendAll);
+                while (mutationsIterator.hasNext()) {
+                    Pair<byte[],List<Mutation>> pair = mutationsIterator.next();
+                    byte[] htableName = pair.getFirst();
+                    List<Mutation> mutationList = pair.getSecond();
+
+                    //create a span per target table
+                    //TODO maybe we can be smarter about the table name to string here?
+                    Span child = Tracing.child(span,"Writing mutation batch for table: "+Bytes.toString(htableName));
+
+                    int retryCount = 0;
+                    boolean shouldRetry = false;
+                    do {
+                        ServerCache cache = null;
+                        if (isDataTable) {
+                            cache = setMetaDataOnMutations(tableRef, mutationList, indexMetaDataPtr);
+                        }
+
+                        // If we haven't retried yet, retry for this case only, as it's possible that
+                        // a split will occur after we send the index metadata cache to all known
+                        // region servers.
+                        shouldRetry = cache != null;
+                        SQLException sqlE = null;
+                        HTableInterface hTable = connection.getQueryServices().getTable(htableName);
+                        try {
+                            if (table.isTransactional()) {
+                                // If we have indexes, wrap the HTable in a delegate HTable that
+                                // will attach the necessary index meta data in the event of a
+                                // rollback
+                                if (!table.getIndexes().isEmpty()) {
+                                    hTable = new MetaDataAwareHTable(hTable, tableRef);
+                                }
+                                TransactionAwareHTable txnAware = TransactionUtil.getTransactionAwareHTable(hTable, table);
+                                // Don't add immutable indexes (those are the only ones that would participate
+                                // during a commit), as we don't need conflict detection for these.
+                                if (isDataTable) {
+                                    // Even for immutable, we need to do this so that an abort has the state
+                                    // necessary to generate the rows to delete.
+                                    addTransactionParticipant(txnAware);
+                                } else {
+                                    txnAware.startTx(getTransaction());
+                                }
+                                hTable = txnAware;
+                            }
+                            long numMutations = mutationList.size();
                             GLOBAL_MUTATION_BATCH_SIZE.update(numMutations);
                             long startTime = System.currentTimeMillis();
-                            child.addTimelineAnnotation("Attempt " + retryCount);
-                            hTable.batch(mutations);
-                            child.stop();
+                            child.addTimelineAnnotation("Attempt " + retryCount);;
+                            hTable.batch(mutationList);
+                            child.stop();
+                            child.stop();
                             shouldRetry = false;
                             long mutationCommitTime = System.currentTimeMillis() - startTime;
                             GLOBAL_MUTATION_COMMIT_TIME.update(mutationCommitTime);
-                            long mutationSizeBytes = calculateMutationSize(mutations);
+                            long mutationSizeBytes = calculateMutationSize(mutationList);
                             MutationMetric mutationsMetric = new MutationMetric(numMutations, mutationSizeBytes, mutationCommitTime);
                             mutationMetricQueue.addMetricsForTable(Bytes.toString(htableName), mutationsMetric);
-                        } catch (Exception e) {
-                            SQLException inferredE = ServerUtil.parseServerExceptionOrNull(e);
-                            if (inferredE != null) {
-                                if (shouldRetry && retryCount == 0 && inferredE.getErrorCode() == SQLExceptionCode.INDEX_METADATA_NOT_FOUND.getErrorCode()) {
-                                    // Swallow this exception once, as it's possible that we split after sending the index metadata
-                                    // and one of the region servers doesn't have it. This will cause it to have it the next go around.
-                                    // If it fails again, we don't retry.
-                                    String msg = "Swallowing exception and retrying after clearing meta cache on connection. " + inferredE;
-                                    logger.warn(LogUtil.addCustomAnnotations(msg, connection));
-                                    connection.getQueryServices().clearTableRegionCache(htableName);
+                        } catch (Exception e) {
+                            SQLException inferredE = ServerUtil.parseServerExceptionOrNull(e);
+                            if (inferredE != null) {
+                                if (shouldRetry && retryCount == 0 && inferredE.getErrorCode() == SQLExceptionCode.INDEX_METADATA_NOT_FOUND.getErrorCode()) {
+                                    // Swallow this exception once, as it's possible that we split after sending the index metadata
+                                    // and one of the region servers doesn't have it. This will cause it to have it the next go around.
+                                    // If it fails again, we don't retry.
+                                    String msg = "Swallowing exception and retrying after clearing meta cache on connection. " + inferredE;
+                                    logger.warn(LogUtil.addCustomAnnotations(msg, connection));
+                                    connection.getQueryServices().clearTableRegionCache(htableName);
+
+                                    // add a new child span as this one failed
+                                    child.addTimelineAnnotation(msg);
+                                    child.stop();
+                                    child = Tracing.child(span,"Failed batch, attempting retry");
+
+                                    continue;
+                                }
+                                e = inferredE;
+                            }
+                            // Throw to client with both what was committed so far and what is left to be committed.
+                            // That way, client can either undo what was done or try again with what was not done.
+                            sqlE = new CommitException(e, getUncommittedStatementIndexes());
+                        } finally {
+                            try {
+                                if (cache != null) {
+                                    cache.close();
+                                }
+                            } finally {
+                                try {
+                                    hTable.close();
+                                }
+                                catch (IOException e) {
+                                    if (sqlE != null) {
+                                        sqlE.setNextException(ServerUtil.parseServerException(e));
+                                    } else {
+                                        sqlE = ServerUtil.parseServerException(e);
+                                    }
+                                }
+                                if (sqlE != null) {
+                                    // clear pending mutations
+                                    mutations.clear();
+                                    throw sqlE;
+                                }
+                            }
+                        }
+                    } while (shouldRetry && retryCount++ < 1);
+                    isDataTable = false;
+                }
+                if (tableRef.getTable().getType() != PTableType.INDEX) {
+                    numRows -= valuesMap.size();
+                }
+                // Remove batches as we process them
+                if (sendAll) {
+                    tableRefIterator.remove(); // Iterating through actual map in this case
+                } else {
+                    mutations.remove(tableRef);
+                }
+            }
+        }
+        // Note that we cannot assume that *all* mutations have been sent, since we've optimized this
+        // now to only send the mutations for the tables we're querying, hence we've removed the
+        // assertions that we're here before.
+    }
-                                    // add a new child span as this one failed
-                                    child.addTimelineAnnotation(msg);
-                                    child.stop();
-                                    child = Tracing.child(span,"Failed batch, attempting retry");
+    public byte[] encodeTransaction() throws SQLException {
+        try {
+            return CODEC.encode(getTransaction());
+        } catch (IOException e) {
+            throw new SQLException(e);
+        }
+    }
+
+    public static Transaction decodeTransaction(byte[] txnBytes) throws IOException {
+        return (txnBytes == null || txnBytes.length==0) ? null : CODEC.decode(txnBytes);
+    }
-                                    continue;
-                                }
-                                e = inferredE;
-                            }
-                            sqlE = new CommitException(e, getUncommittedStatementIndexes());
-                        } finally {
-                            try {
-                                hTable.close();
-                            } catch (IOException e) {
-                                if (sqlE != null) {
-                                    sqlE.setNextException(ServerUtil.parseServerException(e));
-                                } else {
-                                    sqlE = ServerUtil.parseServerException(e);
-                                }
-                            } finally {
-                                try {
-                                    if (cache != null) {
-                                        cache.close();
-                                    }
-                                } finally {
-                                    if (sqlE != null) {
-                                        throw sqlE;
-                                    }
-                                }
-                            }
-                        }
-                    } while (shouldRetry && retryCount++ < 1);
-                    isDataTable = false;
-                }
-                if (tableRef.getTable().getType() != PTableType.INDEX) {
-                    numRows -= entry.getValue().size();
+    private ServerCache setMetaDataOnMutations(TableRef tableRef, List<? extends Mutation> mutations,
+            ImmutableBytesWritable indexMetaDataPtr) throws SQLException {
+        PTable table = tableRef.getTable();
+        byte[] tenantId = connection.getTenantId() == null ? null : connection.getTenantId().getBytes();
+        ServerCache cache = null;
+        byte[] attribValue = null;
+        byte[] uuidValue = null;
+        byte[] txState = ByteUtil.EMPTY_BYTE_ARRAY;
+        if (table.isTransactional()) {
+            txState = encodeTransaction();
+        }
+        boolean hasIndexMetaData = indexMetaDataPtr.getLength() > 0;
+        if (hasIndexMetaData) {
+            if (IndexMetaDataCacheClient.useIndexMetadataCache(connection, mutations, indexMetaDataPtr.getLength() + txState.length)) {
+                IndexMetaDataCacheClient client = new IndexMetaDataCacheClient(connection, tableRef);
+                cache = client.addIndexMetadataCache(mutations, indexMetaDataPtr, txState);
+                uuidValue = cache.getId();
+            } else {
+                attribValue = ByteUtil.copyKeyBytesIfNecessary(indexMetaDataPtr);
+                uuidValue = ServerCacheClient.generateId();
+            }
+        } else if (txState.length == 0) {
+            return null;
+        }
+        // Either set the UUID to be able to access the index metadata from the cache
+        // or set the index metadata directly on the Mutation
+        for (Mutation mutation : mutations) {
+            if (tenantId != null) {
+                mutation.setAttribute(PhoenixRuntime.TENANT_ID_ATTRIB, tenantId);
+            }
+            mutation.setAttribute(PhoenixIndexCodec.INDEX_UUID, uuidValue);
+            if (attribValue != null) {
+                mutation.setAttribute(PhoenixIndexCodec.INDEX_MD, attribValue);
+                if (txState.length > 0) {
+                    mutation.setAttribute(BaseScannerRegionObserver.TX_STATE, txState);
                 }
-                iterator.remove(); // Remove batches as we process them
+            } else if (!hasIndexMetaData && txState.length > 0) {
+                mutation.setAttribute(BaseScannerRegionObserver.TX_STATE, txState);
            }
         }
-        assert(numRows==0);
-        assert(this.mutations.isEmpty());
+        return cache;
     }
-    public void rollback(PhoenixConnection connection) throws SQLException {
+    public void clear() throws SQLException {
--- End diff --

Make this clear() method private if possible.
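For illustration, the suggested change would just narrow the method's visibility, assuming nothing outside MutationState needs to call it; the body below is only a placeholder, since clear()'s implementation is elided in this diff:

    // hypothetical sketch -- the real body of clear() is not shown in the diff above
    private void clear() throws SQLException {
        this.mutations.clear(); // drop any pending row mutations
        this.numRows = 0;       // reset the pending-row counter
    }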