Repository: phoenix Updated Branches: refs/heads/txn c7700b41d -> 46eb25c14
Start transaction correctly for immutable indexes Project: http://git-wip-us.apache.org/repos/asf/phoenix/repo Commit: http://git-wip-us.apache.org/repos/asf/phoenix/commit/46eb25c1 Tree: http://git-wip-us.apache.org/repos/asf/phoenix/tree/46eb25c1 Diff: http://git-wip-us.apache.org/repos/asf/phoenix/diff/46eb25c1 Branch: refs/heads/txn Commit: 46eb25c14bb868d5fb0e9c1e363b12677822edca Parents: c7700b4 Author: James Taylor <[email protected]> Authored: Wed May 20 15:23:13 2015 -0700 Committer: James Taylor <[email protected]> Committed: Wed May 20 15:23:13 2015 -0700 ---------------------------------------------------------------------- .../end2end/index/TxImmutableIndexIT.java | 87 ++++++++++++++++++++ .../apache/phoenix/execute/MutationState.java | 43 ++++++---- .../apache/phoenix/index/IndexMaintainer.java | 5 ++ .../index/PhoenixTransactionalIndexer.java | 35 +------- .../java/org/apache/phoenix/util/IndexUtil.java | 7 +- 5 files changed, 130 insertions(+), 47 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/phoenix/blob/46eb25c1/phoenix-core/src/it/java/org/apache/phoenix/end2end/index/TxImmutableIndexIT.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/it/java/org/apache/phoenix/end2end/index/TxImmutableIndexIT.java b/phoenix-core/src/it/java/org/apache/phoenix/end2end/index/TxImmutableIndexIT.java new file mode 100644 index 0000000..4c77ea3 --- /dev/null +++ b/phoenix-core/src/it/java/org/apache/phoenix/end2end/index/TxImmutableIndexIT.java @@ -0,0 +1,87 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.phoenix.end2end.index; + +import static org.apache.phoenix.util.TestUtil.TEST_PROPERTIES; +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertTrue; + +import java.sql.Connection; +import java.sql.DriverManager; +import java.sql.ResultSet; +import java.sql.Statement; +import java.util.Map; +import java.util.Properties; + +import org.apache.phoenix.end2end.BaseHBaseManagedTimeIT; +import org.apache.phoenix.end2end.Shadower; +import org.apache.phoenix.query.QueryServices; +import org.apache.phoenix.util.PropertiesUtil; +import org.apache.phoenix.util.ReadOnlyProps; +import org.junit.BeforeClass; +import org.junit.Test; + +import com.google.common.collect.Maps; + +public class TxImmutableIndexIT extends ImmutableIndexIT { + + @BeforeClass + @Shadower(classBeingShadowed = BaseHBaseManagedTimeIT.class) + public static void doSetup() throws Exception { + Map<String,String> props = Maps.newHashMapWithExpectedSize(3); + // Don't split intra region so we can more easily know that the n-way parallelization is for the explain plan + // Forces server cache to be used + props.put(QueryServices.DEFAULT_TRANSACTIONAL_ATTRIB, Boolean.toString(true)); + // We need this b/c we don't allow a transactional table to be created if the underlying + // HBase table already exists (since we don't know if it was transactional before). + props.put(QueryServices.DROP_METADATA_ATTRIB, Boolean.toString(true)); + setUpTestDriver(new ReadOnlyProps(props.entrySet().iterator())); + } + + @Test + public void testRollbackOfUncommittedIndexChange() throws Exception { + Properties props = PropertiesUtil.deepCopy(TEST_PROPERTIES); + Connection conn = DriverManager.getConnection(getUrl(), props); + conn.setAutoCommit(false); + try { + Statement stmt = conn.createStatement(); + stmt.execute("CREATE TABLE DEMO(v1 VARCHAR PRIMARY KEY, v2 VARCHAR, v3 VARCHAR) IMMUTABLE_ROWS=true"); + stmt.execute("CREATE INDEX DEMO_idx ON DEMO (v2) INCLUDE(v3)"); + + stmt.executeUpdate("upsert into DEMO values('x', 'y', 'a')"); + + //assert values in data table + ResultSet rs = stmt.executeQuery("select v1, v2, v3 from DEMO"); + assertTrue(rs.next()); + assertEquals("x", rs.getString(1)); + assertEquals("y", rs.getString(2)); + assertEquals("a", rs.getString(3)); + assertFalse(rs.next()); + + conn.rollback(); + + //assert values in data table + rs = stmt.executeQuery("select v1, v2, v3 from DEMO"); + assertFalse(rs.next()); + + } finally { + conn.close(); + } + } +} http://git-wip-us.apache.org/repos/asf/phoenix/blob/46eb25c1/phoenix-core/src/main/java/org/apache/phoenix/execute/MutationState.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/execute/MutationState.java b/phoenix-core/src/main/java/org/apache/phoenix/execute/MutationState.java index 4f1a2cd..1c3f130 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/execute/MutationState.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/execute/MutationState.java @@ -94,7 +94,6 @@ public class MutationState implements SQLCloseable { private PhoenixConnection connection; private final long maxSize; - private final ImmutableBytesWritable tempPtr = new ImmutableBytesWritable(); // map from table to rows // rows - map from rowkey to columns // columns - map from column to value @@ -264,6 +263,7 @@ public class MutationState implements SQLCloseable { final List<Mutation> mutations = Lists.newArrayListWithExpectedSize(values.size()); final List<Mutation> mutationsPertainingToIndex = indexes.hasNext() ? Lists.<Mutation>newArrayListWithExpectedSize(values.size()) : null; Iterator<Map.Entry<ImmutableBytesPtr,Map<PColumn,byte[]>>> iterator = values.entrySet().iterator(); + final ImmutableBytesWritable ptr = new ImmutableBytesWritable(); while (iterator.hasNext()) { Map.Entry<ImmutableBytesPtr,Map<PColumn,byte[]>> rowEntry = iterator.next(); ImmutableBytesPtr key = rowEntry.getKey(); @@ -305,7 +305,7 @@ public class MutationState implements SQLCloseable { try { indexMutations = IndexUtil.generateIndexData(tableRef.getTable(), index, mutationsPertainingToIndex, - tempPtr, connection.getKeyValueBuilder(), connection); + ptr, connection.getKeyValueBuilder(), connection); } catch (SQLException e) { throw new IllegalDataException(e); } @@ -473,8 +473,9 @@ public class MutationState implements SQLCloseable { divideImmutableIndexes(enabledIndexes, table, rowKeyIndexes, keyValueIndexes); // Generate index deletes for immutable indexes that only reference row key // columns and submit directly here. + ImmutableBytesWritable ptr = new ImmutableBytesWritable(); for (PTable index : rowKeyIndexes) { - List<Delete> indexDeletes = IndexUtil.generateDeleteIndexData(table, index, deletes, tempPtr, connection.getKeyValueBuilder(), connection); + List<Delete> indexDeletes = IndexUtil.generateDeleteIndexData(table, index, deletes, ptr, connection.getKeyValueBuilder(), connection); HTableInterface hindex = connection.getQueryServices().getTable(index.getPhysicalName().getBytes()); hindex.delete(indexDeletes); } @@ -511,6 +512,7 @@ public class MutationState implements SQLCloseable { // add tracing for this operation TraceScope trace = Tracing.startNewSpan(connection, "Committing mutations to tables"); Span span = trace.getSpan(); + ImmutableBytesWritable indexMetaDataPtr = new ImmutableBytesWritable(); while (tableRefIterator.hasNext()) { TableRef tableRef = tableRefIterator.next(); Map<ImmutableBytesPtr,Map<PColumn,byte[]>> valuesMap = mutations.get(tableRef); @@ -518,7 +520,7 @@ public class MutationState implements SQLCloseable { continue; } PTable table = tableRef.getTable(); - boolean hasIndexMaintainers = table.getIndexMaintainers(tempPtr, connection); + table.getIndexMaintainers(indexMetaDataPtr, connection); boolean isDataTable = true; // Validate as we go if transactional since we can undo if a problem occurs (which is unlikely) long serverTimestamp = serverTimeStamps == null ? validate(tableRef, valuesMap) : serverTimeStamps[i++]; @@ -536,8 +538,8 @@ public class MutationState implements SQLCloseable { boolean shouldRetry = false; do { ServerCache cache = null; - if (hasIndexMaintainers && isDataTable) { - cache = setMetaDataOnMutations(tableRef, mutations, tempPtr); + if (isDataTable) { + cache = setMetaDataOnMutations(tableRef, mutations, indexMetaDataPtr); } // If we haven't retried yet, retry for this case only, as it's possible that @@ -558,7 +560,11 @@ public class MutationState implements SQLCloseable { // Don't add immutable indexes (those are the only ones that would participate // during a commit), as we don't need conflict detection for these. if (isDataTable) { + // Even for immutable, we need to do this so that an abort has the state + // necessary to generate the rows to delete. addTxParticipant(txnAware); + } else { + txnAware.startTx(getTransaction()); } hTable = txnAware; } @@ -635,18 +641,23 @@ public class MutationState implements SQLCloseable { byte[] tenantId = connection.getTenantId() == null ? null : connection.getTenantId().getBytes(); ServerCache cache = null; byte[] attribValue = null; - byte[] uuidValue; + byte[] uuidValue = null; byte[] txState = ByteUtil.EMPTY_BYTE_ARRAY; if (table.isTransactional()) { txState = TransactionUtil.encodeTxnState(getTransaction()); } - if (IndexMetaDataCacheClient.useIndexMetadataCache(connection, mutations, indexMetaDataPtr.getLength() + txState.length)) { - IndexMetaDataCacheClient client = new IndexMetaDataCacheClient(connection, tableRef); - cache = client.addIndexMetadataCache(mutations, indexMetaDataPtr, txState); - uuidValue = cache.getId(); - } else { - attribValue = ByteUtil.copyKeyBytesIfNecessary(indexMetaDataPtr); - uuidValue = ServerCacheClient.generateId(); + boolean hasIndexMetaData = indexMetaDataPtr.getLength() > 0; + if (hasIndexMetaData) { + if (IndexMetaDataCacheClient.useIndexMetadataCache(connection, mutations, indexMetaDataPtr.getLength() + txState.length)) { + IndexMetaDataCacheClient client = new IndexMetaDataCacheClient(connection, tableRef); + cache = client.addIndexMetadataCache(mutations, indexMetaDataPtr, txState); + uuidValue = cache.getId(); + } else { + attribValue = ByteUtil.copyKeyBytesIfNecessary(indexMetaDataPtr); + uuidValue = ServerCacheClient.generateId(); + } + } else if (txState.length == 0) { + return null; } // Either set the UUID to be able to access the index metadata from the cache // or set the index metadata directly on the Mutation @@ -657,6 +668,10 @@ public class MutationState implements SQLCloseable { mutation.setAttribute(PhoenixIndexCodec.INDEX_UUID, uuidValue); if (attribValue != null) { mutation.setAttribute(PhoenixIndexCodec.INDEX_MD, attribValue); + if (txState.length > 0) { + mutation.setAttribute(BaseScannerRegionObserver.TX_STATE, txState); + } + } else if (!hasIndexMetaData && txState.length > 0) { mutation.setAttribute(BaseScannerRegionObserver.TX_STATE, txState); } } http://git-wip-us.apache.org/repos/asf/phoenix/blob/46eb25c1/phoenix-core/src/main/java/org/apache/phoenix/index/IndexMaintainer.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/index/IndexMaintainer.java b/phoenix-core/src/main/java/org/apache/phoenix/index/IndexMaintainer.java index be668d6..aaaf685 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/index/IndexMaintainer.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/index/IndexMaintainer.java @@ -200,6 +200,9 @@ public class IndexMaintainer implements Writable, Iterable<ColumnReference> { int nMutableIndexes = indexMetaDataPtr.getLength() == 0 ? 0 : ByteUtil.vintFromBytes(indexMetaDataPtr); int nIndexes = nMutableIndexes + keyValueIndexes.size(); int estimatedSize = indexMetaDataPtr.getLength() + 1; // Just in case new size increases buffer + if (indexMetaDataPtr.getLength() == 0) { + estimatedSize += table.getRowKeySchema().getEstimatedByteSize(); + } for (PTable index : keyValueIndexes) { estimatedSize += index.getIndexMaintainer(table, connection).getEstimatedByteSize(); } @@ -212,6 +215,8 @@ public class IndexMaintainer implements Writable, Iterable<ColumnReference> { // as its still included if (indexMetaDataPtr.getLength() > 0) { output.write(indexMetaDataPtr.get(), indexMetaDataPtr.getOffset(), indexMetaDataPtr.getLength()-WritableUtils.getVIntSize(nMutableIndexes)); + } else { + table.getRowKeySchema().write(output); } // Serialize mutable indexes afterwards for (PTable index : keyValueIndexes) { http://git-wip-us.apache.org/repos/asf/phoenix/blob/46eb25c1/phoenix-core/src/main/java/org/apache/phoenix/index/PhoenixTransactionalIndexer.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/index/PhoenixTransactionalIndexer.java b/phoenix-core/src/main/java/org/apache/phoenix/index/PhoenixTransactionalIndexer.java index 862c4ba..cfe0058 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/index/PhoenixTransactionalIndexer.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/index/PhoenixTransactionalIndexer.java @@ -30,8 +30,6 @@ import org.apache.hadoop.hbase.CellScanner; import org.apache.hadoop.hbase.CoprocessorEnvironment; import org.apache.hadoop.hbase.KeyValue; import org.apache.hadoop.hbase.TableName; -import org.apache.hadoop.hbase.client.Delete; -import org.apache.hadoop.hbase.client.Durability; import org.apache.hadoop.hbase.client.HTableInterface; import org.apache.hadoop.hbase.client.Mutation; import org.apache.hadoop.hbase.client.Result; @@ -44,7 +42,6 @@ import org.apache.hadoop.hbase.io.ImmutableBytesWritable; import org.apache.hadoop.hbase.io.TimeRange; import org.apache.hadoop.hbase.regionserver.MiniBatchOperationInProgress; import org.apache.hadoop.hbase.regionserver.RegionScanner; -import org.apache.hadoop.hbase.regionserver.wal.WALEdit; import org.apache.hadoop.hbase.util.Pair; import org.apache.phoenix.compile.ScanRanges; import org.apache.phoenix.hbase.index.MultiMutation; @@ -69,7 +66,6 @@ import org.cloudera.htrace.Span; import org.cloudera.htrace.Trace; import org.cloudera.htrace.TraceScope; -import com.google.common.collect.Iterators; import com.google.common.collect.Lists; import com.google.common.collect.Maps; import com.google.common.collect.Sets; @@ -133,11 +129,12 @@ public class PhoenixTransactionalIndexer extends BaseRegionObserver { MiniBatchOperationInProgress<Mutation> miniBatchOp) throws IOException { Mutation m = miniBatchOp.getOperation(0); - if (!codec.isEnabled(m) || m.getAttribute(TxConstants.TX_ROLLBACK_ATTRIBUTE_KEY) != null) { + if (!codec.isEnabled(m)) { super.preBatchMutate(c, miniBatchOp); return; } + boolean readOwnWrites = m.getAttribute(TxConstants.TX_ROLLBACK_ATTRIBUTE_KEY) != null; Map<String,byte[]> updateAttributes = m.getAttributesMap(); PhoenixIndexMetaData indexMetaData = new PhoenixIndexMetaData(c.getEnvironment(),updateAttributes); Collection<Pair<Mutation, byte[]>> indexUpdates = null; @@ -149,7 +146,7 @@ public class PhoenixTransactionalIndexer extends BaseRegionObserver { } // get the index updates for all elements in this batch - indexUpdates = getIndexUpdates(c.getEnvironment(), indexMetaData, getMutationIterator(miniBatchOp), false); + indexUpdates = getIndexUpdates(c.getEnvironment(), indexMetaData, getMutationIterator(miniBatchOp), readOwnWrites); current.addTimelineAnnotation("Built index updates, doing preStep"); TracingUtils.addAnnotation(current, "index update count", indexUpdates.size()); @@ -188,32 +185,6 @@ public class PhoenixTransactionalIndexer extends BaseRegionObserver { return s; } - @Override - public void preDelete(final ObserverContext<RegionCoprocessorEnvironment> e, final Delete delete, - final WALEdit edit, final Durability durability) throws IOException { - - // Need to do this in preDelete as otherwise our scan won't see the old values unless - // we do a raw scan. - if (delete.getAttribute(TxConstants.TX_ROLLBACK_ATTRIBUTE_KEY) == null || !codec.isEnabled(delete)) { - super.preDelete(e, delete, edit, durability); - return; - } - Map<String,byte[]> updateAttributes = delete.getAttributesMap(); - PhoenixIndexMetaData indexMetaData = new PhoenixIndexMetaData(e.getEnvironment(),updateAttributes); - Collection<Pair<Mutation, byte[]>> indexUpdates = null; - try { - indexUpdates = getIndexUpdates(e.getEnvironment(), indexMetaData, Iterators.<Mutation>singletonIterator(delete), true); - // no index updates, so we are done - if (!indexUpdates.isEmpty()) { - this.writer.write(indexUpdates); - } - } catch (Throwable t) { - String msg = "Failed to rollback index updates: " + indexUpdates; - LOG.error(msg, t); - ServerUtil.throwIOException(msg, t); - } - } - private Collection<Pair<Mutation, byte[]>> getIndexUpdates(RegionCoprocessorEnvironment env, PhoenixIndexMetaData indexMetaData, Iterator<Mutation> mutationIterator, boolean readOwnWrites) throws IOException { ResultScanner scanner = null; TransactionAwareHTable txTable = null; http://git-wip-us.apache.org/repos/asf/phoenix/blob/46eb25c1/phoenix-core/src/main/java/org/apache/phoenix/util/IndexUtil.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/util/IndexUtil.java b/phoenix-core/src/main/java/org/apache/phoenix/util/IndexUtil.java index fd111e0..51428ea 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/util/IndexUtil.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/util/IndexUtil.java @@ -27,6 +27,8 @@ import java.util.ArrayList; import java.util.List; import java.util.Map; +import co.cask.tephra.TxConstants; + import org.apache.hadoop.hbase.Cell; import org.apache.hadoop.hbase.KeyValue; import org.apache.hadoop.hbase.TableName; @@ -219,7 +221,10 @@ public class IndexUtil { for (final Mutation dataMutation : dataMutations) { long ts = MetaDataUtil.getClientTimeStamp(dataMutation); ptr.set(dataMutation.getRow()); - indexMutations.add(maintainer.buildDeleteMutation(kvBuilder, ptr, ts)); + Delete delete = maintainer.buildDeleteMutation(kvBuilder, ptr, ts); + // TODO: move to TransactionUtil + delete.setAttribute(TxConstants.TX_ROLLBACK_ATTRIBUTE_KEY, dataMutation.getAttribute(TxConstants.TX_ROLLBACK_ATTRIBUTE_KEY)); + indexMutations.add(delete); } return indexMutations; } catch (IOException e) {
