http://git-wip-us.apache.org/repos/asf/phoenix/blob/7d3b544a/phoenix-core/src/main/java/org/apache/phoenix/execute/MutationState.java ---------------------------------------------------------------------- diff --cc phoenix-core/src/main/java/org/apache/phoenix/execute/MutationState.java index 79da59f,d4ef1cf..e76f55f --- a/phoenix-core/src/main/java/org/apache/phoenix/execute/MutationState.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/execute/MutationState.java @@@ -27,59 -29,51 +29,71 @@@ import java.util.Collections import java.util.Iterator; import java.util.List; import java.util.Map; +import java.util.Map.Entry; +import java.util.Set; + +import co.cask.tephra.Transaction; +import co.cask.tephra.Transaction.VisibilityLevel; +import co.cask.tephra.TransactionAware; +import co.cask.tephra.TransactionCodec; +import co.cask.tephra.TransactionContext; +import co.cask.tephra.TransactionFailureException; +import co.cask.tephra.TransactionNotInProgressException; +import co.cask.tephra.TransactionSystemClient; +import co.cask.tephra.hbase98.TransactionAwareHTable; + import javax.annotation.Nonnull; + import javax.annotation.concurrent.Immutable; + import org.apache.hadoop.hbase.HConstants; +import org.apache.hadoop.hbase.client.Delete; import org.apache.hadoop.hbase.client.HTableInterface; import org.apache.hadoop.hbase.client.Mutation; +import org.apache.hadoop.hbase.io.ImmutableBytesWritable; import org.apache.hadoop.hbase.util.Bytes; import org.apache.hadoop.hbase.util.Pair; + import org.apache.htrace.Span; + import org.apache.htrace.TraceScope; import org.apache.phoenix.cache.ServerCacheClient; import org.apache.phoenix.cache.ServerCacheClient.ServerCache; +import org.apache.phoenix.compile.MutationPlan; +import org.apache.phoenix.coprocessor.BaseScannerRegionObserver; import org.apache.phoenix.coprocessor.MetaDataProtocol.MetaDataMutationResult; import org.apache.phoenix.exception.SQLExceptionCode; +import org.apache.phoenix.exception.SQLExceptionInfo; import org.apache.phoenix.hbase.index.util.ImmutableBytesPtr; import org.apache.phoenix.index.IndexMaintainer; import org.apache.phoenix.index.IndexMetaDataCacheClient; import org.apache.phoenix.index.PhoenixIndexCodec; import org.apache.phoenix.jdbc.PhoenixConnection; +import org.apache.phoenix.jdbc.PhoenixStatement.Operation; - import org.apache.phoenix.monitoring.PhoenixMetrics; + import org.apache.phoenix.monitoring.GlobalClientMetrics; + import org.apache.phoenix.monitoring.MutationMetricQueue; + import org.apache.phoenix.monitoring.MutationMetricQueue.MutationMetric; + import org.apache.phoenix.monitoring.MutationMetricQueue.NoOpMutationMetricsQueue; + import org.apache.phoenix.monitoring.ReadMetricQueue; import org.apache.phoenix.query.QueryConstants; import org.apache.phoenix.schema.IllegalDataException; import org.apache.phoenix.schema.MetaDataClient; import org.apache.phoenix.schema.PColumn; + import org.apache.phoenix.schema.PName; import org.apache.phoenix.schema.PRow; import org.apache.phoenix.schema.PTable; +import org.apache.phoenix.schema.PTable.IndexType; import org.apache.phoenix.schema.PTableType; + import org.apache.phoenix.schema.RowKeySchema; import org.apache.phoenix.schema.TableRef; + import org.apache.phoenix.schema.ValueSchema.Field; + import org.apache.phoenix.schema.types.PLong; import org.apache.phoenix.trace.util.Tracing; import org.apache.phoenix.util.ByteUtil; import org.apache.phoenix.util.IndexUtil; import org.apache.phoenix.util.LogUtil; import org.apache.phoenix.util.PhoenixRuntime; import 
org.apache.phoenix.util.SQLCloseable; + import org.apache.phoenix.util.ScanUtil; import org.apache.phoenix.util.ServerUtil; +import org.apache.phoenix.util.TransactionUtil; - import org.cloudera.htrace.Span; - import org.cloudera.htrace.TraceScope; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@@ -98,207 -90,47 +112,220 @@@ import com.google.common.collect.Sets */ public class MutationState implements SQLCloseable { private static final Logger logger = LoggerFactory.getLogger(MutationState.class); - + private static final TransactionCodec CODEC = new TransactionCodec(); + private PhoenixConnection connection; private final long maxSize; - // map from table to rows - // rows - map from rowkey to columns - // columns - map from column to value - private final Map<TableRef, Map<ImmutableBytesPtr,Map<PColumn,byte[]>>> mutations = Maps.newHashMapWithExpectedSize(3); // TODO: Sizing? - private final ImmutableBytesPtr tempPtr = new ImmutableBytesPtr(); + private final Map<TableRef, Map<ImmutableBytesPtr,RowMutationState>> mutations; + private final List<TransactionAware> txAwares; + private final TransactionContext txContext; + private final Set<String> uncommittedPhysicalNames = Sets.newHashSetWithExpectedSize(10); + + private Transaction tx; private long sizeOffset; private int numRows = 0; + private boolean txStarted = false; + ++ private final ImmutableBytesPtr tempPtr = new ImmutableBytesPtr(); + private final MutationMetricQueue mutationMetricQueue; + private ReadMetricQueue readMetricQueue; + - MutationState(long maxSize, PhoenixConnection connection, - Map<TableRef, Map<ImmutableBytesPtr, RowMutationState>> mutations) { - this.maxSize = maxSize; - this.connection = connection; - this.mutations = mutations; - boolean isMetricsEnabled = connection.isRequestLevelMetricsEnabled(); - this.mutationMetricQueue = isMetricsEnabled ? 
new MutationMetricQueue() - : NoOpMutationMetricsQueue.NO_OP_MUTATION_METRICS_QUEUE; - } - public MutationState(long maxSize, PhoenixConnection connection) { - this(maxSize,connection,null); - this(maxSize,connection,0); ++ this(maxSize,connection, null); + } + + public MutationState(MutationState mutationState) { - this(mutationState.maxSize,mutationState.connection,mutationState.getTransaction()); ++ this(mutationState.maxSize, mutationState.connection, mutationState.getTransaction()); } public MutationState(long maxSize, PhoenixConnection connection, long sizeOffset) { - this(maxSize, connection, Maps.<TableRef, Map<ImmutableBytesPtr,RowMutationState>>newHashMapWithExpectedSize(connection.getMutateBatchSize())); + this(maxSize, connection, null, sizeOffset); + } + + private MutationState(long maxSize, PhoenixConnection connection, Transaction tx) { + this(maxSize,connection, tx, 0); + } + + private MutationState(long maxSize, PhoenixConnection connection, Transaction tx, long sizeOffset) { - this.maxSize = maxSize; - this.connection = connection; ++ this(maxSize, connection, Maps.<TableRef, Map<ImmutableBytesPtr,RowMutationState>>newHashMapWithExpectedSize(connection.getMutateBatchSize()), tx); this.sizeOffset = sizeOffset; - this.tx = tx; - if (tx == null) { - this.txAwares = Collections.emptyList(); - TransactionSystemClient txServiceClient = this.connection.getQueryServices().getTransactionSystemClient(); - this.txContext = new TransactionContext(txServiceClient); - } else { - txAwares = Lists.newArrayList(); - txContext = null; - } } - public MutationState(TableRef table, Map<ImmutableBytesPtr,Map<PColumn,byte[]>> mutations, long sizeOffset, long maxSize, PhoenixConnection connection) { - this.maxSize = maxSize; - this.connection = connection; ++ MutationState(long maxSize, PhoenixConnection connection, ++ Map<TableRef, Map<ImmutableBytesPtr, RowMutationState>> mutations, ++ Transaction tx) { ++ this.maxSize = maxSize; ++ this.connection = connection; ++ this.mutations = mutations; ++ boolean isMetricsEnabled = connection.isRequestLevelMetricsEnabled(); ++ this.mutationMetricQueue = isMetricsEnabled ? new MutationMetricQueue() ++ : NoOpMutationMetricsQueue.NO_OP_MUTATION_METRICS_QUEUE; ++ if (tx == null) { ++ this.txAwares = Collections.emptyList(); ++ TransactionSystemClient txServiceClient = this.connection ++ .getQueryServices().getTransactionSystemClient(); ++ this.txContext = new TransactionContext(txServiceClient); ++ } else { ++ txAwares = Lists.newArrayList(); ++ txContext = null; ++ } ++ } ++ + public MutationState(TableRef table, Map<ImmutableBytesPtr,RowMutationState> mutations, long sizeOffset, long maxSize, PhoenixConnection connection) { - this(maxSize, connection, sizeOffset); ++ this(maxSize, connection, null, sizeOffset); this.mutations.put(table, mutations); - this.sizeOffset = sizeOffset; this.numRows = mutations.size(); - this.txAwares = Lists.newArrayList(); - this.txContext = null; + this.tx = connection.getMutationState().getTransaction(); throwIfTooBig(); } + public boolean checkpoint(MutationPlan plan) throws SQLException { + Transaction currentTx = getTransaction(); + if (currentTx == null || plan.getTargetRef() == null || plan.getTargetRef().getTable() == null || !plan.getTargetRef().getTable().isTransactional()) { + return false; + } + Set<TableRef> sources = plan.getSourceRefs(); + if (sources.isEmpty()) { + return false; + } + // For a DELETE statement, we're always querying the table being deleted from. 
This isn't + // a problem, but it potentially could be if there are other references to the same table + // nested in the DELETE statement (as a subquery or join, for example). + TableRef ignoreForExcludeCurrent = plan.getOperation() == Operation.DELETE && sources.size() == 1 ? plan.getTargetRef() : null; + boolean excludeCurrent = false; + String targetPhysicalName = plan.getTargetRef().getTable().getPhysicalName().getString(); + for (TableRef source : sources) { + if (source.getTable().isTransactional() && !source.equals(ignoreForExcludeCurrent)) { + String sourcePhysicalName = source.getTable().getPhysicalName().getString(); + if (targetPhysicalName.equals(sourcePhysicalName)) { + excludeCurrent = true; + break; + } + } + } + // If we're querying the same table we're updating, we must exclude our writes to + // it from being visible. + if (excludeCurrent) { + // If any source tables have uncommitted data prior to last checkpoint, + // then we must create a new checkpoint. + boolean hasUncommittedData = false; + for (TableRef source : sources) { + String sourcePhysicalName = source.getTable().getPhysicalName().getString(); + if (source.getTable().isTransactional() && uncommittedPhysicalNames.contains(sourcePhysicalName)) { + hasUncommittedData = true; + break; + } + } + if (hasUncommittedData) { + try { + if (txContext == null) { + tx = currentTx = connection.getQueryServices().getTransactionSystemClient().checkpoint(currentTx); + } else { + txContext.checkpoint(); + tx = currentTx = txContext.getCurrentTransaction(); + } + // Since we've checkpointed, we can clear out the uncommitted set, since a statement run afterwards + // should see all this data. + uncommittedPhysicalNames.clear(); + } catch (TransactionFailureException | TransactionNotInProgressException e) { + throw new SQLException(e); + } + } + // Since we're querying our own table while mutating it, we must not + // see our current mutations, otherwise we can get erroneous results (for DELETE) + // or get into an infinite loop (for UPSERT SELECT). + currentTx.setVisibility(VisibilityLevel.SNAPSHOT_EXCLUDE_CURRENT); + return true; + } + return false; + } + + private void addTransactionParticipant(TransactionAware txAware) throws SQLException { + if (txContext == null) { + txAwares.add(txAware); + assert(tx != null); + txAware.startTx(tx); + } else { + txContext.addTransactionAware(txAware); + } + } + + // Though MutationState is not thread safe in general, this method should be because it may + // be called by TableResultIterator in a multi-threaded manner. Since we do not want to expose + // the Transaction outside of MutationState, this seems reasonable, as the member variables + // would not change as these threads are running. + public HTableInterface getHTable(PTable table) throws SQLException { + HTableInterface htable = this.getConnection().getQueryServices().getTable(table.getPhysicalName().getBytes()); + Transaction currentTx; + if (table.isTransactional() && (currentTx=getTransaction()) != null) { + TransactionAwareHTable txAware = TransactionUtil.getTransactionAwareHTable(htable, table); + // Using the cloned mutationState as we may have started a new transaction already + // if auto commit is true and we need to use the original one here. + txAware.startTx(currentTx); + htable = txAware; + } + return htable; + } + + public PhoenixConnection getConnection() { + return connection; + } + + // Kept private as the Transaction may change when checkpointed. Keeping it private ensures + // no one holds on to a stale copy.
+ private Transaction getTransaction() { + return tx != null ? tx : txContext != null ? txContext.getCurrentTransaction() : null; + } + + public boolean isTransactionStarted() { + return getTransaction() != null; + } + + public long getReadPointer() { + Transaction tx = getTransaction(); + return tx == null ? HConstants.LATEST_TIMESTAMP : tx.getReadPointer(); + } + + // For testing + public long getWritePointer() { + Transaction tx = getTransaction(); + return tx == null ? HConstants.LATEST_TIMESTAMP : tx.getWritePointer(); + } + + // For testing + public VisibilityLevel getVisibilityLevel() { + Transaction tx = getTransaction(); + return tx == null ? null : tx.getVisibilityLevel(); + } + + public boolean startTransaction() throws SQLException { + if (txContext == null) { + throw new SQLException("No transaction context"); // TODO: error code + } + + if (connection.getSCN() != null) { + throw new SQLExceptionInfo.Builder( + SQLExceptionCode.CANNOT_START_TRANSACTION_WITH_SCN_SET) + .build().buildException(); + } + + try { + if (!txStarted) { + txContext.start(); + txStarted = true; + return true; + } + } catch (TransactionFailureException e) { + throw new SQLException(e); // TODO: error code + } + return false; + } ++ + public static MutationState emptyMutationState(long maxSize, PhoenixConnection connection) { - MutationState state = new MutationState(maxSize, connection, Collections.<TableRef, Map<ImmutableBytesPtr,RowMutationState>>emptyMap()); ++ MutationState state = new MutationState(maxSize, connection, Collections.<TableRef, Map<ImmutableBytesPtr,RowMutationState>>emptyMap(), null); + state.sizeOffset = 0; + return state; + } private void throwIfTooBig() { if (numRows > maxSize) { @@@ -312,26 -144,18 +339,27 @@@ } /** - * Combine a newer mutation with this one, where in the event of overlaps, - * the newer one will take precedence. - * @param newMutation the newer mutation + * Combine a newer mutation with this one, where in the event of overlaps, the newer one will take precedence. + * Combine any metrics collected for the newer mutation. 
+ * + * @param newMutationState the newer mutation state */ - public void join(MutationState newMutation) { - if (this == newMutation) { // Doesn't make sense + public void join(MutationState newMutationState) { + if (this == newMutationState) { // Doesn't make sense return; } + // TODO: what if new and old have txContext as that's really an error + // Really it's an error if newMutation txContext is not null + if (txContext != null) { + for (TransactionAware txAware : txAwares) { + txContext.addTransactionAware(txAware); + } + } else { - txAwares.addAll(newMutation.txAwares); ++ txAwares.addAll(newMutationState.txAwares); + } - this.sizeOffset += newMutation.sizeOffset; + this.sizeOffset += newMutationState.sizeOffset; // Merge newMutation with this one, keeping state from newMutation for any overlaps - for (Map.Entry<TableRef, Map<ImmutableBytesPtr,Map<PColumn,byte[]>>> entry : newMutation.mutations.entrySet()) { + for (Map.Entry<TableRef, Map<ImmutableBytesPtr,RowMutationState>> entry : newMutationState.mutations.entrySet()) { // Replace existing entries for the table with new entries TableRef tableRef = entry.getKey(); PTable table = tableRef.getTable(); @@@ -373,14 -202,72 +406,37 @@@ throwIfTooBig(); } - private Iterator<Pair<byte[],List<Mutation>>> addRowMutations(final TableRef tableRef, final Map<ImmutableBytesPtr, Map<PColumn, byte[]>> values, final long timestamp, boolean includeMutableIndexes, final boolean sendAll) { - private static ImmutableBytesPtr getNewRowKeyWithRowTimestamp(ImmutableBytesPtr ptr, long rowTimestamp, PTable table) { ++ ++ private static ImmutableBytesPtr getNewRowKeyWithRowTimestamp(ImmutableBytesPtr ptr, long rowTimestamp, PTable table) { + RowKeySchema schema = table.getRowKeySchema(); + int rowTimestampColPos = table.getRowTimestampColPos(); + Field rowTimestampField = schema.getField(rowTimestampColPos); + byte[] rowTimestampBytes = PLong.INSTANCE.toBytes(rowTimestamp, rowTimestampField.getSortOrder()); + int oldOffset = ptr.getOffset(); + int oldLength = ptr.getLength(); + // Move the pointer to the start byte of the row timestamp pk + schema.position(ptr, 0, rowTimestampColPos); + byte[] b = ptr.get(); + int newOffset = ptr.getOffset(); + int length = ptr.getLength(); + for (int i = newOffset; i < newOffset + length; i++) { + // modify the underlying bytes array with the bytes of the row timestamp + b[i] = rowTimestampBytes[i - newOffset]; + } + // move the pointer back to where it was before. + ptr.set(ptr.get(), oldOffset, oldLength); + return ptr; + } + - private Iterator<Pair<byte[],List<Mutation>>> addRowMutations(final TableRef tableRef, final Map<ImmutableBytesPtr, RowMutationState> values, long timestamp, boolean includeMutableIndexes) { ++ private Iterator<Pair<byte[],List<Mutation>>> addRowMutations(final TableRef tableRef, final Map<ImmutableBytesPtr, RowMutationState> values, final long timestamp, boolean includeMutableIndexes, final boolean sendAll) { + final PTable table = tableRef.getTable(); - boolean tableWithRowTimestampCol = table.getRowTimestampColPos() != -1; final Iterator<PTable> indexes = // Only maintain tables with immutable rows through this client-side mechanism - (tableRef.getTable().isImmutableRows() || includeMutableIndexes) ? - IndexMaintainer.nonDisabledIndexIterator(tableRef.getTable().getIndexes().iterator()) : + (table.isImmutableRows() || includeMutableIndexes) ? 
+ IndexMaintainer.nonDisabledIndexIterator(table.getIndexes().iterator()) : Iterators.<PTable>emptyIterator(); - final List<Mutation> mutations = Lists.newArrayListWithExpectedSize(values.size()); + final List<Mutation> mutationList = Lists.newArrayListWithExpectedSize(values.size()); final List<Mutation> mutationsPertainingToIndex = indexes.hasNext() ? Lists.<Mutation>newArrayListWithExpectedSize(values.size()) : null; - Iterator<Map.Entry<ImmutableBytesPtr,RowMutationState>> iterator = values.entrySet().iterator(); - long timestampToUse = timestamp; - while (iterator.hasNext()) { - Map.Entry<ImmutableBytesPtr,RowMutationState> rowEntry = iterator.next(); - ImmutableBytesPtr key = rowEntry.getKey(); - RowMutationState state = rowEntry.getValue(); - if (tableWithRowTimestampCol) { - RowTimestampColInfo rowTsColInfo = state.getRowTimestampColInfo(); - if (rowTsColInfo.useServerTimestamp()) { - // regenerate the key with this timestamp. - key = getNewRowKeyWithRowTimestamp(key, timestampToUse, table); - } else { - if (rowTsColInfo.getTimestamp() != null) { - timestampToUse = rowTsColInfo.getTimestamp(); - } - } - } - PRow row = table.newRow(connection.getKeyValueBuilder(), timestampToUse, key); - List<Mutation> rowMutations, rowMutationsPertainingToIndex; - if (rowEntry.getValue().getColumnValues() == PRow.DELETE_MARKER) { // means delete - row.delete(); - rowMutations = row.toRowMutations(); - // Row deletes for index tables are processed by running a re-written query - // against the index table (as this allows for flexibility in being able to - // delete rows). - rowMutationsPertainingToIndex = Collections.emptyList(); - } else { - for (Map.Entry<PColumn,byte[]> valueEntry : rowEntry.getValue().getColumnValues().entrySet()) { - row.setValue(valueEntry.getKey(), valueEntry.getValue()); - } - rowMutations = row.toRowMutations(); - rowMutationsPertainingToIndex = rowMutations; - } - mutations.addAll(rowMutations); - if (mutationsPertainingToIndex != null) mutationsPertainingToIndex.addAll(rowMutationsPertainingToIndex); - } + generateMutations(tableRef, timestamp, values, mutationList, mutationsPertainingToIndex); return new Iterator<Pair<byte[],List<Mutation>>>() { boolean isFirst = true; @@@ -393,24 -280,14 +449,24 @@@ public Pair<byte[], List<Mutation>> next() { if (isFirst) { isFirst = false; - return new Pair<byte[],List<Mutation>>(tableRef.getTable().getPhysicalName().getBytes(),mutationList); - return new Pair<byte[],List<Mutation>>(table.getPhysicalName().getBytes(),mutations); ++ return new Pair<byte[],List<Mutation>>(table.getPhysicalName().getBytes(), mutationList); } PTable index = indexes.next(); List<Mutation> indexMutations; try { indexMutations = - IndexUtil.generateIndexData(tableRef.getTable(), index, mutationsPertainingToIndex, - connection.getKeyValueBuilder(), connection); + IndexUtil.generateIndexData(table, index, mutationsPertainingToIndex, - tempPtr, connection.getKeyValueBuilder(), connection); ++ connection.getKeyValueBuilder(), connection); + // we may also have to include delete mutations for immutable tables if we are not processing all the tables in the mutations map + if (!sendAll) { + TableRef key = new TableRef(index); - Map<ImmutableBytesPtr, Map<PColumn, byte[]>> rowToColumnMap = mutations.remove(key); ++ Map<ImmutableBytesPtr, RowMutationState> rowToColumnMap = mutations.remove(key); + if (rowToColumnMap!=null) { + final List<Mutation> deleteMutations = Lists.newArrayList(); + generateMutations(tableRef, timestamp, rowToColumnMap, deleteMutations, null); 
+ indexMutations.addAll(deleteMutations); + } + } } catch (SQLException e) { throw new IllegalDataException(e); } @@@ -424,54 -301,24 +480,69 @@@ }; } + + private void generateMutations(final TableRef tableRef, long timestamp, - final Map<ImmutableBytesPtr, Map<PColumn, byte[]>> values, ++ final Map<ImmutableBytesPtr, RowMutationState> values, + final List<Mutation> mutationList, + final List<Mutation> mutationsPertainingToIndex) { - Iterator<Map.Entry<ImmutableBytesPtr,Map<PColumn,byte[]>>> iterator = values.entrySet().iterator(); ++ final PTable table = tableRef.getTable(); ++ boolean tableWithRowTimestampCol = table.getRowTimestampColPos() != -1; ++ Iterator<Map.Entry<ImmutableBytesPtr,RowMutationState>> iterator = values.entrySet().iterator(); ++ long timestampToUse = timestamp; + while (iterator.hasNext()) { - Map.Entry<ImmutableBytesPtr,Map<PColumn,byte[]>> rowEntry = iterator.next(); ++ Entry<ImmutableBytesPtr, RowMutationState> rowEntry = iterator.next(); + ImmutableBytesPtr key = rowEntry.getKey(); - PRow row = tableRef.getTable().newRow(connection.getKeyValueBuilder(), timestamp, key); ++ RowMutationState state = rowEntry.getValue(); ++ if (tableWithRowTimestampCol) { ++ RowTimestampColInfo rowTsColInfo = state.getRowTimestampColInfo(); ++ if (rowTsColInfo.useServerTimestamp()) { ++ // regenerate the key with this timestamp. ++ key = getNewRowKeyWithRowTimestamp(key, timestampToUse, table); ++ } else { ++ if (rowTsColInfo.getTimestamp() != null) { ++ timestampToUse = rowTsColInfo.getTimestamp(); ++ } ++ } ++ } ++ PRow row = tableRef.getTable().newRow(connection.getKeyValueBuilder(), timestampToUse, key); + List<Mutation> rowMutations, rowMutationsPertainingToIndex; - if (rowEntry.getValue() == PRow.DELETE_MARKER) { // means delete ++ if (rowEntry.getValue().getColumnValues() == PRow.DELETE_MARKER) { // means delete + row.delete(); + rowMutations = row.toRowMutations(); + // Row deletes for index tables are processed by running a re-written query + // against the index table (as this allows for flexibility in being able to + // delete rows). + rowMutationsPertainingToIndex = Collections.emptyList(); + } else { - for (Map.Entry<PColumn,byte[]> valueEntry : rowEntry.getValue().entrySet()) { ++ for (Map.Entry<PColumn,byte[]> valueEntry : rowEntry.getValue().getColumnValues().entrySet()) { + row.setValue(valueEntry.getKey(), valueEntry.getValue()); + } + rowMutations = row.toRowMutations(); + rowMutationsPertainingToIndex = rowMutations; + } + mutationList.addAll(rowMutations); + if (mutationsPertainingToIndex != null) mutationsPertainingToIndex.addAll(rowMutationsPertainingToIndex); + } + } /** * Get the unsorted list of HBase mutations for the tables with uncommitted data. * @return list of HBase mutations for uncommitted data. 
*/ - public Iterator<Pair<byte[],List<Mutation>>> toMutations() { - return toMutations(false); + public Iterator<Pair<byte[],List<Mutation>>> toMutations(Long timestamp) { + return toMutations(false, timestamp); } - public Iterator<Pair<byte[],List<Mutation>>> toMutations(final boolean includeMutableIndexes) { + public Iterator<Pair<byte[],List<Mutation>>> toMutations(final boolean includeMutableIndexes, final Long tableTimestamp) { - final Iterator<Map.Entry<TableRef, Map<ImmutableBytesPtr,Map<PColumn,byte[]>>>> iterator = this.mutations.entrySet().iterator(); + final Iterator<Map.Entry<TableRef, Map<ImmutableBytesPtr,RowMutationState>>> iterator = this.mutations.entrySet().iterator(); if (!iterator.hasNext()) { return Iterators.emptyIterator(); } Long scn = connection.getSCN(); - final long timestamp = scn == null ? HConstants.LATEST_TIMESTAMP : scn; + final long timestamp = (tableTimestamp!=null && tableTimestamp!=QueryConstants.UNSET_TIMESTAMP) ? tableTimestamp : (scn == null ? HConstants.LATEST_TIMESTAMP : scn); return new Iterator<Pair<byte[],List<Mutation>>>() { - private Map.Entry<TableRef, Map<ImmutableBytesPtr,Map<PColumn,byte[]>>> current = iterator.next(); + private Map.Entry<TableRef, Map<ImmutableBytesPtr,RowMutationState>> current = iterator.next(); private Iterator<Pair<byte[],List<Mutation>>> innerIterator = init(); private Iterator<Pair<byte[],List<Mutation>>> init() { @@@ -499,353 -346,213 +570,359 @@@ }; } -- /** -- * Validates that the meta data is valid against the server meta data if we haven't yet done so. -- * Otherwise, for every UPSERT VALUES call, we'd need to hit the server to see if the meta data -- * has changed. -- * @param connection -- * @return the server time to use for the upsert -- * @throws SQLException if the table or any columns no longer exist -- */ - private long[] validateAll() throws SQLException { - private long[] validate() throws SQLException { -- int i = 0; - long[] timeStamps = new long[this.mutations.size()]; - for (Map.Entry<TableRef, Map<ImmutableBytesPtr,Map<PColumn,byte[]>>> entry : mutations.entrySet()) { - TableRef tableRef = entry.getKey(); - timeStamps[i++] = validate(tableRef, entry.getValue()); - } - return timeStamps; - } - - private long validate(TableRef tableRef, Map<ImmutableBytesPtr,Map<PColumn,byte[]>> values) throws SQLException { -- Long scn = connection.getSCN(); -- MetaDataClient client = new MetaDataClient(connection); - long serverTimeStamp = tableRef.getTimeStamp(); - PTable table = tableRef.getTable(); - // If we're auto committing, we've already validated the schema when we got the ColumnResolver, - // so no need to do it again here. - if (!connection.getAutoCommit()) { - long[] timeStamps = new long[this.mutations.size()]; - for (Map.Entry<TableRef, Map<ImmutableBytesPtr,RowMutationState>> entry : mutations.entrySet()) { - TableRef tableRef = entry.getKey(); - long serverTimeStamp = tableRef.getTimeStamp(); - PTable table = tableRef.getTable(); - // If we're auto committing, we've already validated the schema when we got the ColumnResolver, - // so no need to do it again here. - if (!connection.getAutoCommit()) { - MetaDataMutationResult result = client.updateCache(table.getSchemaName().getString(), table.getTableName().getString()); - long timestamp = result.getMutationTime(); - if (timestamp != QueryConstants.UNSET_TIMESTAMP) { - serverTimeStamp = timestamp; - if (result.wasUpdated()) { - // TODO: use bitset?
- table = result.getTable(); - PColumn[] columns = new PColumn[table.getColumns().size()]; - for (Map.Entry<ImmutableBytesPtr,RowMutationState> rowEntry : entry.getValue().entrySet()) { - RowMutationState valueEntry = rowEntry.getValue(); - if (valueEntry != null) { - Map<PColumn, byte[]> colValues = valueEntry.getColumnValues(); - if (colValues != PRow.DELETE_MARKER) { - for (PColumn column : colValues.keySet()) { - columns[column.getPosition()] = column; - } - } - } ++ /** ++ * Validates that the meta data is valid against the server meta data if we haven't yet done so. ++ * Otherwise, for every UPSERT VALUES call, we'd need to hit the server to see if the meta data ++ * has changed. ++ * @param connection ++ * @return the server time to use for the upsert ++ * @throws SQLException if the table or any columns no longer exist ++ */ ++ private long[] validateAll() throws SQLException { ++ int i = 0; ++ long[] timeStamps = new long[this.mutations.size()]; ++ for (Map.Entry<TableRef, Map<ImmutableBytesPtr,RowMutationState>> entry : mutations.entrySet()) { ++ TableRef tableRef = entry.getKey(); ++ timeStamps[i++] = validate(tableRef, entry.getValue()); ++ } ++ return timeStamps; ++ } ++ ++ private long validate(TableRef tableRef, Map<ImmutableBytesPtr, RowMutationState> rowKeyToColumnMap) throws SQLException { ++ Long scn = connection.getSCN(); ++ MetaDataClient client = new MetaDataClient(connection); ++ long serverTimeStamp = tableRef.getTimeStamp(); ++ PTable table = tableRef.getTable(); ++ // If we're auto committing, we've already validated the schema when we got the ColumnResolver, ++ // so no need to do it again here. ++ if (!connection.getAutoCommit()) { + MetaDataMutationResult result = client.updateCache(table.getSchemaName().getString(), table.getTableName().getString()); + long timestamp = result.getMutationTime(); + if (timestamp != QueryConstants.UNSET_TIMESTAMP) { + serverTimeStamp = timestamp; + if (result.wasUpdated()) { + // TODO: use bitset? + table = result.getTable(); + PColumn[] columns = new PColumn[table.getColumns().size()]; - for (Map.Entry<ImmutableBytesPtr,Map<PColumn,byte[]>> rowEntry : values.entrySet()) { - Map<PColumn,byte[]> valueEntry = rowEntry.getValue(); - if (valueEntry != PRow.DELETE_MARKER) { - for (PColumn column : valueEntry.keySet()) { - columns[column.getPosition()] = column; - } ++ for (Map.Entry<ImmutableBytesPtr,RowMutationState> rowEntry : rowKeyToColumnMap.entrySet()) { ++ RowMutationState valueEntry = rowEntry.getValue(); ++ if (valueEntry != null) { ++ Map<PColumn, byte[]> colValues = valueEntry.getColumnValues(); ++ if (colValues != PRow.DELETE_MARKER) { ++ for (PColumn column : colValues.keySet()) { ++ columns[column.getPosition()] = column; ++ } ++ } } - for (PColumn column : columns) { - if (column != null) { - table.getColumnFamily(column.getFamilyName().getString()).getColumn(column.getName().getString()); - } + } + for (PColumn column : columns) { + if (column != null) { + table.getColumnFamily(column.getFamilyName().getString()).getColumn(column.getName().getString()); } - tableRef.setTable(table); } + tableRef.setTable(table); } } - timeStamps[i++] = scn == null ? serverTimeStamp == QueryConstants.UNSET_TIMESTAMP ? HConstants.LATEST_TIMESTAMP : serverTimeStamp : scn; } - return scn == null ? serverTimeStamp == QueryConstants.UNSET_TIMESTAMP ? HConstants.LATEST_TIMESTAMP : serverTimeStamp : scn; - return timeStamps; -- } ++ return scn == null ? serverTimeStamp == QueryConstants.UNSET_TIMESTAMP ? 
HConstants.LATEST_TIMESTAMP : serverTimeStamp : scn; ++ } - private static void logMutationSize(HTableInterface htable, List<Mutation> mutations, PhoenixConnection connection) { + private static long calculateMutationSize(List<Mutation> mutations) { long byteSize = 0; - int keyValueCount = 0; - if (PhoenixMetrics.isMetricsEnabled() || logger.isDebugEnabled()) { + if (GlobalClientMetrics.isMetricsEnabled()) { for (Mutation mutation : mutations) { byteSize += mutation.heapSize(); } - MUTATION_BYTES.update(byteSize); - if (logger.isDebugEnabled()) { - logger.debug(LogUtil.addCustomAnnotations("Sending " + mutations.size() + " mutations for " + Bytes.toString(htable.getTableName()) + " with " + keyValueCount + " key values of total size " + byteSize + " bytes", connection)); - } } + GLOBAL_MUTATION_BYTES.update(byteSize); + return byteSize; } + private boolean hasKeyValueColumn(PTable table, PTable index) { + IndexMaintainer maintainer = index.getIndexMaintainer(table, connection); + return !maintainer.getAllColumns().isEmpty(); + } + + private void divideImmutableIndexes(Iterator<PTable> enabledImmutableIndexes, PTable table, List<PTable> rowKeyIndexes, List<PTable> keyValueIndexes) { + while (enabledImmutableIndexes.hasNext()) { + PTable index = enabledImmutableIndexes.next(); + if (index.getIndexType() != IndexType.LOCAL) { + if (hasKeyValueColumn(table, index)) { + keyValueIndexes.add(index); + } else { + rowKeyIndexes.add(index); + } + } + } + } + private class MetaDataAwareHTable extends DelegateHTableInterface { + private final TableRef tableRef; + + private MetaDataAwareHTable(HTableInterface delegate, TableRef tableRef) { + super(delegate); + this.tableRef = tableRef; + } + + /** + * Called by Tephra when a transaction is aborted. We have this wrapper so that we get an + * opportunity to attach our index meta data to the mutations such that we can also undo + * the index mutations. + */ + @Override + public void delete(List<Delete> deletes) throws IOException { + try { + PTable table = tableRef.getTable(); + List<PTable> indexes = table.getIndexes(); + Iterator<PTable> enabledIndexes = IndexMaintainer.nonDisabledIndexIterator(indexes.iterator()); + if (enabledIndexes.hasNext()) { + List<PTable> keyValueIndexes = Collections.emptyList(); + ImmutableBytesWritable indexMetaDataPtr = new ImmutableBytesWritable(); + boolean attachMetaData = table.getIndexMaintainers(indexMetaDataPtr, connection); + if (table.isImmutableRows()) { + List<PTable> rowKeyIndexes = Lists.newArrayListWithExpectedSize(indexes.size()); + keyValueIndexes = Lists.newArrayListWithExpectedSize(indexes.size()); + divideImmutableIndexes(enabledIndexes, table, rowKeyIndexes, keyValueIndexes); + // Generate index deletes for immutable indexes that only reference row key + // columns and submit directly here. + ImmutableBytesWritable ptr = new ImmutableBytesWritable(); + for (PTable index : rowKeyIndexes) { + List<Delete> indexDeletes = IndexUtil.generateDeleteIndexData(table, index, deletes, ptr, connection.getKeyValueBuilder(), connection); + HTableInterface hindex = connection.getQueryServices().getTable(index.getPhysicalName().getBytes()); + hindex.delete(indexDeletes); + } + } + + // If we have mutable indexes, local immutable indexes, or global immutable indexes + // that reference key value columns, setup index meta data and attach here. In this + // case updates to the indexes will be generated on the server side. 
+ // An alternative would be to let Tephra track the row keys for the immutable index + // by adding it as a transaction participant (soon we can prevent any conflict + // detection from occurring) with the downside being the additional memory required. + if (!keyValueIndexes.isEmpty()) { + attachMetaData = true; + IndexMaintainer.serializeAdditional(table, indexMetaDataPtr, keyValueIndexes, connection); + } + if (attachMetaData) { + setMetaDataOnMutations(tableRef, deletes, indexMetaDataPtr); + } + } + delegate.delete(deletes); + } catch (SQLException e) { + throw new IOException(e); + } + } + } + @SuppressWarnings("deprecation") - public void commit() throws SQLException { + private void send(Iterator<TableRef> tableRefIterator) throws SQLException { int i = 0; - PName tenantId = connection.getTenantId(); - long[] serverTimeStamps = validate(); - Iterator<Map.Entry<TableRef, Map<ImmutableBytesPtr,RowMutationState>>> iterator = this.mutations.entrySet().iterator(); + long[] serverTimeStamps = null; + boolean sendAll = false; + // Validate up front if not transactional so that we fail fast before sending any mutations + if (tableRefIterator == null) { + serverTimeStamps = validateAll(); + tableRefIterator = mutations.keySet().iterator(); + sendAll = true; + } + // add tracing for this operation - TraceScope trace = Tracing.startNewSpan(connection, "Committing mutations to tables"); - Span span = trace.getSpan(); - ImmutableBytesWritable indexMetaDataPtr = new ImmutableBytesWritable(); - while (tableRefIterator.hasNext()) { - TableRef tableRef = tableRefIterator.next(); - Map<ImmutableBytesPtr,Map<PColumn,byte[]>> valuesMap = mutations.get(tableRef); - if (valuesMap == null) { - continue; - } - PTable table = tableRef.getTable(); - // Track tables to which we've sent uncommitted data - if (table.isTransactional()) { - uncommittedPhysicalNames.add(table.getPhysicalName().getString()); - } - table.getIndexMaintainers(indexMetaDataPtr, connection); - boolean isDataTable = true; - // Validate as we go if transactional since we can undo if a problem occurs (which is unlikely) - long serverTimestamp = serverTimeStamps == null ? validate(tableRef, valuesMap) : serverTimeStamps[i++]; - Iterator<Pair<byte[],List<Mutation>>> mutationsIterator = addRowMutations(tableRef, valuesMap, serverTimestamp, false, sendAll); - while (mutationsIterator.hasNext()) { - Pair<byte[],List<Mutation>> pair = mutationsIterator.next(); - byte[] htableName = pair.getFirst(); - List<Mutation> mutations = pair.getSecond(); - - //create a span per target table - //TODO maybe we can be smarter about the table name to string here? - Span child = Tracing.child(span,"Writing mutation batch for table: "+Bytes.toString(htableName)); - - int retryCount = 0; - boolean shouldRetry = false; - do { - ServerCache cache = null; - if (isDataTable) { - cache = setMetaDataOnMutations(tableRef, mutations, indexMetaDataPtr); - } - - // If we haven't retried yet, retry for this case only, as it's possible that - // a split will occur after we send the index metadata cache to all known - // region servers.
- shouldRetry = cache != null; - SQLException sqlE = null; - HTableInterface hTable = connection.getQueryServices().getTable(htableName); - try { - if (table.isTransactional()) { - // If we have indexes, wrap the HTable in a delegate HTable that - // will attach the necessary index meta data in the event of a - // rollback - if (!table.getIndexes().isEmpty()) { - hTable = new MetaDataAwareHTable(hTable, tableRef); - } - TransactionAwareHTable txnAware = TransactionUtil.getTransactionAwareHTable(hTable, table); - // Don't add immutable indexes (those are the only ones that would participate - // during a commit), as we don't need conflict detection for these. - if (isDataTable) { - // Even for immutable, we need to do this so that an abort has the state - // necessary to generate the rows to delete. - addTransactionParticipant(txnAware); - } else { - txnAware.startTx(getTransaction()); - } - hTable = txnAware; - } - logMutationSize(hTable, mutations, connection); - MUTATION_BATCH_SIZE.update(mutations.size()); - long startTime = System.currentTimeMillis(); - child.addTimelineAnnotation("Attempt " + retryCount); - hTable.batch(mutations); - child.stop(); - long duration = System.currentTimeMillis() - startTime; - MUTATION_COMMIT_TIME.update(duration); - shouldRetry = false; - if (logger.isDebugEnabled()) logger.debug(LogUtil.addCustomAnnotations("Total time for batch call of " + mutations.size() + " mutations into " + table.getName().getString() + ": " + duration + " ms", connection)); - } catch (Exception e) { - SQLException inferredE = ServerUtil.parseServerExceptionOrNull(e); - if (inferredE != null) { - if (shouldRetry && retryCount == 0 && inferredE.getErrorCode() == SQLExceptionCode.INDEX_METADATA_NOT_FOUND.getErrorCode()) { - // Swallow this exception once, as it's possible that we split after sending the index metadata - // and one of the region servers doesn't have it. This will cause it to have it the next go around. - // If it fails again, we don't retry. - String msg = "Swallowing exception and retrying after clearing meta cache on connection. " + inferredE; - logger.warn(LogUtil.addCustomAnnotations(msg, connection)); - connection.getQueryServices().clearTableRegionCache(htableName); - - // add a new child span as this one failed - child.addTimelineAnnotation(msg); - child.stop(); - child = Tracing.child(span,"Failed batch, attempting retry"); - - continue; - } - e = inferredE; - } - // Throw to client with both what was committed so far and what is left to be committed. - // That way, client can either undo what was done or try again with what was not done. 
- sqlE = new CommitException(e, this); - } finally { - try { - if (cache != null) { - cache.close(); - } - } finally { - try { - hTable.close(); - } - catch (IOException e) { - if (sqlE != null) { - sqlE.setNextException(ServerUtil.parseServerException(e)); - } else { - sqlE = ServerUtil.parseServerException(e); - } - } - if (sqlE != null) { - throw sqlE; - } - } - } - } while (shouldRetry && retryCount++ < 1); - isDataTable = false; - } - if (tableRef.getTable().getType() != PTableType.INDEX) { - numRows -= valuesMap.size(); - } - // Remove batches as we process them - if (sendAll) { - tableRefIterator.remove(); // Iterating through actual map in this case - } else { - mutations.remove(tableRef); - } + try (TraceScope trace = Tracing.startNewSpan(connection, "Committing mutations to tables")) { + Span span = trace.getSpan(); - while (iterator.hasNext()) { - Map.Entry<TableRef, Map<ImmutableBytesPtr,RowMutationState>> entry = iterator.next(); - // at this point we are going through mutations for each table - - Map<ImmutableBytesPtr,RowMutationState> valuesMap = entry.getValue(); - // above is mutations for a table where the first part is the row key and the second part is column values. - - TableRef tableRef = entry.getKey(); - PTable table = tableRef.getTable(); - table.getIndexMaintainers(tempPtr, connection); - boolean hasIndexMaintainers = tempPtr.getLength() > 0; - boolean isDataTable = true; - long serverTimestamp = serverTimeStamps[i++]; - Iterator<Pair<byte[],List<Mutation>>> mutationsIterator = addRowMutations(tableRef, valuesMap, serverTimestamp, false); - // above returns an iterator of pair where the first - while (mutationsIterator.hasNext()) { - Pair<byte[],List<Mutation>> pair = mutationsIterator.next(); - byte[] htableName = pair.getFirst(); - List<Mutation> mutations = pair.getSecond(); - - //create a span per target table - //TODO maybe we can be smarter about the table name to string here? - Span child = Tracing.child(span,"Writing mutation batch for table: "+Bytes.toString(htableName)); - - int retryCount = 0; - boolean shouldRetry = false; - do { - ServerCache cache = null; - if (hasIndexMaintainers && isDataTable) { - byte[] attribValue = null; - byte[] uuidValue; - if (IndexMetaDataCacheClient.useIndexMetadataCache(connection, mutations, tempPtr.getLength())) { - IndexMetaDataCacheClient client = new IndexMetaDataCacheClient(connection, tableRef); - cache = client.addIndexMetadataCache(mutations, tempPtr); - child.addTimelineAnnotation("Updated index metadata cache"); - uuidValue = cache.getId(); - // If we haven't retried yet, retry for this case only, as it's possible that - // a split will occur after we send the index metadata cache to all known - // region servers. 
- shouldRetry = true; - } else { - attribValue = ByteUtil.copyKeyBytesIfNecessary(tempPtr); - uuidValue = ServerCacheClient.generateId(); - } - // Either set the UUID to be able to access the index metadata from the cache - // or set the index metadata directly on the Mutation - for (Mutation mutation : mutations) { - if (tenantId != null) { - byte[] tenantIdBytes = ScanUtil.getTenantIdBytes( - table.getRowKeySchema(), - table.getBucketNum()!=null, - tenantId); - mutation.setAttribute(PhoenixRuntime.TENANT_ID_ATTRIB, tenantIdBytes); - } - mutation.setAttribute(PhoenixIndexCodec.INDEX_UUID, uuidValue); - if (attribValue != null) { - mutation.setAttribute(PhoenixIndexCodec.INDEX_MD, attribValue); - } - } - } - - SQLException sqlE = null; - HTableInterface hTable = connection.getQueryServices().getTable(htableName); - try { - long numMutations = mutations.size(); ++ ImmutableBytesWritable indexMetaDataPtr = new ImmutableBytesWritable(); ++ while (tableRefIterator.hasNext()) { ++ // at this point we are going through mutations for each table ++ TableRef tableRef = tableRefIterator.next(); ++ Map<ImmutableBytesPtr, RowMutationState> valuesMap = mutations.get(tableRef); ++ if (valuesMap == null) { ++ continue; ++ } ++ PTable table = tableRef.getTable(); ++ // Track tables to which we've sent uncommitted data ++ if (table.isTransactional()) { ++ uncommittedPhysicalNames.add(table.getPhysicalName().getString()); ++ } ++ table.getIndexMaintainers(indexMetaDataPtr, connection); ++ boolean isDataTable = true; ++ // Validate as we go if transactional since we can undo if a problem occurs (which is unlikely) ++ long serverTimestamp = serverTimeStamps == null ? validate(tableRef, valuesMap) : serverTimeStamps[i++]; ++ Iterator<Pair<byte[],List<Mutation>>> mutationsIterator = addRowMutations(tableRef, valuesMap, serverTimestamp, false, sendAll); ++ while (mutationsIterator.hasNext()) { ++ Pair<byte[],List<Mutation>> pair = mutationsIterator.next(); ++ byte[] htableName = pair.getFirst(); ++ List<Mutation> mutations = pair.getSecond(); ++ ++ //create a span per target table ++ //TODO maybe we can be smarter about the table name to string here? ++ Span child = Tracing.child(span,"Writing mutation batch for table: "+Bytes.toString(htableName)); ++ ++ int retryCount = 0; ++ boolean shouldRetry = false; ++ do { ++ ServerCache cache = null; ++ if (isDataTable) { ++ cache = setMetaDataOnMutations(tableRef, mutations, indexMetaDataPtr); ++ } ++ ++ // If we haven't retried yet, retry for this case only, as it's possible that ++ // a split will occur after we send the index metadata cache to all known ++ // region servers. ++ shouldRetry = cache != null; ++ SQLException sqlE = null; ++ HTableInterface hTable = connection.getQueryServices().getTable(htableName); ++ try { ++ if (table.isTransactional()) { ++ // If we have indexes, wrap the HTable in a delegate HTable that ++ // will attach the necessary index meta data in the event of a ++ // rollback ++ if (!table.getIndexes().isEmpty()) { ++ hTable = new MetaDataAwareHTable(hTable, tableRef); ++ } ++ TransactionAwareHTable txnAware = TransactionUtil.getTransactionAwareHTable(hTable, table); ++ // Don't add immutable indexes (those are the only ones that would participate ++ // during a commit), as we don't need conflict detection for these. ++ if (isDataTable) { ++ // Even for immutable, we need to do this so that an abort has the state ++ // necessary to generate the rows to delete. 
++ addTransactionParticipant(txnAware); ++ } else { ++ txnAware.startTx(getTransaction()); ++ } ++ hTable = txnAware; ++ } ++ long numMutations = mutations.size(); + GLOBAL_MUTATION_BATCH_SIZE.update(numMutations); + + long startTime = System.currentTimeMillis(); - child.addTimelineAnnotation("Attempt " + retryCount); - hTable.batch(mutations); - child.stop(); ++ child.addTimelineAnnotation("Attempt " + retryCount); ++ hTable.batch(mutations); ++ child.stop(); + shouldRetry = false; + long mutationCommitTime = System.currentTimeMillis() - startTime; + GLOBAL_MUTATION_COMMIT_TIME.update(mutationCommitTime); + + long mutationSizeBytes = calculateMutationSize(mutations); + MutationMetric mutationsMetric = new MutationMetric(numMutations, mutationSizeBytes, mutationCommitTime); + mutationMetricQueue.addMetricsForTable(Bytes.toString(htableName), mutationsMetric); - } catch (Exception e) { - SQLException inferredE = ServerUtil.parseServerExceptionOrNull(e); - if (inferredE != null) { - if (shouldRetry && retryCount == 0 && inferredE.getErrorCode() == SQLExceptionCode.INDEX_METADATA_NOT_FOUND.getErrorCode()) { - // Swallow this exception once, as it's possible that we split after sending the index metadata - // and one of the region servers doesn't have it. This will cause it to have it the next go around. - // If it fails again, we don't retry. - String msg = "Swallowing exception and retrying after clearing meta cache on connection. " + inferredE; - logger.warn(LogUtil.addCustomAnnotations(msg, connection)); - connection.getQueryServices().clearTableRegionCache(htableName); ++ } catch (Exception e) { ++ SQLException inferredE = ServerUtil.parseServerExceptionOrNull(e); ++ if (inferredE != null) { ++ if (shouldRetry && retryCount == 0 && inferredE.getErrorCode() == SQLExceptionCode.INDEX_METADATA_NOT_FOUND.getErrorCode()) { ++ // Swallow this exception once, as it's possible that we split after sending the index metadata ++ // and one of the region servers doesn't have it. This will cause it to have it the next go around. ++ // If it fails again, we don't retry. ++ String msg = "Swallowing exception and retrying after clearing meta cache on connection. " + inferredE; ++ logger.warn(LogUtil.addCustomAnnotations(msg, connection)); ++ connection.getQueryServices().clearTableRegionCache(htableName); ++ ++ // add a new child span as this one failed ++ child.addTimelineAnnotation(msg); ++ child.stop(); ++ child = Tracing.child(span,"Failed batch, attempting retry"); ++ ++ continue; ++ } ++ e = inferredE; ++ } ++ // Throw to client with both what was committed so far and what is left to be committed. ++ // That way, client can either undo what was done or try again with what was not done.
++ sqlE = new CommitException(e, getUncommittedStatementIndexes()); ++ } finally { ++ try { ++ if (cache != null) { ++ cache.close(); ++ } ++ } finally { ++ try { ++ hTable.close(); ++ } ++ catch (IOException e) { ++ if (sqlE != null) { ++ sqlE.setNextException(ServerUtil.parseServerException(e)); ++ } else { ++ sqlE = ServerUtil.parseServerException(e); ++ } ++ } ++ if (sqlE != null) { ++ throw sqlE; ++ } ++ } ++ } ++ } while (shouldRetry && retryCount++ < 1); ++ isDataTable = false; ++ } ++ if (tableRef.getTable().getType() != PTableType.INDEX) { ++ numRows -= valuesMap.size(); ++ } ++ // Remove batches as we process them ++ if (sendAll) { ++ tableRefIterator.remove(); // Iterating through actual map in this case ++ } else { ++ mutations.remove(tableRef); ++ } ++ } + } - trace.close(); + // Note that we cannot assume that *all* mutations have been sent, since we've optimized this + // now to only send the mutations for the tables we're querying, hence we've removed the + // assertions that we're here before. + } - // add a new child span as this one failed - child.addTimelineAnnotation(msg); - child.stop(); - child = Tracing.child(span,"Failed batch, attempting retry"); + public byte[] encodeTransaction() throws SQLException { + try { + return CODEC.encode(getTransaction()); + } catch (IOException e) { + throw new SQLException(e); + } + } + + public static Transaction decodeTransaction(byte[] txnBytes) throws IOException { + return (txnBytes == null || txnBytes.length==0) ? null : CODEC.decode(txnBytes); + } - continue; - } - e = inferredE; - } - sqlE = new CommitException(e, getUncommittedStatementIndexes()); - } finally { - try { - hTable.close(); - } catch (IOException e) { - if (sqlE != null) { - sqlE.setNextException(ServerUtil.parseServerException(e)); - } else { - sqlE = ServerUtil.parseServerException(e); - } - } finally { - try { - if (cache != null) { - cache.close(); - } - } finally { - if (sqlE != null) { - throw sqlE; - } - } - } - } - } while (shouldRetry && retryCount++ < 1); - isDataTable = false; - } - if (tableRef.getTable().getType() != PTableType.INDEX) { - numRows -= entry.getValue().size(); + private ServerCache setMetaDataOnMutations(TableRef tableRef, List<? extends Mutation> mutations, + ImmutableBytesWritable indexMetaDataPtr) throws SQLException { + PTable table = tableRef.getTable(); + byte[] tenantId = connection.getTenantId() == null ? 
null : connection.getTenantId().getBytes(); + ServerCache cache = null; + byte[] attribValue = null; + byte[] uuidValue = null; + byte[] txState = ByteUtil.EMPTY_BYTE_ARRAY; + if (table.isTransactional()) { + txState = encodeTransaction(); + } + boolean hasIndexMetaData = indexMetaDataPtr.getLength() > 0; + if (hasIndexMetaData) { + if (IndexMetaDataCacheClient.useIndexMetadataCache(connection, mutations, indexMetaDataPtr.getLength() + txState.length)) { + IndexMetaDataCacheClient client = new IndexMetaDataCacheClient(connection, tableRef); + cache = client.addIndexMetadataCache(mutations, indexMetaDataPtr, txState); + uuidValue = cache.getId(); + } else { + attribValue = ByteUtil.copyKeyBytesIfNecessary(indexMetaDataPtr); + uuidValue = ServerCacheClient.generateId(); + } + } else if (txState.length == 0) { + return null; + } + // Either set the UUID to be able to access the index metadata from the cache + // or set the index metadata directly on the Mutation + for (Mutation mutation : mutations) { + if (tenantId != null) { + mutation.setAttribute(PhoenixRuntime.TENANT_ID_ATTRIB, tenantId); + } + mutation.setAttribute(PhoenixIndexCodec.INDEX_UUID, uuidValue); + if (attribValue != null) { + mutation.setAttribute(PhoenixIndexCodec.INDEX_MD, attribValue); + if (txState.length > 0) { + mutation.setAttribute(BaseScannerRegionObserver.TX_STATE, txState); } - iterator.remove(); // Remove batches as we process them + } else if (!hasIndexMetaData && txState.length > 0) { + mutation.setAttribute(BaseScannerRegionObserver.TX_STATE, txState); } } - assert(numRows==0); - assert(this.mutations.isEmpty()); + return cache; } - public void rollback(PhoenixConnection connection) throws SQLException { + public void clear() throws SQLException { this.mutations.clear(); numRows = 0; } @@@ -853,95 -570,91 +940,182 @@@ @Override public void close() throws SQLException { } + + private void reset() { + txStarted = false; + tx = null; + uncommittedPhysicalNames.clear(); + } + + public void rollback() throws SQLException { + clear(); + txAwares.clear(); + if (txContext != null) { + try { + if (txStarted) { + txContext.abort(); + } + } catch (TransactionFailureException e) { + throw new SQLException(e); // TODO: error code + } finally { + reset(); + } + } + } + + public void commit() throws SQLException { + boolean sendMutationsFailed=false; + try { + send(); + } catch (Throwable t) { + sendMutationsFailed=true; + throw t; + } finally { + txAwares.clear(); + if (txContext != null) { + try { + if (txStarted && !sendMutationsFailed) { + txContext.finish(); + } + } catch (TransactionFailureException e) { + try { + txContext.abort(e); + throw TransactionUtil.getSQLException(e); + } catch (TransactionFailureException e1) { + throw TransactionUtil.getSQLException(e); + } + } finally { + if (!sendMutationsFailed) { + reset(); + } + } + } + } + } + + /** + * Send mutations to hbase, so they are visible to subsequent reads, + * starting a transaction if transactional and one has not yet been started. + * @param tableRefs + * @return true if at least partially transactional and false otherwise. + * @throws SQLException + */ + public boolean sendUncommitted(Iterator<TableRef> tableRefs) throws SQLException { + Transaction currentTx = getTransaction(); + if (currentTx != null) { + // Initialize visibility so that transactions see their own writes. + // The checkpoint() method will set it to not see writes if necessary. 
+ currentTx.setVisibility(VisibilityLevel.SNAPSHOT); + } + Iterator<TableRef> filteredTableRefs = Iterators.filter(tableRefs, new Predicate<TableRef>(){ + @Override + public boolean apply(TableRef tableRef) { + return tableRef.getTable().isTransactional(); + } + }); + if (filteredTableRefs.hasNext()) { + // FIXME: strip table alias to prevent equality check from failing due to alias mismatch on null alias. + // We really should be keying the tables based on the physical table name. + List<TableRef> strippedAliases = Lists.newArrayListWithExpectedSize(mutations.keySet().size()); + while (filteredTableRefs.hasNext()) { + TableRef tableRef = filteredTableRefs.next(); + strippedAliases.add(new TableRef(null, tableRef.getTable(), tableRef.getTimeStamp(), tableRef.getLowerBoundTimeStamp(), tableRef.hasDynamicCols())); + } + startTransaction(); + send(strippedAliases.iterator()); + return true; + } + return false; + } + + public void send() throws SQLException { + send(null); + } + + public static int[] joinSortedIntArrays(int[] a, int[] b) { + int[] result = new int[a.length + b.length]; + int i = 0, j = 0, k = 0, current; + while (i < a.length && j < b.length) { + current = a[i] < b[j] ? a[i++] : b[j++]; + for ( ; i < a.length && a[i] == current; i++); + for ( ; j < b.length && b[j] == current; j++); + result[k++] = current; + } + while (i < a.length) { + for (current = a[i++] ; i < a.length && a[i] == current; i++); + result[k++] = current; + } + while (j < b.length) { + for (current = b[j++] ; j < b.length && b[j] == current; j++); + result[k++] = current; + } + return Arrays.copyOf(result, k); + } + + @Immutable + public static class RowTimestampColInfo { + private final boolean useServerTimestamp; + private final Long rowTimestamp; + + public static final RowTimestampColInfo NULL_ROWTIMESTAMP_INFO = new RowTimestampColInfo(false, null); + + public RowTimestampColInfo(boolean autoGenerate, Long value) { + this.useServerTimestamp = autoGenerate; + this.rowTimestamp = value; + } + + public boolean useServerTimestamp() { + return useServerTimestamp; + } + + public Long getTimestamp() { + return rowTimestamp; + } + } + + public static class RowMutationState { + @Nonnull private Map<PColumn,byte[]> columnValues; + private int[] statementIndexes; + @Nonnull private final RowTimestampColInfo rowTsColInfo; + + public RowMutationState(@Nonnull Map<PColumn,byte[]> columnValues, int statementIndex, @Nonnull RowTimestampColInfo rowTsColInfo) { + checkNotNull(columnValues); + checkNotNull(rowTsColInfo); + this.columnValues = columnValues; + this.statementIndexes = new int[] {statementIndex}; + this.rowTsColInfo = rowTsColInfo; + } + + Map<PColumn, byte[]> getColumnValues() { + return columnValues; + } + + int[] getStatementIndexes() { + return statementIndexes; + } + + void join(RowMutationState newRow) { + getColumnValues().putAll(newRow.getColumnValues()); + statementIndexes = joinSortedIntArrays(statementIndexes, newRow.getStatementIndexes()); + } + + @Nonnull + RowTimestampColInfo getRowTimestampColInfo() { + return rowTsColInfo; + } + + } + + public ReadMetricQueue getReadMetricQueue() { + return readMetricQueue; + } + + public void setReadMetricQueue(ReadMetricQueue readMetricQueue) { + this.readMetricQueue = readMetricQueue; + } + + public MutationMetricQueue getMutationMetricQueue() { + return mutationMetricQueue; + } + }
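For reference, the joinSortedIntArrays helper added above computes the sorted, de-duplicated union of two sorted int arrays; RowMutationState.join uses it to merge the statement indexes of two mutations against the same row. A minimal standalone sketch of its behavior follows: the method body is copied verbatim from the patch, while the demo class name and sample inputs are illustrative only.

    import java.util.Arrays;

    public class JoinSortedIntArraysDemo {
        // As in MutationState above: merge two sorted int arrays, skipping
        // duplicate values both within and across the two inputs.
        public static int[] joinSortedIntArrays(int[] a, int[] b) {
            int[] result = new int[a.length + b.length];
            int i = 0, j = 0, k = 0, current;
            while (i < a.length && j < b.length) {
                current = a[i] < b[j] ? a[i++] : b[j++];
                for ( ; i < a.length && a[i] == current; i++);
                for ( ; j < b.length && b[j] == current; j++);
                result[k++] = current;
            }
            while (i < a.length) {
                for (current = a[i++] ; i < a.length && a[i] == current; i++);
                result[k++] = current;
            }
            while (j < b.length) {
                for (current = b[j++] ; j < b.length && b[j] == current; j++);
                result[k++] = current;
            }
            // Trim the result to the number of distinct values written.
            return Arrays.copyOf(result, k);
        }

        public static void main(String[] args) {
            // Hypothetical statement indexes from two RowMutationStates
            // that touched the same row.
            int[] a = {0, 2, 2, 5};
            int[] b = {1, 2, 7};
            // Prints [0, 1, 2, 5, 7]: the sorted union without duplicates.
            System.out.println(Arrays.toString(joinSortedIntArrays(a, b)));
        }
    }

Keeping the merged array sorted and duplicate-free means each statement index is recorded exactly once per row, regardless of how many times overlapping mutations are joined.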
http://git-wip-us.apache.org/repos/asf/phoenix/blob/7d3b544a/phoenix-core/src/main/java/org/apache/phoenix/execute/ScanPlan.java
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/phoenix/blob/7d3b544a/phoenix-core/src/main/java/org/apache/phoenix/execute/SortMergeJoinPlan.java
----------------------------------------------------------------------
diff --cc phoenix-core/src/main/java/org/apache/phoenix/execute/SortMergeJoinPlan.java
index e82934a,297b6cc..a5c4043
--- a/phoenix-core/src/main/java/org/apache/phoenix/execute/SortMergeJoinPlan.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/execute/SortMergeJoinPlan.java
@@@ -45,10 -44,11 +45,12 @@@ import org.apache.phoenix.exception.SQL
  import org.apache.phoenix.exception.SQLExceptionInfo;
  import org.apache.phoenix.execute.TupleProjector.ProjectedValueTuple;
  import org.apache.phoenix.expression.Expression;
+ import org.apache.phoenix.iterate.DefaultParallelScanGrouper;
  import org.apache.phoenix.iterate.MappedByteBufferQueue;
+ import org.apache.phoenix.iterate.ParallelScanGrouper;
  import org.apache.phoenix.iterate.ResultIterator;
  import org.apache.phoenix.jdbc.PhoenixParameterMetaData;
+import org.apache.phoenix.jdbc.PhoenixStatement.Operation;
  import org.apache.phoenix.parse.FilterableStatement;
  import org.apache.phoenix.parse.JoinTableNode.JoinType;
  import org.apache.phoenix.query.KeyRange;
@@@ -84,7 -83,7 +86,8 @@@ public class SortMergeJoinPlan implemen
      private final KeyValueSchema rhsSchema;
      private final int rhsFieldPosition;
      private final boolean isSingleValueOnly;
+     private final Set<TableRef> tableRefs;
+     private final int thresholdBytes;
  
      public SortMergeJoinPlan(StatementContext context, FilterableStatement statement, TableRef table,
              JoinType type, QueryPlan lhsPlan, QueryPlan rhsPlan, List<Expression> lhsKeyExpressions, List<Expression> rhsKeyExpressions,
@@@ -103,17 -102,10 +106,18 @@@
          this.rhsSchema = buildSchema(rhsTable);
          this.rhsFieldPosition = rhsFieldPosition;
          this.isSingleValueOnly = isSingleValueOnly;
+         this.tableRefs = Sets.newHashSetWithExpectedSize(lhsPlan.getSourceRefs().size() + rhsPlan.getSourceRefs().size());
+         this.tableRefs.addAll(lhsPlan.getSourceRefs());
+         this.tableRefs.addAll(rhsPlan.getSourceRefs());
+         this.thresholdBytes = context.getConnection().getQueryServices().getProps().getInt(
+                 QueryServices.SPOOL_THRESHOLD_BYTES_ATTRIB, QueryServicesOptions.DEFAULT_SPOOL_THRESHOLD_BYTES);
      }
- 
+     @Override
+     public Operation getOperation() {
+         return statement.getOperation();
+     }
+ 
      private static KeyValueSchema buildSchema(PTable table) {
          KeyValueSchemaBuilder builder = new KeyValueSchemaBuilder(0);
          if (table != null) {
@@@ -644,11 -639,11 +651,16 @@@
          }
      }
+ 
+     @Override
+     public boolean useRoundRobinIterator() {
+         return false;
+     }
+ 
+     @Override
+     public Set<TableRef> getSourceRefs() {
+         return tableRefs;
+     }
  }
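
The SortMergeJoinPlan additions are mostly plumbing for the new QueryPlan.getSourceRefs() contract: a join's source tables are simply the union of its children's. The sketch below is an analogy, not the Phoenix API (the Plan interface and the table names are mine); it shows the union pattern, and why it is useful, since a caller such as MutationState.sendUncommitted() above needs every table a plan reads in order to flush uncommitted transactional writes before the query runs:

    import java.util.Arrays;
    import java.util.Collections;
    import java.util.HashSet;
    import java.util.Set;

    public class SourceRefsSketch {
        // Stand-in for QueryPlan, reduced to the one method of interest.
        interface Plan {
            Set<String> getSourceRefs();
        }

        // A join's source refs are the union of its children's, mirroring the
        // tableRefs.addAll(lhsPlan/rhsPlan.getSourceRefs()) lines in the patch.
        static Plan join(Plan lhs, Plan rhs) {
            Set<String> refs = new HashSet<>(lhs.getSourceRefs());
            refs.addAll(rhs.getSourceRefs());
            return () -> refs;
        }

        public static void main(String[] args) {
            Plan lhs = () -> Collections.singleton("T1");
            Plan rhs = () -> new HashSet<>(Arrays.asList("T1", "T2"));
            // Duplicates collapse: joining T1 with (T1, T2) reads exactly {T1, T2}.
            System.out.println(join(lhs, rhs).getSourceRefs());
        }
    }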
http://git-wip-us.apache.org/repos/asf/phoenix/blob/7d3b544a/phoenix-core/src/main/java/org/apache/phoenix/execute/UnionPlan.java
----------------------------------------------------------------------
diff --cc phoenix-core/src/main/java/org/apache/phoenix/execute/UnionPlan.java
index 0000000,53745fe..c0035ff
mode 000000,100644..100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/execute/UnionPlan.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/execute/UnionPlan.java
@@@ -1,0 -1,201 +1,220 @@@
+ /*
+  * Licensed to the Apache Software Foundation (ASF) under one
+  * or more contributor license agreements.  See the NOTICE file
+  * distributed with this work for additional information
+  * regarding copyright ownership.  The ASF licenses this file
+  * to you under the Apache License, Version 2.0 (the
+  * "License"); you may not use this file except in compliance
+  * with the License.  You may obtain a copy of the License at
+  *
+  *     http://www.apache.org/licenses/LICENSE-2.0
+  *
+  * Unless required by applicable law or agreed to in writing, software
+  * distributed under the License is distributed on an "AS IS" BASIS,
+  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  * See the License for the specific language governing permissions and
+  * limitations under the License.
+  */
+ package org.apache.phoenix.execute;
+ 
+ import java.sql.ParameterMetaData;
+ import java.sql.SQLException;
+ import java.util.ArrayList;
+ import java.util.Collections;
+ import java.util.List;
++import java.util.Set;
+ 
+ import org.apache.hadoop.hbase.client.Scan;
+ import org.apache.phoenix.compile.ExplainPlan;
+ import org.apache.phoenix.compile.GroupByCompiler.GroupBy;
+ import org.apache.phoenix.compile.OrderByCompiler.OrderBy;
+ import org.apache.phoenix.compile.QueryPlan;
+ import org.apache.phoenix.compile.RowProjector;
+ import org.apache.phoenix.compile.ScanRanges;
+ import org.apache.phoenix.compile.StatementContext;
+ import org.apache.phoenix.iterate.ConcatResultIterator;
+ import org.apache.phoenix.iterate.LimitingResultIterator;
+ import org.apache.phoenix.iterate.MergeSortTopNResultIterator;
+ import org.apache.phoenix.iterate.ParallelScanGrouper;
+ import org.apache.phoenix.iterate.ResultIterator;
+ import org.apache.phoenix.iterate.UnionResultIterators;
++import org.apache.phoenix.jdbc.PhoenixStatement.Operation;
+ import org.apache.phoenix.parse.FilterableStatement;
+ import org.apache.phoenix.query.KeyRange;
+ import org.apache.phoenix.schema.TableRef;
+ import org.apache.phoenix.util.SQLCloseable;
+ 
++import com.google.common.collect.Sets;
++
+ 
+ public class UnionPlan implements QueryPlan {
+     private static final long DEFAULT_ESTIMATED_SIZE = 10 * 1024; // 10 K
+ 
+     private final TableRef tableRef;
+     private final FilterableStatement statement;
+     private final ParameterMetaData paramMetaData;
+     private final OrderBy orderBy;
+     private final StatementContext parentContext;
+     private final Integer limit;
+     private final GroupBy groupBy;
+     private final RowProjector projector;
+     private final boolean isDegenerate;
+     private final List<QueryPlan> plans;
+     private UnionResultIterators iterators;
+ 
+     public UnionPlan(StatementContext context, FilterableStatement statement, TableRef table, RowProjector projector,
+             Integer limit, OrderBy orderBy, GroupBy groupBy, List<QueryPlan> plans, ParameterMetaData paramMetaData) throws SQLException {
+         this.parentContext = context;
+         this.statement = statement;
+         this.tableRef = table;
+         this.projector = projector;
+         this.limit = limit;
+         this.orderBy = orderBy;
+         this.groupBy = groupBy;
+         this.plans = plans;
+         this.paramMetaData = paramMetaData;
+         boolean isDegen = true;
+         for (QueryPlan plan : plans) {
+             if (plan.getContext().getScanRanges() != ScanRanges.NOTHING) {
+                 isDegen = false;
+                 break;
+             }
+         }
+         this.isDegenerate = isDegen;
+     }
+ 
+     @Override
+     public boolean isDegenerate() {
+         return isDegenerate;
+     }
+ 
+     @Override
+     public List<KeyRange> getSplits() {
+         if (iterators == null)
+             return null;
+         return iterators.getSplits();
+     }
+ 
+     @Override
+     public List<List<Scan>> getScans() {
+         if (iterators == null)
+             return null;
+         return iterators.getScans();
+     }
+ 
+     @Override
+     public GroupBy getGroupBy() {
+         return groupBy;
+     }
+ 
+     @Override
+     public OrderBy getOrderBy() {
+         return orderBy;
+     }
+ 
+     @Override
+     public TableRef getTableRef() {
+         return tableRef;
+     }
+ 
+     @Override
+     public Integer getLimit() {
+         return limit;
+     }
+ 
+     @Override
+     public RowProjector getProjector() {
+         return projector;
+     }
+ 
+     @Override
+     public final ResultIterator iterator(ParallelScanGrouper scanGrouper) throws SQLException {
+         return iterator(Collections.<SQLCloseable>emptyList());
+     }
+ 
+     @Override
+     public final ResultIterator iterator() throws SQLException {
+         return iterator(Collections.<SQLCloseable>emptyList());
+     }
+ 
+     public final ResultIterator iterator(final List<? extends SQLCloseable> dependencies) throws SQLException {
+         this.iterators = new UnionResultIterators(plans, parentContext);
+         ResultIterator scanner;
+         boolean isOrdered = !orderBy.getOrderByExpressions().isEmpty();
+ 
+         if (isOrdered) { // TopN
+             scanner = new MergeSortTopNResultIterator(iterators, limit, orderBy.getOrderByExpressions());
+         } else {
+             scanner = new ConcatResultIterator(iterators);
+             if (limit != null) {
+                 scanner = new LimitingResultIterator(scanner, limit);
+             }
+         }
+         return scanner;
+     }
+ 
+     @Override
+     public ExplainPlan getExplainPlan() throws SQLException {
+         List<String> steps = new ArrayList<String>();
+         steps.add("UNION ALL OVER " + this.plans.size() + " QUERIES");
+         ResultIterator iterator = iterator();
+         iterator.explain(steps);
+         // Indent plans steps nested under union, except last client-side merge/concat step (if there is one)
+         int offset = !orderBy.getOrderByExpressions().isEmpty() || limit != null ? 1 : 0;
+         for (int i = 1 ; i < steps.size()-offset; i++) {
+             steps.set(i, "    " + steps.get(i));
+         }
+         return new ExplainPlan(steps);
+     }
+ 
+ 
+     @Override
+     public long getEstimatedSize() {
+         return DEFAULT_ESTIMATED_SIZE;
+     }
+ 
+     @Override
+     public ParameterMetaData getParameterMetaData() {
+         return paramMetaData;
+     }
+ 
+     @Override
+     public FilterableStatement getStatement() {
+         return statement;
+     }
+ 
+     @Override
+     public StatementContext getContext() {
+         return parentContext;
+     }
+ 
+     @Override
+     public boolean isRowKeyOrdered() {
+         return groupBy.isEmpty() ? orderBy.getOrderByExpressions().isEmpty() : groupBy.isOrderPreserving();
+     }
+ 
+     public List<QueryPlan> getPlans() {
+         return this.plans;
+     }
+ 
+     @Override
+     public boolean useRoundRobinIterator() throws SQLException {
+         return false;
+     }
++
++    @Override
++    public Operation getOperation() {
++        return statement.getOperation();
++    }
++
++    @Override
++    public Set<TableRef> getSourceRefs() {
++        // TODO is this correct?
++        Set<TableRef> sources = Sets.newHashSetWithExpectedSize(plans.size());
++        for (QueryPlan plan : plans) {
++            sources.addAll(plan.getSourceRefs());
++        }
++        return sources;
++    }
+ }
+ 
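
UnionPlan.iterator() builds its result in three layers: the child iterators are concatenated, merge-sorted when there is an ORDER BY, and capped when there is a LIMIT. The following plain-Java analogy reproduces that wiring with java.util.stream; it is only a sketch of the control flow, since the real MergeSortTopNResultIterator, ConcatResultIterator and LimitingResultIterator operate on Phoenix Tuples and stream results incrementally:

    import java.util.Arrays;
    import java.util.Iterator;
    import java.util.List;
    import java.util.stream.Stream;

    public class UnionWiringSketch {
        // ordered ~ MergeSortTopNResultIterator; otherwise ConcatResultIterator,
        // optionally wrapped by a LimitingResultIterator when a LIMIT is present.
        static <T extends Comparable<T>> Iterator<T> union(List<List<T>> children, boolean ordered, Integer limit) {
            Stream<T> s = children.stream().flatMap(List::stream); // concatenate child results
            if (ordered) {
                s = s.sorted();                                    // merge-sort / TopN analogue
            }
            if (limit != null) {
                s = s.limit(limit);                                // LIMIT analogue
            }
            return s.iterator();
        }

        public static void main(String[] args) {
            // An ordered union of [3, 1] and [2] with LIMIT 2 yields "1 2".
            Iterator<Integer> it = union(Arrays.asList(Arrays.asList(3, 1), Arrays.asList(2)), true, 2);
            while (it.hasNext()) {
                System.out.print(it.next() + " ");
            }
        }
    }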
http://git-wip-us.apache.org/repos/asf/phoenix/blob/7d3b544a/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/Indexer.java
----------------------------------------------------------------------
