http://git-wip-us.apache.org/repos/asf/phoenix/blob/cc9929b5/phoenix-core/src/main/java/org/apache/phoenix/execute/MutationState.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/execute/MutationState.java b/phoenix-core/src/main/java/org/apache/phoenix/execute/MutationState.java
index d4ef1cf..3444054 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/execute/MutationState.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/execute/MutationState.java
@@ -29,26 +29,33 @@ import java.util.Collections;
 import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
+import java.util.Set;
 
 import javax.annotation.Nonnull;
 import javax.annotation.concurrent.Immutable;
 
 import org.apache.hadoop.hbase.HConstants;
+import org.apache.hadoop.hbase.client.Delete;
 import org.apache.hadoop.hbase.client.HTableInterface;
 import org.apache.hadoop.hbase.client.Mutation;
+import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
 import org.apache.hadoop.hbase.util.Bytes;
 import org.apache.hadoop.hbase.util.Pair;
 import org.apache.htrace.Span;
 import org.apache.htrace.TraceScope;
 import org.apache.phoenix.cache.ServerCacheClient;
 import org.apache.phoenix.cache.ServerCacheClient.ServerCache;
+import org.apache.phoenix.compile.MutationPlan;
+import org.apache.phoenix.coprocessor.BaseScannerRegionObserver;
 import org.apache.phoenix.coprocessor.MetaDataProtocol.MetaDataMutationResult;
 import org.apache.phoenix.exception.SQLExceptionCode;
+import org.apache.phoenix.exception.SQLExceptionInfo;
 import org.apache.phoenix.hbase.index.util.ImmutableBytesPtr;
 import org.apache.phoenix.index.IndexMaintainer;
 import org.apache.phoenix.index.IndexMetaDataCacheClient;
 import org.apache.phoenix.index.PhoenixIndexCodec;
 import org.apache.phoenix.jdbc.PhoenixConnection;
+import org.apache.phoenix.jdbc.PhoenixStatement.Operation;
 import org.apache.phoenix.monitoring.GlobalClientMetrics;
 import org.apache.phoenix.monitoring.MutationMetricQueue;
 import org.apache.phoenix.monitoring.MutationMetricQueue.MutationMetric;
@@ -58,9 +65,9 @@ import org.apache.phoenix.query.QueryConstants;
 import org.apache.phoenix.schema.IllegalDataException;
 import org.apache.phoenix.schema.MetaDataClient;
 import org.apache.phoenix.schema.PColumn;
-import org.apache.phoenix.schema.PName;
 import org.apache.phoenix.schema.PRow;
 import org.apache.phoenix.schema.PTable;
+import org.apache.phoenix.schema.PTable.IndexType;
 import org.apache.phoenix.schema.PTableType;
 import org.apache.phoenix.schema.RowKeySchema;
 import org.apache.phoenix.schema.TableRef;
@@ -72,14 +79,25 @@ import org.apache.phoenix.util.IndexUtil;
 import org.apache.phoenix.util.LogUtil;
 import org.apache.phoenix.util.PhoenixRuntime;
 import org.apache.phoenix.util.SQLCloseable;
-import org.apache.phoenix.util.ScanUtil;
 import org.apache.phoenix.util.ServerUtil;
+import org.apache.phoenix.util.TransactionUtil;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import co.cask.tephra.Transaction;
+import co.cask.tephra.Transaction.VisibilityLevel;
+import co.cask.tephra.TransactionAware;
+import co.cask.tephra.TransactionCodec;
+import co.cask.tephra.TransactionContext;
+import co.cask.tephra.TransactionFailureException;
+import co.cask.tephra.TransactionSystemClient;
+import co.cask.tephra.hbase11.TransactionAwareHTable;
+
+import com.google.common.base.Predicate;
 import com.google.common.collect.Iterators;
 import com.google.common.collect.Lists;
 import com.google.common.collect.Maps;
+import com.google.common.collect.Sets;
 
 /**
  *
@@ -90,44 +108,217 @@ import com.google.common.collect.Maps;
  */
 public class MutationState implements SQLCloseable {
     private static final Logger logger = LoggerFactory.getLogger(MutationState.class);
-
+    private static final TransactionCodec CODEC = new TransactionCodec();
+
     private PhoenixConnection connection;
     private final long maxSize;
-    private final ImmutableBytesPtr tempPtr = new ImmutableBytesPtr();
     private final Map<TableRef, Map<ImmutableBytesPtr,RowMutationState>> mutations;
+    private final List<TransactionAware> txAwares;
+    private final TransactionContext txContext;
+    private final Set<String> uncommittedPhysicalNames = Sets.newHashSetWithExpectedSize(10);
+
+    private Transaction tx;
     private long sizeOffset;
     private int numRows = 0;
+    private boolean txStarted = false;
+
     private final MutationMetricQueue mutationMetricQueue;
     private ReadMetricQueue readMetricQueue;
 
-    MutationState(long maxSize, PhoenixConnection connection,
-            Map<TableRef, Map<ImmutableBytesPtr, RowMutationState>> mutations) {
-        this.maxSize = maxSize;
-        this.connection = connection;
-        this.mutations = mutations;
-        boolean isMetricsEnabled = connection.isRequestLevelMetricsEnabled();
-        this.mutationMetricQueue = isMetricsEnabled ? new MutationMetricQueue()
-                : NoOpMutationMetricsQueue.NO_OP_MUTATION_METRICS_QUEUE;
-    }
-
     public MutationState(long maxSize, PhoenixConnection connection) {
-        this(maxSize,connection,0);
+        this(maxSize,connection, null);
+    }
+
+    public MutationState(MutationState mutationState) {
+        this(mutationState.maxSize, mutationState.connection, mutationState.getTransaction());
     }
 
     public MutationState(long maxSize, PhoenixConnection connection, long sizeOffset) {
-        this(maxSize, connection, Maps.<TableRef, Map<ImmutableBytesPtr,RowMutationState>>newHashMapWithExpectedSize(connection.getMutateBatchSize()));
+        this(maxSize, connection, null, sizeOffset);
+    }
+
+    private MutationState(long maxSize, PhoenixConnection connection, Transaction tx) {
+        this(maxSize,connection, tx, 0);
+    }
+
+    private MutationState(long maxSize, PhoenixConnection connection, Transaction tx, long sizeOffset) {
+        this(maxSize, connection, Maps.<TableRef, Map<ImmutableBytesPtr,RowMutationState>>newHashMapWithExpectedSize(connection.getMutateBatchSize()), tx);
         this.sizeOffset = sizeOffset;
     }
+    MutationState(long maxSize, PhoenixConnection connection,
+            Map<TableRef, Map<ImmutableBytesPtr, RowMutationState>> mutations,
+            Transaction tx) {
+        this.maxSize = maxSize;
+        this.connection = connection;
+        this.mutations = mutations;
+        boolean isMetricsEnabled = connection.isRequestLevelMetricsEnabled();
+        this.mutationMetricQueue = isMetricsEnabled ? new MutationMetricQueue()
+                : NoOpMutationMetricsQueue.NO_OP_MUTATION_METRICS_QUEUE;
+        this.tx = tx;
+        if (tx == null) {
+            this.txAwares = Collections.emptyList();
+            TransactionSystemClient txServiceClient = this.connection
+                    .getQueryServices().getTransactionSystemClient();
+            this.txContext = new TransactionContext(txServiceClient);
+        } else {
+            txAwares = Lists.newArrayList();
+            txContext = null;
+        }
+    }
+
     public MutationState(TableRef table, Map<ImmutableBytesPtr,RowMutationState> mutations, long sizeOffset, long maxSize, PhoenixConnection connection) {
-        this(maxSize, connection, sizeOffset);
+        this(maxSize, connection, null, sizeOffset);
         this.mutations.put(table, mutations);
         this.numRows = mutations.size();
+        this.tx = connection.getMutationState().getTransaction();
         throwIfTooBig();
     }
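
For readers new to Tephra, here is a minimal sketch of the lifecycle that the txContext field built above manages. It is illustrative only, written against the co.cask.tephra 0.6.x API imported at the top of this file, and is not part of the patch:

    // Sketch: what a TransactionContext does for its registered TransactionAwares.
    static void runTransactionally(TransactionSystemClient txClient, HTableInterface hTable)
            throws TransactionFailureException {
        TransactionAwareHTable txTable = new TransactionAwareHTable(hTable);
        TransactionContext context = new TransactionContext(txClient, txTable);
        context.start();             // begin: obtain a Transaction from the transaction manager
        try {
            // ... issue Puts/Deletes through txTable; they carry the tx write pointer ...
            context.finish();        // commit: conflict detection, then writes become visible
        } catch (TransactionFailureException e) {
            context.abort(e);        // roll back the change sets of all registered TransactionAwares
            throw e;
        }
    }

MutationState defers all of this: with a null tx it builds a fresh TransactionContext and registers participants lazily (see addTransactionParticipant() below); with an inherited tx, as in the copy constructor, it only collects TransactionAwares and starts them against the existing transaction.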
+    public boolean checkpointIfNeccessary(MutationPlan plan) throws SQLException {
+        Transaction currentTx = getTransaction();
+        if (getTransaction() == null || plan.getTargetRef() == null || plan.getTargetRef().getTable() == null || !plan.getTargetRef().getTable().isTransactional()) {
+            return false;
+        }
+        Set<TableRef> sources = plan.getSourceRefs();
+        if (sources.isEmpty()) {
+            return false;
+        }
+        // For a DELETE statement, we're always querying the table being deleted from. This isn't
+        // a problem, but it potentially could be if there are other references to the same table
+        // nested in the DELETE statement (as a subquery or join, for example).
+        TableRef ignoreForExcludeCurrent = plan.getOperation() == Operation.DELETE && sources.size() == 1 ? plan.getTargetRef() : null;
+        boolean excludeCurrent = false;
+        String targetPhysicalName = plan.getTargetRef().getTable().getPhysicalName().getString();
+        for (TableRef source : sources) {
+            if (source.getTable().isTransactional() && !source.equals(ignoreForExcludeCurrent)) {
+                String sourcePhysicalName = source.getTable().getPhysicalName().getString();
+                if (targetPhysicalName.equals(sourcePhysicalName)) {
+                    excludeCurrent = true;
+                    break;
+                }
+            }
+        }
+        // If we're querying the same table we're updating, we must exclude our writes to
+        // it from being visible.
+        if (excludeCurrent) {
+            // If any source tables have uncommitted data prior to the last checkpoint,
+            // then we must create a new checkpoint.
+            boolean hasUncommittedData = false;
+            for (TableRef source : sources) {
+                String sourcePhysicalName = source.getTable().getPhysicalName().getString();
+                if (source.getTable().isTransactional() && uncommittedPhysicalNames.contains(sourcePhysicalName)) {
+                    hasUncommittedData = true;
+                    break;
+                }
+            }
+            if (hasUncommittedData) {
+                try {
+                    if (txContext == null) {
+                        currentTx = tx = connection.getQueryServices().getTransactionSystemClient().checkpoint(currentTx);
+                    } else {
+                        txContext.checkpoint();
+                        currentTx = tx = txContext.getCurrentTransaction();
+                    }
+                    // Since we've checkpointed, we can clear out the uncommitted set, since a statement run afterwards
+                    // should see all this data.
+                    uncommittedPhysicalNames.clear();
+                } catch (TransactionFailureException e) {
+                    throw new SQLException(e);
+                }
+            }
+            // Since we're querying our own table while mutating it, we must not see
+            // our current mutations, otherwise we can get erroneous results (for DELETE)
+            // or get into an infinite loop (for UPSERT SELECT).
+            currentTx.setVisibility(VisibilityLevel.SNAPSHOT_EXCLUDE_CURRENT);
+            return true;
+        }
+        return false;
+    }
+
+    private void addTransactionParticipant(TransactionAware txAware) throws SQLException {
+        if (txContext == null) {
+            txAwares.add(txAware);
+            assert(tx != null);
+            txAware.startTx(tx);
+        } else {
+            txContext.addTransactionAware(txAware);
+        }
+    }
+
+    // Though MutationState is not thread safe in general, this method should be, because it may
+    // be called by TableResultIterator in a multi-threaded manner. Since we do not want to expose
+    // the Transaction outside of MutationState, this seems reasonable, as the member variables
+    // would not change as these threads are running.
+    public HTableInterface getHTable(PTable table) throws SQLException {
+        HTableInterface htable = this.getConnection().getQueryServices().getTable(table.getPhysicalName().getBytes());
+        Transaction currentTx;
+        if (table.isTransactional() && (currentTx=getTransaction()) != null) {
+            TransactionAwareHTable txAware = TransactionUtil.getTransactionAwareHTable(htable, table);
+            // Using the cloned mutationState, as we may have started a new transaction already
+            // if auto commit is true, and we need to use the original one here.
+            txAware.startTx(currentTx);
+            htable = txAware;
+        }
+        return htable;
+    }
+
+    public PhoenixConnection getConnection() {
+        return connection;
+    }
+
+    // Kept private as the Transaction may change when checkpointed. Keeping it private ensures
+    // no one holds on to a stale copy.
+    private Transaction getTransaction() {
+        return tx != null ? tx : txContext != null ? txContext.getCurrentTransaction() : null;
+    }
+
+    public boolean isTransactionStarted() {
+        return getTransaction() != null;
+    }
+
+    public long getReadPointer() {
+        Transaction tx = getTransaction();
+        return tx == null ? HConstants.LATEST_TIMESTAMP : tx.getReadPointer();
+    }
+
+    // For testing
+    public long getWritePointer() {
+        Transaction tx = getTransaction();
+        return tx == null ? HConstants.LATEST_TIMESTAMP : tx.getWritePointer();
+    }
+
+    // For testing
+    public VisibilityLevel getVisibilityLevel() {
+        Transaction tx = getTransaction();
+        return tx == null ? null : tx.getVisibilityLevel();
+    }
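
The interplay between checkpoint() and the visibility level set in checkpointIfNeccessary() is the heart of this patch, so a compact illustration may help. This is a sketch of the Tephra semantics only, reusing the txContext field from this class; it is not code from the patch:

    // Sketch: why UPSERT SELECT over the same table needs a checkpoint.
    txContext.start();                          // transaction begins with write pointer w1
    // ... UPSERT rows; they are written at w1 ...
    txContext.checkpoint();                     // write pointer advances to w2; the w1 writes remain ours
    Transaction tx = txContext.getCurrentTransaction();
    tx.setVisibility(VisibilityLevel.SNAPSHOT_EXCLUDE_CURRENT);
    // Scans now see rows written at w1 but not rows being written at w2, so a statement
    // that reads and writes the same physical table cannot consume its own output.
    tx.setVisibility(VisibilityLevel.SNAPSHOT); // restore read-your-own-writes afterwards
    txContext.finish();                         // w1 and w2 writes commit atomically together

sendUncommitted() (near the end of this file) is the counterpart that resets visibility to SNAPSHOT before each query.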
+    public boolean startTransaction() throws SQLException {
+        if (txContext == null) {
+            throw new SQLExceptionInfo.Builder(SQLExceptionCode.NULL_TRANSACTION_CONTEXT).build().buildException();
+        }
+
+        if (connection.getSCN() != null) {
+            throw new SQLExceptionInfo.Builder(
+                    SQLExceptionCode.CANNOT_START_TRANSACTION_WITH_SCN_SET)
+                    .build().buildException();
+        }
+
+        try {
+            if (!txStarted) {
+                txContext.start();
+                txStarted = true;
+                return true;
+            }
+        } catch (TransactionFailureException e) {
+            throw new SQLExceptionInfo.Builder(SQLExceptionCode.TRANSACTION_FAILED).setRootCause(e).build().buildException();
+        }
+        return false;
+    }
+
     public static MutationState emptyMutationState(long maxSize, PhoenixConnection connection) {
-        MutationState state = new MutationState(maxSize, connection, Collections.<TableRef, Map<ImmutableBytesPtr,RowMutationState>>emptyMap());
+        MutationState state = new MutationState(maxSize, connection, Collections.<TableRef, Map<ImmutableBytesPtr,RowMutationState>>emptyMap(), null);
         state.sizeOffset = 0;
         return state;
     }
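
Seen from JDBC, this machinery surfaces as the ordinary commit/rollback idiom. A usage sketch, assuming the usual java.sql imports; the connection URL and table name are hypothetical, and the table must have been created with TRANSACTIONAL=true:

    try (Connection conn = DriverManager.getConnection("jdbc:phoenix:localhost")) {
        conn.setAutoCommit(false);             // mutations now buffer in MutationState
        try (Statement stmt = conn.createStatement()) {
            stmt.executeUpdate("UPSERT INTO MY_TXN_TABLE VALUES (1, 'a')");
            // A SELECT here would flush uncommitted rows first (sendUncommitted), so
            // the statement sees its own writes even though nothing is committed yet.
            conn.commit();                     // roughly: MutationState.send() + txContext.finish()
        } catch (Exception e) {
            conn.rollback();                   // roughly: MutationState.clear() + txContext.abort()
            throw e;
        }
    }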
@@ -153,6 +344,13 @@ public class MutationState implements SQLCloseable {
         if (this == newMutationState) { // Doesn't make sense
             return;
         }
+        if (txContext != null) {
+            for (TransactionAware txAware : newMutationState.txAwares) {
+                txContext.addTransactionAware(txAware);
+            }
+        } else {
+            txAwares.addAll(newMutationState.txAwares);
+        }
         this.sizeOffset += newMutationState.sizeOffset;
         // Merge newMutation with this one, keeping state from newMutation for any overlaps
         for (Map.Entry<TableRef, Map<ImmutableBytesPtr,RowMutationState>> entry : newMutationState.mutations.entrySet()) {
@@ -202,7 +400,8 @@ public class MutationState implements SQLCloseable {
         throwIfTooBig();
     }
 
-    private static ImmutableBytesPtr getNewRowKeyWithRowTimestamp(ImmutableBytesPtr ptr, long rowTimestamp, PTable table) {
+
+    private static ImmutableBytesPtr getNewRowKeyWithRowTimestamp(ImmutableBytesPtr ptr, long rowTimestamp, PTable table) {
         RowKeySchema schema = table.getRowKeySchema();
         int rowTimestampColPos = table.getRowTimestampColPos();
         Field rowTimestampField = schema.getField(rowTimestampColPos);
@@ -223,51 +422,15 @@ public class MutationState implements SQLCloseable {
         return ptr;
     }
 
-    private Iterator<Pair<byte[],List<Mutation>>> addRowMutations(final TableRef tableRef, final Map<ImmutableBytesPtr, RowMutationState> values, long timestamp, boolean includeMutableIndexes) {
+    private Iterator<Pair<byte[],List<Mutation>>> addRowMutations(final TableRef tableRef, final Map<ImmutableBytesPtr, RowMutationState> values, final long timestamp, boolean includeMutableIndexes, final boolean sendAll) {
         final PTable table = tableRef.getTable();
-        boolean tableWithRowTimestampCol = table.getRowTimestampColPos() != -1;
         final Iterator<PTable> indexes = // Only maintain tables with immutable rows through this client-side mechanism
                 (table.isImmutableRows() || includeMutableIndexes) ?
                         IndexMaintainer.nonDisabledIndexIterator(table.getIndexes().iterator()) :
                         Iterators.<PTable>emptyIterator();
-        final List<Mutation> mutations = Lists.newArrayListWithExpectedSize(values.size());
+        final List<Mutation> mutationList = Lists.newArrayListWithExpectedSize(values.size());
         final List<Mutation> mutationsPertainingToIndex = indexes.hasNext() ? Lists.<Mutation>newArrayListWithExpectedSize(values.size()) : null;
-        Iterator<Map.Entry<ImmutableBytesPtr,RowMutationState>> iterator = values.entrySet().iterator();
-        long timestampToUse = timestamp;
-        while (iterator.hasNext()) {
-            Map.Entry<ImmutableBytesPtr,RowMutationState> rowEntry = iterator.next();
-            ImmutableBytesPtr key = rowEntry.getKey();
-            RowMutationState state = rowEntry.getValue();
-            if (tableWithRowTimestampCol) {
-                RowTimestampColInfo rowTsColInfo = state.getRowTimestampColInfo();
-                if (rowTsColInfo.useServerTimestamp()) {
-                    // regenerate the key with this timestamp.
-                    key = getNewRowKeyWithRowTimestamp(key, timestampToUse, table);
-                } else {
-                    if (rowTsColInfo.getTimestamp() != null) {
-                        timestampToUse = rowTsColInfo.getTimestamp();
-                    }
-                }
-            }
-            PRow row = table.newRow(connection.getKeyValueBuilder(), timestampToUse, key);
-            List<Mutation> rowMutations, rowMutationsPertainingToIndex;
-            if (rowEntry.getValue().getColumnValues() == PRow.DELETE_MARKER) { // means delete
-                row.delete();
-                rowMutations = row.toRowMutations();
-                // Row deletes for index tables are processed by running a re-written query
-                // against the index table (as this allows for flexibility in being able to
-                // delete rows).
-                rowMutationsPertainingToIndex = Collections.emptyList();
-            } else {
-                for (Map.Entry<PColumn,byte[]> valueEntry : rowEntry.getValue().getColumnValues().entrySet()) {
-                    row.setValue(valueEntry.getKey(), valueEntry.getValue());
-                }
-                rowMutations = row.toRowMutations();
-                rowMutationsPertainingToIndex = rowMutations;
-            }
-            mutations.addAll(rowMutations);
-            if (mutationsPertainingToIndex != null) mutationsPertainingToIndex.addAll(rowMutationsPertainingToIndex);
-        }
+        generateMutations(tableRef, timestamp, values, mutationList, mutationsPertainingToIndex);
         return new Iterator<Pair<byte[],List<Mutation>>>() {
             boolean isFirst = true;
 
@@ -280,14 +443,24 @@
             public Pair<byte[], List<Mutation>> next() {
                 if (isFirst) {
                     isFirst = false;
-                    return new Pair<byte[],List<Mutation>>(table.getPhysicalName().getBytes(),mutations);
+                    return new Pair<byte[],List<Mutation>>(table.getPhysicalName().getBytes(), mutationList);
                 }
                 PTable index = indexes.next();
                 List<Mutation> indexMutations;
                 try {
                     indexMutations = IndexUtil.generateIndexData(table, index, mutationsPertainingToIndex,
-                            tempPtr, connection.getKeyValueBuilder(), connection);
+                            connection.getKeyValueBuilder(), connection);
+                    // we may also have to include delete mutations for immutable tables if we are not processing all the tables in the mutations map
+                    if (!sendAll) {
+                        TableRef key = new TableRef(index);
+                        Map<ImmutableBytesPtr, RowMutationState> rowToColumnMap = mutations.remove(key);
+                        if (rowToColumnMap!=null) {
+                            final List<Mutation> deleteMutations = Lists.newArrayList();
+                            generateMutations(tableRef, timestamp, rowToColumnMap, deleteMutations, null);
+                            indexMutations.addAll(deleteMutations);
+                        }
+                    }
                 } catch (SQLException e) {
                     throw new IllegalDataException(e);
                 }
@@ -301,28 +474,84 @@
         };
     }
+
+    private void generateMutations(final TableRef tableRef, long timestamp,
+            final Map<ImmutableBytesPtr, RowMutationState> values,
+            final List<Mutation> mutationList, final List<Mutation> mutationsPertainingToIndex) {
+        final PTable table = tableRef.getTable();
+        boolean tableWithRowTimestampCol = table.getRowTimestampColPos() != -1;
+        Iterator<Map.Entry<ImmutableBytesPtr, RowMutationState>> iterator =
+                values.entrySet().iterator();
+        long timestampToUse = timestamp;
+        while (iterator.hasNext()) {
+            Map.Entry<ImmutableBytesPtr, RowMutationState> rowEntry = iterator.next();
+            ImmutableBytesPtr key = rowEntry.getKey();
+            RowMutationState state = rowEntry.getValue();
+            if (tableWithRowTimestampCol) {
+                RowTimestampColInfo rowTsColInfo = state.getRowTimestampColInfo();
+                if (rowTsColInfo.useServerTimestamp()) {
+                    // regenerate the key with this timestamp.
+                    key = getNewRowKeyWithRowTimestamp(key, timestampToUse, table);
+                } else {
+                    if (rowTsColInfo.getTimestamp() != null) {
+                        timestampToUse = rowTsColInfo.getTimestamp();
+                    }
+                }
+            }
+            PRow row = tableRef.getTable().newRow(connection.getKeyValueBuilder(), timestampToUse, key);
+            List<Mutation> rowMutations, rowMutationsPertainingToIndex;
+            if (rowEntry.getValue().getColumnValues() == PRow.DELETE_MARKER) { // means delete
+                row.delete();
+                rowMutations = row.toRowMutations();
+                // Row deletes for index tables are processed by running a re-written query
+                // against the index table (as this allows for flexibility in being able to
+                // delete rows).
+                rowMutationsPertainingToIndex = Collections.emptyList();
+            } else {
+                for (Map.Entry<PColumn, byte[]> valueEntry : rowEntry.getValue().getColumnValues().entrySet()) {
+                    row.setValue(valueEntry.getKey(), valueEntry.getValue());
+                }
+                rowMutations = row.toRowMutations();
+                rowMutationsPertainingToIndex = rowMutations;
+            }
+            mutationList.addAll(rowMutations);
+            if (mutationsPertainingToIndex != null) mutationsPertainingToIndex.addAll(rowMutationsPertainingToIndex);
+        }
+    }
 
     /**
      * Get the unsorted list of HBase mutations for the tables with uncommitted data.
      * @return list of HBase mutations for uncommitted data.
      */
+    public Iterator<Pair<byte[],List<Mutation>>> toMutations(Long timestamp) {
+        return toMutations(false, timestamp);
+    }
+
     public Iterator<Pair<byte[],List<Mutation>>> toMutations() {
-        return toMutations(false);
+        return toMutations(false, null);
     }
 
     public Iterator<Pair<byte[],List<Mutation>>> toMutations(final boolean includeMutableIndexes) {
+        return toMutations(includeMutableIndexes, null);
+    }
+
+    public Iterator<Pair<byte[],List<Mutation>>> toMutations(final boolean includeMutableIndexes, final Long tableTimestamp) {
         final Iterator<Map.Entry<TableRef, Map<ImmutableBytesPtr,RowMutationState>>> iterator = this.mutations.entrySet().iterator();
         if (!iterator.hasNext()) {
             return Iterators.emptyIterator();
         }
         Long scn = connection.getSCN();
-        final long timestamp = scn == null ? HConstants.LATEST_TIMESTAMP : scn;
+        final long timestamp = (tableTimestamp!=null && tableTimestamp!=QueryConstants.UNSET_TIMESTAMP) ? tableTimestamp : (scn == null ? HConstants.LATEST_TIMESTAMP : scn);
         return new Iterator<Pair<byte[],List<Mutation>>>() {
             private Map.Entry<TableRef, Map<ImmutableBytesPtr,RowMutationState>> current = iterator.next();
             private Iterator<Pair<byte[],List<Mutation>>> innerIterator = init();
 
             private Iterator<Pair<byte[],List<Mutation>>> init() {
-                return addRowMutations(current.getKey(), current.getValue(), timestamp, includeMutableIndexes);
+                return addRowMutations(current.getKey(), current.getValue(), timestamp, includeMutableIndexes, true);
             }
 
             @Override
@@ -346,58 +575,62 @@
         };
     }
-    /**
-     * Validates that the meta data is valid against the server meta data if we haven't yet done so.
-     * Otherwise, for every UPSERT VALUES call, we'd need to hit the server to see if the meta data
-     * has changed.
-     * @param connection
-     * @return the server time to use for the upsert
-     * @throws SQLException if the table or any columns no longer exist
-     */
-    private long[] validate() throws SQLException {
-        int i = 0;
-        Long scn = connection.getSCN();
-        MetaDataClient client = new MetaDataClient(connection);
-        long[] timeStamps = new long[this.mutations.size()];
-        for (Map.Entry<TableRef, Map<ImmutableBytesPtr,RowMutationState>> entry : mutations.entrySet()) {
-            TableRef tableRef = entry.getKey();
-            long serverTimeStamp = tableRef.getTimeStamp();
-            PTable table = tableRef.getTable();
-            // If we're auto committing, we've already validated the schema when we got the ColumnResolver,
-            // so no need to do it again here.
-            if (!connection.getAutoCommit()) {
-                MetaDataMutationResult result = client.updateCache(table.getSchemaName().getString(), table.getTableName().getString());
-                long timestamp = result.getMutationTime();
-                if (timestamp != QueryConstants.UNSET_TIMESTAMP) {
-                    serverTimeStamp = timestamp;
-                    if (result.wasUpdated()) {
-                        // TODO: use bitset?
-                        table = result.getTable();
-                        PColumn[] columns = new PColumn[table.getColumns().size()];
-                        for (Map.Entry<ImmutableBytesPtr,RowMutationState> rowEntry : entry.getValue().entrySet()) {
-                            RowMutationState valueEntry = rowEntry.getValue();
-                            if (valueEntry != null) {
-                                Map<PColumn, byte[]> colValues = valueEntry.getColumnValues();
-                                if (colValues != PRow.DELETE_MARKER) {
-                                    for (PColumn column : colValues.keySet()) {
-                                        columns[column.getPosition()] = column;
-                                    }
-                                }
-                            }
+    /**
+     * Validates that the meta data is valid against the server meta data if we haven't yet done so.
+     * Otherwise, for every UPSERT VALUES call, we'd need to hit the server to see if the meta data
+     * has changed.
+     * @param connection
+     * @return the server time to use for the upsert
+     * @throws SQLException if the table or any columns no longer exist
+     */
+    private long[] validateAll() throws SQLException {
+        int i = 0;
+        long[] timeStamps = new long[this.mutations.size()];
+        for (Map.Entry<TableRef, Map<ImmutableBytesPtr,RowMutationState>> entry : mutations.entrySet()) {
+            TableRef tableRef = entry.getKey();
+            timeStamps[i++] = validate(tableRef, entry.getValue());
+        }
+        return timeStamps;
+    }
+
+    private long validate(TableRef tableRef, Map<ImmutableBytesPtr, RowMutationState> rowKeyToColumnMap) throws SQLException {
+        Long scn = connection.getSCN();
+        MetaDataClient client = new MetaDataClient(connection);
+        long serverTimeStamp = tableRef.getTimeStamp();
+        PTable table = tableRef.getTable();
+        // If we're auto committing, we've already validated the schema when we got the ColumnResolver,
+        // so no need to do it again here.
+        if (!connection.getAutoCommit()) {
+            MetaDataMutationResult result = client.updateCache(table.getSchemaName().getString(), table.getTableName().getString());
+            long timestamp = result.getMutationTime();
+            if (timestamp != QueryConstants.UNSET_TIMESTAMP) {
+                serverTimeStamp = timestamp;
+                if (result.wasUpdated()) {
+                    // TODO: use bitset?
+                    table = result.getTable();
+                    PColumn[] columns = new PColumn[table.getColumns().size()];
+                    for (Map.Entry<ImmutableBytesPtr,RowMutationState> rowEntry : rowKeyToColumnMap.entrySet()) {
+                        RowMutationState valueEntry = rowEntry.getValue();
+                        if (valueEntry != null) {
+                            Map<PColumn, byte[]> colValues = valueEntry.getColumnValues();
+                            if (colValues != PRow.DELETE_MARKER) {
+                                for (PColumn column : colValues.keySet()) {
+                                    columns[column.getPosition()] = column;
+                                }
+                            }
                         }
-                        for (PColumn column : columns) {
-                            if (column != null) {
-                                table.getColumnFamily(column.getFamilyName().getString()).getColumn(column.getName().getString());
-                            }
+                    }
+                    for (PColumn column : columns) {
+                        if (column != null) {
+                            table.getColumnFamily(column.getFamilyName().getString()).getColumn(column.getName().getString());
                         }
-                        tableRef.setTable(table);
                     }
+                    tableRef.setTable(table);
                 }
-                timeStamps[i++] = scn == null ? serverTimeStamp == QueryConstants.UNSET_TIMESTAMP ? HConstants.LATEST_TIMESTAMP : serverTimeStamp : scn;
             }
         }
-        return timeStamps;
-    }
+        return scn == null ? serverTimeStamp == QueryConstants.UNSET_TIMESTAMP ? HConstants.LATEST_TIMESTAMP : serverTimeStamp : scn;
+    }
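
The nested ternary that validate() returns packs three timestamp sources into one expression. Unrolled for readability (equivalent logic only, not part of the patch):

    // Sketch: how validate() picks the timestamp for an upsert.
    long resolveTimestamp(Long scn, long serverTimeStamp) {
        if (scn != null) {
            return scn;                            // a connection-level SCN always wins
        }
        if (serverTimeStamp == QueryConstants.UNSET_TIMESTAMP) {
            return HConstants.LATEST_TIMESTAMP;    // no server time known: let HBase assign
        }
        return serverTimeStamp;                    // otherwise use the server's mutation time
    }

The same precedence appears in toMutations(boolean, Long) above, where an explicit table timestamp takes priority over both.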
 
     private static long calculateMutationSize(List<Mutation> mutations) {
         long byteSize = 0;
@@ -410,149 +643,292 @@ public class MutationState implements SQLCloseable {
         return byteSize;
     }
 
+    private boolean hasKeyValueColumn(PTable table, PTable index) {
+        IndexMaintainer maintainer = index.getIndexMaintainer(table, connection);
+        return !maintainer.getAllColumns().isEmpty();
+    }
+
+    private void divideImmutableIndexes(Iterator<PTable> enabledImmutableIndexes, PTable table, List<PTable> rowKeyIndexes, List<PTable> keyValueIndexes) {
+        while (enabledImmutableIndexes.hasNext()) {
+            PTable index = enabledImmutableIndexes.next();
+            if (index.getIndexType() != IndexType.LOCAL) {
+                if (hasKeyValueColumn(table, index)) {
+                    keyValueIndexes.add(index);
+                } else {
+                    rowKeyIndexes.add(index);
+                }
+            }
+        }
+    }
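
The split performed by divideImmutableIndexes() matters because the two groups are maintained, and rolled back, in different places. A sketch of the classification rule in isolation (illustrative; LOCAL indexes land in neither list because they are always maintained server side):

    boolean canRollBackClientSide(PTable table, PTable index) {
        if (index.getIndexType() == IndexType.LOCAL) {
            return false;
        }
        IndexMaintainer maintainer = index.getIndexMaintainer(table, connection);
        // Row-key-only indexes: every index row can be rebuilt (and deleted) from
        // the data row key alone, so no server-side help is needed on rollback.
        return maintainer.getAllColumns().isEmpty();
    }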
+    private class MetaDataAwareHTable extends DelegateHTable {
+        private final TableRef tableRef;
+
+        private MetaDataAwareHTable(HTableInterface delegate, TableRef tableRef) {
+            super(delegate);
+            this.tableRef = tableRef;
+        }
+
+        /**
+         * Called by Tephra when a transaction is aborted. We have this wrapper so that we get an
+         * opportunity to attach our index meta data to the mutations such that we can also undo
+         * the index mutations.
+         */
+        @Override
+        public void delete(List<Delete> deletes) throws IOException {
+            try {
+                PTable table = tableRef.getTable();
+                List<PTable> indexes = table.getIndexes();
+                Iterator<PTable> enabledIndexes = IndexMaintainer.nonDisabledIndexIterator(indexes.iterator());
+                if (enabledIndexes.hasNext()) {
+                    List<PTable> keyValueIndexes = Collections.emptyList();
+                    ImmutableBytesWritable indexMetaDataPtr = new ImmutableBytesWritable();
+                    boolean attachMetaData = table.getIndexMaintainers(indexMetaDataPtr, connection);
+                    if (table.isImmutableRows()) {
+                        List<PTable> rowKeyIndexes = Lists.newArrayListWithExpectedSize(indexes.size());
+                        keyValueIndexes = Lists.newArrayListWithExpectedSize(indexes.size());
+                        divideImmutableIndexes(enabledIndexes, table, rowKeyIndexes, keyValueIndexes);
+                        // Generate index deletes for immutable indexes that only reference row key
+                        // columns and submit directly here.
+                        ImmutableBytesWritable ptr = new ImmutableBytesWritable();
+                        for (PTable index : rowKeyIndexes) {
+                            List<Delete> indexDeletes = IndexUtil.generateDeleteIndexData(table, index, deletes, ptr, connection.getKeyValueBuilder(), connection);
+                            HTableInterface hindex = connection.getQueryServices().getTable(index.getPhysicalName().getBytes());
+                            hindex.delete(indexDeletes);
+                        }
+                    }
+
+                    // If we have mutable indexes, local immutable indexes, or global immutable indexes
+                    // that reference key value columns, setup index meta data and attach here. In this
+                    // case updates to the indexes will be generated on the server side.
+                    // An alternative would be to let Tephra track the row keys for the immutable index
+                    // by adding it as a transaction participant (soon we can prevent any conflict
+                    // detection from occurring) with the downside being the additional memory required.
+                    if (!keyValueIndexes.isEmpty()) {
+                        attachMetaData = true;
+                        IndexMaintainer.serializeAdditional(table, indexMetaDataPtr, keyValueIndexes, connection);
+                    }
+                    if (attachMetaData) {
+                        setMetaDataOnMutations(tableRef, deletes, indexMetaDataPtr);
+                    }
+                }
+                delegate.delete(deletes);
+            } catch (SQLException e) {
+                throw new IOException(e);
+            }
+        }
+    }
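
MetaDataAwareHTable exists because Tephra performs a rollback by issuing Deletes through the same HTableInterface it wrote through. A simplified sketch of how the two wrappers nest (the patch actually obtains the outer wrapper via TransactionUtil.getTransactionAwareHTable):

    // Sketch: wrapper nesting in send(), simplified.
    HTableInterface wrapForTransaction(HTableInterface raw, TableRef tableRef) {
        HTableInterface hTable = new MetaDataAwareHTable(raw, tableRef); // attaches index metadata
        return new TransactionAwareHTable(hTable);                       // tracks the tx change set
        // On abort, Tephra calls rollbackTx() on the outer wrapper, which issues Deletes
        // through the inner one -- i.e. through MetaDataAwareHTable.delete(List<Delete>)
        // above -- so index rows are undone together with data rows.
    }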
 
     @SuppressWarnings("deprecation")
-    public void commit() throws SQLException {
+    private void send(Iterator<TableRef> tableRefIterator) throws SQLException {
         int i = 0;
-        PName tenantId = connection.getTenantId();
-        long[] serverTimeStamps = validate();
-        Iterator<Map.Entry<TableRef, Map<ImmutableBytesPtr,RowMutationState>>> iterator = this.mutations.entrySet().iterator();
+        long[] serverTimeStamps = null;
+        boolean sendAll = false;
+        if (tableRefIterator == null) {
+            serverTimeStamps = validateAll();
+            tableRefIterator = mutations.keySet().iterator();
+            sendAll = true;
+        }
+        // add tracing for this operation
         try (TraceScope trace = Tracing.startNewSpan(connection, "Committing mutations to tables")) {
             Span span = trace.getSpan();
-            while (iterator.hasNext()) {
-                Map.Entry<TableRef, Map<ImmutableBytesPtr,RowMutationState>> entry = iterator.next();
-                // at this point we are going through mutations for each table
-                Map<ImmutableBytesPtr,RowMutationState> valuesMap = entry.getValue();
-                // above is mutations for a table where the first part is the row key and the second part is column values.
-                TableRef tableRef = entry.getKey();
-                PTable table = tableRef.getTable();
-                table.getIndexMaintainers(tempPtr, connection);
-                boolean hasIndexMaintainers = tempPtr.getLength() > 0;
-                boolean isDataTable = true;
-                long serverTimestamp = serverTimeStamps[i++];
-                Iterator<Pair<byte[],List<Mutation>>> mutationsIterator = addRowMutations(tableRef, valuesMap, serverTimestamp, false);
-                // above returns an iterator of pair where the first
-                while (mutationsIterator.hasNext()) {
-                    Pair<byte[],List<Mutation>> pair = mutationsIterator.next();
-                    byte[] htableName = pair.getFirst();
-                    List<Mutation> mutations = pair.getSecond();
-
-                    //create a span per target table
-                    //TODO maybe we can be smarter about the table name to string here?
-                    Span child = Tracing.child(span,"Writing mutation batch for table: "+Bytes.toString(htableName));
-
-                    int retryCount = 0;
-                    boolean shouldRetry = false;
-                    do {
-                        ServerCache cache = null;
-                        if (hasIndexMaintainers && isDataTable) {
-                            byte[] attribValue = null;
-                            byte[] uuidValue;
-                            if (IndexMetaDataCacheClient.useIndexMetadataCache(connection, mutations, tempPtr.getLength())) {
-                                IndexMetaDataCacheClient client = new IndexMetaDataCacheClient(connection, tableRef);
-                                cache = client.addIndexMetadataCache(mutations, tempPtr);
-                                child.addTimelineAnnotation("Updated index metadata cache");
-                                uuidValue = cache.getId();
-                                // If we haven't retried yet, retry for this case only, as it's possible that
-                                // a split will occur after we send the index metadata cache to all known
-                                // region servers.
-                                shouldRetry = true;
-                            } else {
-                                attribValue = ByteUtil.copyKeyBytesIfNecessary(tempPtr);
-                                uuidValue = ServerCacheClient.generateId();
-                            }
-                            // Either set the UUID to be able to access the index metadata from the cache
-                            // or set the index metadata directly on the Mutation
-                            for (Mutation mutation : mutations) {
-                                if (tenantId != null) {
-                                    byte[] tenantIdBytes = ScanUtil.getTenantIdBytes(
-                                            table.getRowKeySchema(),
-                                            table.getBucketNum()!=null,
-                                            tenantId);
-                                    mutation.setAttribute(PhoenixRuntime.TENANT_ID_ATTRIB, tenantIdBytes);
-                                }
-                                mutation.setAttribute(PhoenixIndexCodec.INDEX_UUID, uuidValue);
-                                if (attribValue != null) {
-                                    mutation.setAttribute(PhoenixIndexCodec.INDEX_MD, attribValue);
-                                }
-                            }
-                        }
-
-                        SQLException sqlE = null;
-                        HTableInterface hTable = connection.getQueryServices().getTable(htableName);
-                        try {
-                            long numMutations = mutations.size();
+            ImmutableBytesWritable indexMetaDataPtr = new ImmutableBytesWritable();
+            while (tableRefIterator.hasNext()) {
+                // at this point we are going through mutations for each table
+                TableRef tableRef = tableRefIterator.next();
+                Map<ImmutableBytesPtr, RowMutationState> valuesMap = mutations.get(tableRef);
+                if (valuesMap == null || valuesMap.isEmpty()) {
+                    continue;
+                }
+                PTable table = tableRef.getTable();
+                // Track tables to which we've sent uncommitted data
+                if (table.isTransactional()) {
+                    uncommittedPhysicalNames.add(table.getPhysicalName().getString());
+                }
+                table.getIndexMaintainers(indexMetaDataPtr, connection);
+                boolean isDataTable = true;
+                // Validate as we go if transactional since we can undo if a problem occurs (which is unlikely)
+                long serverTimestamp = serverTimeStamps == null ? validate(tableRef, valuesMap) : serverTimeStamps[i++];
+                Iterator<Pair<byte[],List<Mutation>>> mutationsIterator = addRowMutations(tableRef, valuesMap, serverTimestamp, false, sendAll);
+                while (mutationsIterator.hasNext()) {
+                    Pair<byte[],List<Mutation>> pair = mutationsIterator.next();
+                    byte[] htableName = pair.getFirst();
+                    List<Mutation> mutationList = pair.getSecond();
+
+                    //create a span per target table
+                    //TODO maybe we can be smarter about the table name to string here?
+                    Span child = Tracing.child(span,"Writing mutation batch for table: "+Bytes.toString(htableName));
+
+                    int retryCount = 0;
+                    boolean shouldRetry = false;
+                    do {
+                        ServerCache cache = null;
+                        if (isDataTable) {
+                            cache = setMetaDataOnMutations(tableRef, mutationList, indexMetaDataPtr);
+                        }
+
+                        // If we haven't retried yet, retry for this case only, as it's possible that
+                        // a split will occur after we send the index metadata cache to all known
+                        // region servers.
+                        shouldRetry = cache != null;
+                        SQLException sqlE = null;
+                        HTableInterface hTable = connection.getQueryServices().getTable(htableName);
+                        try {
+                            if (table.isTransactional()) {
+                                // If we have indexes, wrap the HTable in a delegate HTable that
+                                // will attach the necessary index meta data in the event of a
+                                // rollback
+                                if (!table.getIndexes().isEmpty()) {
+                                    hTable = new MetaDataAwareHTable(hTable, tableRef);
+                                }
+                                TransactionAwareHTable txnAware = TransactionUtil.getTransactionAwareHTable(hTable, table);
+                                // Don't add immutable indexes (those are the only ones that would participate
+                                // during a commit), as we don't need conflict detection for these.
+                                if (isDataTable) {
+                                    // Even for immutable, we need to do this so that an abort has the state
+                                    // necessary to generate the rows to delete.
+                                    addTransactionParticipant(txnAware);
+                                } else {
+                                    txnAware.startTx(getTransaction());
+                                }
+                                hTable = txnAware;
+                            }
+                            long numMutations = mutationList.size();
                             GLOBAL_MUTATION_BATCH_SIZE.update(numMutations);
                             long startTime = System.currentTimeMillis();
-                            child.addTimelineAnnotation("Attempt " + retryCount);
-                            hTable.batch(mutations);
-                            child.stop();
+                            child.addTimelineAnnotation("Attempt " + retryCount);
+                            hTable.batch(mutationList);
+                            child.stop();
                             shouldRetry = false;
                             long mutationCommitTime = System.currentTimeMillis() - startTime;
                             GLOBAL_MUTATION_COMMIT_TIME.update(mutationCommitTime);
-                            long mutationSizeBytes = calculateMutationSize(mutations);
+                            long mutationSizeBytes = calculateMutationSize(mutationList);
                             MutationMetric mutationsMetric = new MutationMetric(numMutations, mutationSizeBytes, mutationCommitTime);
                             mutationMetricQueue.addMetricsForTable(Bytes.toString(htableName), mutationsMetric);
-                        } catch (Exception e) {
-                            SQLException inferredE = ServerUtil.parseServerExceptionOrNull(e);
-                            if (inferredE != null) {
-                                if (shouldRetry && retryCount == 0 && inferredE.getErrorCode() == SQLExceptionCode.INDEX_METADATA_NOT_FOUND.getErrorCode()) {
-                                    // Swallow this exception once, as it's possible that we split after sending the index metadata
-                                    // and one of the region servers doesn't have it. This will cause it to have it the next go around.
-                                    // If it fails again, we don't retry.
-                                    String msg = "Swallowing exception and retrying after clearing meta cache on connection. " + inferredE;
-                                    logger.warn(LogUtil.addCustomAnnotations(msg, connection));
-                                    connection.getQueryServices().clearTableRegionCache(htableName);
+                        } catch (Exception e) {
+                            SQLException inferredE = ServerUtil.parseServerExceptionOrNull(e);
+                            if (inferredE != null) {
+                                if (shouldRetry && retryCount == 0 && inferredE.getErrorCode() == SQLExceptionCode.INDEX_METADATA_NOT_FOUND.getErrorCode()) {
+                                    // Swallow this exception once, as it's possible that we split after sending the index metadata
+                                    // and one of the region servers doesn't have it. This will cause it to have it the next go around.
+                                    // If it fails again, we don't retry.
+                                    String msg = "Swallowing exception and retrying after clearing meta cache on connection. " + inferredE;
+                                    logger.warn(LogUtil.addCustomAnnotations(msg, connection));
+                                    connection.getQueryServices().clearTableRegionCache(htableName);
+
+                                    // add a new child span as this one failed
+                                    child.addTimelineAnnotation(msg);
+                                    child.stop();
+                                    child = Tracing.child(span,"Failed batch, attempting retry");
+
+                                    continue;
+                                }
+                                e = inferredE;
+                            }
+                            // Throw to client with both what was committed so far and what is left to be committed.
+                            // That way, client can either undo what was done or try again with what was not done.
+                            sqlE = new CommitException(e, getUncommittedStatementIndexes());
+                        } finally {
+                            try {
+                                if (cache != null) {
+                                    cache.close();
+                                }
+                            } finally {
+                                try {
+                                    hTable.close();
+                                } catch (IOException e) {
+                                    if (sqlE != null) {
+                                        sqlE.setNextException(ServerUtil.parseServerException(e));
+                                    } else {
+                                        sqlE = ServerUtil.parseServerException(e);
+                                    }
+                                }
+                                if (sqlE != null) {
+                                    // clear pending mutations
+                                    mutations.clear();
+                                    throw sqlE;
+                                }
+                            }
+                        }
+                    } while (shouldRetry && retryCount++ < 1);
+                    isDataTable = false;
+                }
+                if (tableRef.getTable().getType() != PTableType.INDEX) {
+                    numRows -= valuesMap.size();
+                }
+                // Remove batches as we process them
+                if (sendAll) {
+                    tableRefIterator.remove(); // Iterating through actual map in this case
+                } else {
+                    mutations.remove(tableRef);
+                }
+            }
+        }
+        // Note that we cannot assume that *all* mutations have been sent, since we've optimized this
+        // now to only send the mutations for the tables we're querying, hence we've removed the
+        // assertions that were here before.
+    }
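
The retry logic is easy to lose in the diff noise: a batch is retried at most once, and only when the failure says a region server lacked the index metadata we cached (a region may split or move after the cache is seeded). Distilled here, with the Phoenix specifics behind hypothetical helpers (isIndexMetadataNotFound is not a real method):

    // Sketch: the retry-once shape used by send().
    void batchWithOneRetry(HTableInterface hTable, List<Mutation> mutationList,
            byte[] htableName, boolean usedServerSideCache) throws Exception {
        int retryCount = 0;
        boolean shouldRetry;
        do {
            shouldRetry = usedServerSideCache;   // only cache-backed batches are retryable
            try {
                hTable.batch(mutationList);
                shouldRetry = false;             // success: fall out of the loop
            } catch (Exception e) {
                if (shouldRetry && retryCount == 0 && isIndexMetadataNotFound(e)) { // hypothetical check
                    connection.getQueryServices().clearTableRegionCache(htableName);
                    continue;                    // re-seed the region cache, try exactly once more
                }
                throw e;
            }
        } while (shouldRetry && retryCount++ < 1);
    }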
 
-                                    // add a new child span as this one failed
-                                    child.addTimelineAnnotation(msg);
-                                    child.stop();
-                                    child = Tracing.child(span,"Failed batch, attempting retry");
 
+    public byte[] encodeTransaction() throws SQLException {
+        try {
+            return CODEC.encode(getTransaction());
+        } catch (IOException e) {
+            throw new SQLException(e);
+        }
+    }
+
+    public static Transaction decodeTransaction(byte[] txnBytes) throws IOException {
+        return (txnBytes == null || txnBytes.length==0) ? null : CODEC.decode(txnBytes);
+    }
 
-                                    continue;
-                                }
-                                e = inferredE;
-                            }
-                            sqlE = new CommitException(e, getUncommittedStatementIndexes());
-                        } finally {
-                            try {
-                                hTable.close();
-                            } catch (IOException e) {
-                                if (sqlE != null) {
-                                    sqlE.setNextException(ServerUtil.parseServerException(e));
-                                } else {
-                                    sqlE = ServerUtil.parseServerException(e);
-                                }
-                            } finally {
-                                try {
-                                    if (cache != null) {
-                                        cache.close();
-                                    }
-                                } finally {
-                                    if (sqlE != null) {
-                                        throw sqlE;
-                                    }
-                                }
-                            }
-                        }
-                    } while (shouldRetry && retryCount++ < 1);
-                    isDataTable = false;
-                }
-                if (tableRef.getTable().getType() != PTableType.INDEX) {
-                    numRows -= entry.getValue().size();
+    private ServerCache setMetaDataOnMutations(TableRef tableRef, List<? extends Mutation> mutations,
+            ImmutableBytesWritable indexMetaDataPtr) throws SQLException {
+        PTable table = tableRef.getTable();
+        byte[] tenantId = connection.getTenantId() == null ? null : connection.getTenantId().getBytes();
+        ServerCache cache = null;
+        byte[] attribValue = null;
+        byte[] uuidValue = null;
+        byte[] txState = ByteUtil.EMPTY_BYTE_ARRAY;
+        if (table.isTransactional()) {
+            txState = encodeTransaction();
+        }
+        boolean hasIndexMetaData = indexMetaDataPtr.getLength() > 0;
+        if (hasIndexMetaData) {
+            if (IndexMetaDataCacheClient.useIndexMetadataCache(connection, mutations, indexMetaDataPtr.getLength() + txState.length)) {
+                IndexMetaDataCacheClient client = new IndexMetaDataCacheClient(connection, tableRef);
+                cache = client.addIndexMetadataCache(mutations, indexMetaDataPtr, txState);
+                uuidValue = cache.getId();
+            } else {
+                attribValue = ByteUtil.copyKeyBytesIfNecessary(indexMetaDataPtr);
+                uuidValue = ServerCacheClient.generateId();
+            }
+        } else if (txState.length == 0) {
+            return null;
+        }
+        // Either set the UUID to be able to access the index metadata from the cache
+        // or set the index metadata directly on the Mutation
+        for (Mutation mutation : mutations) {
+            if (tenantId != null) {
+                mutation.setAttribute(PhoenixRuntime.TENANT_ID_ATTRIB, tenantId);
+            }
+            mutation.setAttribute(PhoenixIndexCodec.INDEX_UUID, uuidValue);
+            if (attribValue != null) {
+                mutation.setAttribute(PhoenixIndexCodec.INDEX_MD, attribValue);
+                if (txState.length > 0) {
+                    mutation.setAttribute(BaseScannerRegionObserver.TX_STATE, txState);
                 }
-                iterator.remove(); // Remove batches as we process them
+            } else if (!hasIndexMetaData && txState.length > 0) {
+                mutation.setAttribute(BaseScannerRegionObserver.TX_STATE, txState);
             }
         }
-        assert(numRows==0);
-        assert(this.mutations.isEmpty());
+        return cache;
     }
 
-    public void rollback(PhoenixConnection connection) throws SQLException {
+    private void clear() throws SQLException {
         this.mutations.clear();
         numRows = 0;
     }
@@ -570,6 +946,100 @@ public class MutationState implements SQLCloseable {
     @Override
     public void close() throws SQLException {
     }
+
+    private void reset() {
+        txStarted = false;
+        tx = null;
+        uncommittedPhysicalNames.clear();
+    }
+
+    public void rollback() throws SQLException {
+        clear();
+        txAwares.clear();
+        if (txContext != null) {
+            try {
+                if (txStarted) {
+                    txContext.abort();
+                }
+            } catch (TransactionFailureException e) {
+                throw new SQLException(e); // TODO: error code
+            } finally {
+                reset();
+            }
+        }
+    }
+
+    public void commit() throws SQLException {
+        boolean sendMutationsFailed=false;
+        try {
+            send();
+        } catch (Throwable t) {
+            sendMutationsFailed=true;
+            throw t;
+        } finally {
+            txAwares.clear();
+            if (txContext != null) {
+                try {
+                    if (txStarted && !sendMutationsFailed) {
+                        txContext.finish();
+                    }
+                } catch (TransactionFailureException e) {
+                    try {
+                        txContext.abort(e);
+                        // abort, then throw the original commit failure exception
+                        throw TransactionUtil.getTransactionFailureException(e);
+                    } catch (TransactionFailureException e1) {
+                        // if abort fails, throw the abort failure exception
+                        throw TransactionUtil.getTransactionFailureException(e1);
+                    }
+                } finally {
+                    if (!sendMutationsFailed) {
+                        reset();
+                    }
+                }
+            }
+        }
+    }
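
encodeTransaction()/decodeTransaction() above are the glue that ships transaction state to the server. A round-trip sketch using Tephra's TransactionCodec (IOException handling elided; this mirrors, but is not, the patch's code path):

    TransactionCodec codec = new TransactionCodec();
    Transaction current = txContext.getCurrentTransaction();
    byte[] txState = codec.encode(current);       // client side: serialize
    // txState travels as the BaseScannerRegionObserver.TX_STATE mutation attribute,
    // or inside the server cache entry next to the serialized index metadata.
    Transaction decoded = codec.decode(txState);  // server side: deserialize
    assert decoded.getWritePointer() == current.getWritePointer();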
+
+    /**
+     * Support read-your-own-write semantics by sending uncommitted data to HBase prior to running a
+     * query. In this way, they are visible to subsequent reads but are not actually committed until
+     * commit is called.
+     * @param tableRefs
+     * @return true if at least partially transactional and false otherwise.
+     * @throws SQLException
+     */
+    public boolean sendUncommitted(Iterator<TableRef> tableRefs) throws SQLException {
+        Transaction currentTx = getTransaction();
+        if (currentTx != null) {
+            // Initialize visibility so that transactions see their own writes.
+            // The checkpoint() method will set it to not see writes if necessary.
+            currentTx.setVisibility(VisibilityLevel.SNAPSHOT);
+        }
+        Iterator<TableRef> filteredTableRefs = Iterators.filter(tableRefs, new Predicate<TableRef>(){
+            @Override
+            public boolean apply(TableRef tableRef) {
+                return tableRef.getTable().isTransactional();
+            }
+        });
+        if (filteredTableRefs.hasNext()) {
+            // FIXME: strip table alias to prevent equality check from failing due to alias mismatch on null alias.
+            // We really should be keying the tables based on the physical table name.
+            List<TableRef> strippedAliases = Lists.newArrayListWithExpectedSize(mutations.keySet().size());
+            while (filteredTableRefs.hasNext()) {
+                TableRef tableRef = filteredTableRefs.next();
+                strippedAliases.add(new TableRef(null, tableRef.getTable(), tableRef.getTimeStamp(), tableRef.getLowerBoundTimeStamp(), tableRef.hasDynamicCols()));
+            }
+            startTransaction();
+            send(strippedAliases.iterator());
+            return true;
+        }
+        return false;
+    }
+
+    public void send() throws SQLException {
+        send(null);
+    }
 
     public static int[] joinSortedIntArrays(int[] a, int[] b) {
         int[] result = new int[a.length + b.length];
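
sendUncommitted() is what gives transactional Phoenix read-your-own-writes without committing. A condensed view of the query-side call sequence (the real call sites live in the statement execution path; this method is illustrative only):

    void prepareTransactionalQuery(MutationState state, MutationPlan plan,
            Iterator<TableRef> sourceTables) throws SQLException {
        state.sendUncommitted(sourceTables);   // flush buffered rows under SNAPSHOT visibility
        state.checkpointIfNeccessary(plan);    // maybe advance the write pointer and
                                               // switch to SNAPSHOT_EXCLUDE_CURRENT
        // ... run the scan: it sees rows written before this statement, never its own output ...
    }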
http://git-wip-us.apache.org/repos/asf/phoenix/blob/cc9929b5/phoenix-core/src/main/java/org/apache/phoenix/execute/ScanPlan.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/execute/ScanPlan.java b/phoenix-core/src/main/java/org/apache/phoenix/execute/ScanPlan.java
index 3098980..19b3e6b 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/execute/ScanPlan.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/execute/ScanPlan.java
@@ -151,7 +151,7 @@ public class ScanPlan extends BaseQueryPlan {
             return spoolingResultIteratorFactory;
         } else {
             return new ChunkedResultIterator.ChunkedResultIteratorFactory(
-                    spoolingResultIteratorFactory, table);
+                    spoolingResultIteratorFactory, context.getConnection().getMutationState(), table);
         }
     }
 
http://git-wip-us.apache.org/repos/asf/phoenix/blob/cc9929b5/phoenix-core/src/main/java/org/apache/phoenix/index/IndexMaintainer.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/index/IndexMaintainer.java b/phoenix-core/src/main/java/org/apache/phoenix/index/IndexMaintainer.java
index dea82a8..ab7534a 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/index/IndexMaintainer.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/index/IndexMaintainer.java
@@ -89,6 +89,8 @@ import org.apache.phoenix.util.MetaDataUtil;
 import org.apache.phoenix.util.SchemaUtil;
 import org.apache.phoenix.util.TrustedByteArrayOutputStream;
 
+import co.cask.tephra.TxConstants;
+
 import com.google.common.base.Predicate;
 import com.google.common.collect.Iterators;
 import com.google.common.collect.Lists;
@@ -874,7 +876,7 @@ public class IndexMaintainer implements Writable, Iterable<ColumnReference> {
                 } else if (kv.getTypeByte() == KeyValue.Type.DeleteFamily.getCode()
                         // Since we don't include the index rows in the change set for txn tables, we need to detect row deletes that have been transformed by TransactionProcessor
-                        ) {
+                        || (CellUtil.matchingQualifier(kv, TxConstants.FAMILY_DELETE_QUALIFIER) && CellUtil.matchingValue(kv, HConstants.EMPTY_BYTE_ARRAY))) {
                     nDeleteCF++;
                 }
             }
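
A note on the IndexMaintainer change just above: Tephra does not delete rows in place; its TransactionProcessor coprocessor rewrites a row delete as a put of an empty value under a reserved family-delete qualifier, so index maintenance must recognize that encoding. The check in isolation (same constants as the patch):

    // Sketch: does this cell represent a Tephra-transformed family delete?
    static boolean isTephraFamilyDelete(Cell kv) {
        return CellUtil.matchingQualifier(kv, TxConstants.FAMILY_DELETE_QUALIFIER)
                && CellUtil.matchingValue(kv, HConstants.EMPTY_BYTE_ARRAY);
    }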
http://git-wip-us.apache.org/repos/asf/phoenix/blob/cc9929b5/phoenix-core/src/main/java/org/apache/phoenix/index/IndexMetaDataCacheClient.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/index/IndexMetaDataCacheClient.java b/phoenix-core/src/main/java/org/apache/phoenix/index/IndexMetaDataCacheClient.java
index 70ddc86..05a01b9 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/index/IndexMetaDataCacheClient.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/index/IndexMetaDataCacheClient.java
@@ -24,7 +24,6 @@ import java.util.List;
 
 import org.apache.hadoop.hbase.client.Mutation;
 import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
-
 import org.apache.phoenix.cache.ServerCacheClient;
 import org.apache.phoenix.cache.ServerCacheClient.ServerCache;
 import org.apache.phoenix.compile.ScanRanges;
@@ -59,7 +58,7 @@ public class IndexMetaDataCacheClient {
      * @param mutations the list of mutations that will be sent in a batch to server
      * @param indexMetaDataByteLength length in bytes of the index metadata cache
      */
-    public static boolean useIndexMetadataCache(PhoenixConnection connection, List<Mutation> mutations, int indexMetaDataByteLength) {
+    public static boolean useIndexMetadataCache(PhoenixConnection connection, List<? extends Mutation> mutations, int indexMetaDataByteLength) {
         ReadOnlyProps props = connection.getQueryServices().getProps();
         int threshold = props.getInt(INDEX_MUTATE_BATCH_SIZE_THRESHOLD_ATTRIB, QueryServicesOptions.DEFAULT_INDEX_MUTATE_BATCH_SIZE_THRESHOLD);
         return (indexMetaDataByteLength > ServerCacheClient.UUID_LENGTH && mutations.size() > threshold);
@@ -72,25 +71,26 @@ public class IndexMetaDataCacheClient {
      * @throws MaxServerCacheSizeExceededException if size of hash cache exceeds max allowed
      * size
      */
-    public ServerCache addIndexMetadataCache(List<Mutation> mutations, ImmutableBytesWritable ptr) throws SQLException {
+    public ServerCache addIndexMetadataCache(List<? extends Mutation> mutations, ImmutableBytesWritable ptr, byte[] txState) throws SQLException {
        /**
         * Serialize and compress hashCacheTable
         */
-        return serverCache.addServerCache(ScanUtil.newScanRanges(mutations), ptr, new IndexMetaDataCacheFactory(), cacheUsingTableRef);
+        return serverCache.addServerCache(ScanUtil.newScanRanges(mutations), ptr, txState, new IndexMetaDataCacheFactory(), cacheUsingTableRef);
     }
 
     /**
     * Send the index metadata cache to all region servers for regions that will handle the mutations.
+    * @param txState TODO
     * @return client-side {@link ServerCache} representing the added index metadata cache
     * @throws SQLException
     * @throws MaxServerCacheSizeExceededException if size of hash cache exceeds max allowed
     * size
     */
-    public ServerCache addIndexMetadataCache(ScanRanges ranges, ImmutableBytesWritable ptr) throws SQLException {
+    public ServerCache addIndexMetadataCache(ScanRanges ranges, ImmutableBytesWritable ptr, byte[] txState) throws SQLException {
        /**
         * Serialize and compress hashCacheTable
         */
-        return serverCache.addServerCache(ranges, ptr, new IndexMetaDataCacheFactory(), cacheUsingTableRef);
+        return serverCache.addServerCache(ranges, ptr, txState, new IndexMetaDataCacheFactory(), cacheUsingTableRef);
     }
 }
http://git-wip-us.apache.org/repos/asf/phoenix/blob/cc9929b5/phoenix-core/src/main/java/org/apache/phoenix/index/IndexMetaDataCacheFactory.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/index/IndexMetaDataCacheFactory.java b/phoenix-core/src/main/java/org/apache/phoenix/index/IndexMetaDataCacheFactory.java
index 488db44..7ced2f8 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/index/IndexMetaDataCacheFactory.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/index/IndexMetaDataCacheFactory.java
@@ -24,11 +24,15 @@ import java.io.IOException;
 import java.sql.SQLException;
 import java.util.List;
 
+import co.cask.tephra.Transaction;
+
 import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
 import org.apache.phoenix.cache.IndexMetaDataCache;
 import org.apache.phoenix.coprocessor.ServerCachingProtocol.ServerCacheFactory;
+import org.apache.phoenix.execute.MutationState;
 import org.apache.phoenix.hbase.index.util.GenericKeyValueBuilder;
 import org.apache.phoenix.memory.MemoryManager.MemoryChunk;
+import org.apache.phoenix.util.TransactionUtil;
 
 public class IndexMetaDataCacheFactory implements ServerCacheFactory {
     public IndexMetaDataCacheFactory() {
@@ -43,10 +47,16 @@ public class IndexMetaDataCacheFactory implements ServerCacheFactory {
     }
 
     @Override
-    public Closeable newCache (ImmutableBytesWritable cachePtr, final MemoryChunk chunk) throws SQLException {
+    public Closeable newCache (ImmutableBytesWritable cachePtr, byte[] txState, final MemoryChunk chunk) throws SQLException {
         // just use the standard keyvalue builder - this doesn't really need to be fast
-        final List<IndexMaintainer> maintainers =
+        final List<IndexMaintainer> maintainers =
                 IndexMaintainer.deserialize(cachePtr, GenericKeyValueBuilder.INSTANCE);
+        final Transaction txn;
+        try {
+            txn = txState.length!=0 ? MutationState.decodeTransaction(txState) : null;
+        } catch (IOException e) {
+            throw new SQLException(e);
+        }
         return new IndexMetaDataCache() {
 
             @Override
@@ -58,6 +68,11 @@ public class IndexMetaDataCacheFactory implements ServerCacheFactory {
             public List<IndexMaintainer> getIndexMaintainers() {
                 return maintainers;
             }
+
+            @Override
+            public Transaction getTransaction() {
+                return txn;
+            }
         };
     }
 }

http://git-wip-us.apache.org/repos/asf/phoenix/blob/cc9929b5/phoenix-core/src/main/java/org/apache/phoenix/index/PhoenixIndexFailurePolicy.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/index/PhoenixIndexFailurePolicy.java b/phoenix-core/src/main/java/org/apache/phoenix/index/PhoenixIndexFailurePolicy.java
index 2fd168a..806a20a 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/index/PhoenixIndexFailurePolicy.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/index/PhoenixIndexFailurePolicy.java
@@ -75,7 +75,7 @@ import com.google.common.collect.Multimap;
  *
  * @since 2.1
  */
-public class PhoenixIndexFailurePolicy extends KillServerOnFailurePolicy {
+public class PhoenixIndexFailurePolicy extends KillServerOnFailurePolicy {
     private static final Log LOG = LogFactory.getLog(PhoenixIndexFailurePolicy.class);
     private RegionCoprocessorEnvironment env;
 
http://git-wip-us.apache.org/repos/asf/phoenix/blob/cc9929b5/phoenix-core/src/main/java/org/apache/phoenix/index/PhoenixIndexMetaData.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/index/PhoenixIndexMetaData.java b/phoenix-core/src/main/java/org/apache/phoenix/index/PhoenixIndexMetaData.java
index 3dcc44e..60ae915 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/index/PhoenixIndexMetaData.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/index/PhoenixIndexMetaData.java
@@ -22,6 +22,8 @@ import java.sql.SQLException;
 import java.util.List;
 import java.util.Map;
 
+import co.cask.tephra.Transaction;
+
 import org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment;
 import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
 import org.apache.phoenix.cache.GlobalCache;
@@ -46,8 +48,10 @@ public class PhoenixIndexMetaData implements IndexMetaData {
         byte[] uuid = attributes.get(PhoenixIndexCodec.INDEX_UUID);
         if (uuid == null) { return IndexMetaDataCache.EMPTY_INDEX_META_DATA_CACHE; }
         byte[] md = attributes.get(PhoenixIndexCodec.INDEX_MD);
+        byte[] txState = attributes.get(BaseScannerRegionObserver.TX_STATE);
         if (md != null) {
             final List<IndexMaintainer> indexMaintainers = IndexMaintainer.deserialize(md);
+            final Transaction txn = MutationState.decodeTransaction(txState);
             return new IndexMetaDataCache() {
 
                 @Override
@@ -58,6 +62,11 @@ public class PhoenixIndexMetaData implements IndexMetaData {
                     return indexMaintainers;
                 }
 
+                @Override
+                public Transaction getTransaction() {
+                    return txn;
+                }
+
             };
         } else {
             byte[] tenantIdBytes = attributes.get(PhoenixRuntime.TENANT_ID_ATTRIB);
@@ -80,6 +89,10 @@ public class PhoenixIndexMetaData implements IndexMetaData {
         this.attributes = attributes;
     }
 
+    public Transaction getTransaction() {
+        return indexMetaDataCache.getTransaction();
+    }
+
     public List<IndexMaintainer> getIndexMaintainers() {
         return indexMetaDataCache.getIndexMaintainers();
     }