Repository: phoenix Updated Branches: refs/heads/txn bb7c74df8 -> d0572f214
PHOENIX-2375 Prevent write of Tephra delete markers in preDelete to ensure current data state hasn't change Project: http://git-wip-us.apache.org/repos/asf/phoenix/repo Commit: http://git-wip-us.apache.org/repos/asf/phoenix/commit/d0572f21 Tree: http://git-wip-us.apache.org/repos/asf/phoenix/tree/d0572f21 Diff: http://git-wip-us.apache.org/repos/asf/phoenix/diff/d0572f21 Branch: refs/heads/txn Commit: d0572f214ea96fb82b888e15ec71dbc9f3141e1e Parents: bb7c74d Author: James Taylor <[email protected]> Authored: Fri Nov 6 17:09:10 2015 -0800 Committer: James Taylor <[email protected]> Committed: Fri Nov 6 17:09:10 2015 -0800 ---------------------------------------------------------------------- .../apache/phoenix/end2end/index/IndexIT.java | 2 ++ .../index/PhoenixTransactionalIndexer.java | 34 +++++++++++--------- .../org/apache/phoenix/schema/PTableImpl.java | 21 +++++++++--- 3 files changed, 37 insertions(+), 20 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/phoenix/blob/d0572f21/phoenix-core/src/it/java/org/apache/phoenix/end2end/index/IndexIT.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/it/java/org/apache/phoenix/end2end/index/IndexIT.java b/phoenix-core/src/it/java/org/apache/phoenix/end2end/index/IndexIT.java index 3738e5b..d2d5d21 100644 --- a/phoenix-core/src/it/java/org/apache/phoenix/end2end/index/IndexIT.java +++ b/phoenix-core/src/it/java/org/apache/phoenix/end2end/index/IndexIT.java @@ -33,6 +33,7 @@ import org.apache.phoenix.util.PropertiesUtil; import org.apache.phoenix.util.QueryUtil; import org.apache.phoenix.util.SchemaUtil; import org.apache.phoenix.util.TestUtil; +import org.junit.Ignore; import org.junit.Test; import org.junit.runner.RunWith; import org.junit.runners.Parameterized; @@ -126,6 +127,7 @@ public class IndexIT extends BaseHBaseManagedTimeIT { } @Test + @Ignore("Failing due to zero byte incorrectly being stripped from row key") // FIXME: fixed in master, so remove this ignore tag when merged. public void testDeleteFromAllPKColumnIndex() throws Exception { Properties props = PropertiesUtil.deepCopy(TEST_PROPERTIES); try (Connection conn = DriverManager.getConnection(getUrl(), props)) { http://git-wip-us.apache.org/repos/asf/phoenix/blob/d0572f21/phoenix-core/src/main/java/org/apache/phoenix/index/PhoenixTransactionalIndexer.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/index/PhoenixTransactionalIndexer.java b/phoenix-core/src/main/java/org/apache/phoenix/index/PhoenixTransactionalIndexer.java index 40ee817..68c1539 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/index/PhoenixTransactionalIndexer.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/index/PhoenixTransactionalIndexer.java @@ -21,6 +21,11 @@ import java.util.List; import java.util.Map; import java.util.Set; +import co.cask.tephra.Transaction; +import co.cask.tephra.Transaction.VisibilityLevel; +import co.cask.tephra.TxConstants; +import co.cask.tephra.hbase98.TransactionAwareHTable; + import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.apache.hadoop.hbase.Cell; @@ -67,11 +72,6 @@ import com.google.common.collect.Maps; import com.google.common.collect.Sets; import com.google.common.primitives.Longs; -import co.cask.tephra.Transaction; -import co.cask.tephra.Transaction.VisibilityLevel; -import co.cask.tephra.TxConstants; -import co.cask.tephra.hbase98.TransactionAwareHTable; - /** * Do all the work of managing index updates for a transactional table from a single coprocessor. Since the transaction * manager essentially time orders writes through conflict detection, the logic to maintain a secondary index is quite a @@ -232,14 +232,7 @@ public class PhoenixTransactionalIndexer extends BaseRegionObserver { if (isRollback) { processRollback(env, indexMetaData, txRollbackAttribute, currentScanner, mutations, tx, mutableColumns, indexUpdates); } else { - processScanner(env, indexMetaData, txRollbackAttribute, currentScanner, mutations, tx, mutableColumns, indexUpdates); - for (Mutation m : mutations.values()) { - TxTableState state = new TxTableState(env, mutableColumns, indexMetaData.getAttributes(), tx.getWritePointer(), m); - // if we did not generate valid put, we might have to generate a delete - if (!generatePuts(indexMetaData, indexUpdates, state)) { - generateDeletes(indexMetaData, indexUpdates, txRollbackAttribute, state); - } - } + processMutation(env, indexMetaData, txRollbackAttribute, currentScanner, mutations, tx, mutableColumns, indexUpdates); } } finally { if (txTable != null) txTable.close(); @@ -248,7 +241,7 @@ public class PhoenixTransactionalIndexer extends BaseRegionObserver { return indexUpdates; } - private void processScanner(RegionCoprocessorEnvironment env, + private void processMutation(RegionCoprocessorEnvironment env, PhoenixIndexMetaData indexMetaData, byte[] txRollbackAttribute, ResultScanner scanner, Map<ImmutableBytesPtr, MultiMutation> mutations, Transaction tx, @@ -257,6 +250,7 @@ public class PhoenixTransactionalIndexer extends BaseRegionObserver { if (scanner != null) { Result result; ColumnReference emptyColRef = new ColumnReference(indexMetaData.getIndexMaintainers().get(0).getDataEmptyKeyValueCF(), QueryConstants.EMPTY_COLUMN_BYTES); + // Process existing data table rows by removing the old index row and adding the new index row while ((result = scanner.next()) != null) { Mutation m = mutations.remove(new ImmutableBytesPtr(result.getRow())); TxTableState state = new TxTableState(env, mutableColumns, indexMetaData.getAttributes(), tx.getWritePointer(), m, emptyColRef, result); @@ -264,6 +258,11 @@ public class PhoenixTransactionalIndexer extends BaseRegionObserver { generatePuts(indexMetaData, indexUpdates, state); } } + // Process new data table by adding new index rows + for (Mutation m : mutations.values()) { + TxTableState state = new TxTableState(env, mutableColumns, indexMetaData.getAttributes(), tx.getWritePointer(), m); + generatePuts(indexMetaData, indexUpdates, state); + } } private void processRollback(RegionCoprocessorEnvironment env, @@ -274,10 +273,15 @@ public class PhoenixTransactionalIndexer extends BaseRegionObserver { Collection<Pair<Mutation, byte[]>> indexUpdates) throws IOException { if (scanner != null) { Result result; + // Loop through last committed row state plus all new rows associated with current transaction + // to generate point delete markers for all index rows that were added. We don't have Tephra + // manage index rows in change sets because we don't want to be hit with the additional + // memory hit and do not need to do conflict detection on index rows. ColumnReference emptyColRef = new ColumnReference(indexMetaData.getIndexMaintainers().get(0).getDataEmptyKeyValueCF(), QueryConstants.EMPTY_COLUMN_BYTES); while ((result = scanner.next()) != null) { Mutation m = mutations.remove(new ImmutableBytesPtr(result.getRow())); - // Sort by timestamp, type, cf, cq so we can process in time batches + // Sort by timestamp, type, cf, cq so we can process in time batches from oldest to newest + // (as if we're "replaying" them in time order). List<Cell> cells = result.listCells(); Collections.sort(cells, new Comparator<Cell>() { http://git-wip-us.apache.org/repos/asf/phoenix/blob/d0572f21/phoenix-core/src/main/java/org/apache/phoenix/schema/PTableImpl.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/schema/PTableImpl.java b/phoenix-core/src/main/java/org/apache/phoenix/schema/PTableImpl.java index 2d2d593..f6613f4 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/schema/PTableImpl.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/schema/PTableImpl.java @@ -32,6 +32,8 @@ import java.util.Map; import java.util.SortedMap; import java.util.TreeMap; +import co.cask.tephra.TxConstants; + import org.apache.hadoop.hbase.Cell; import org.apache.hadoop.hbase.HConstants; import org.apache.hadoop.hbase.client.Delete; @@ -616,7 +618,7 @@ public class PTableImpl implements PTable { private Put setValues; private Delete unsetValues; - private Delete deleteRow; + private Mutation deleteRow; private final long ts; public PRowImpl(KeyValueBuilder kvBuilder, ImmutableBytesWritable key, long ts, Integer bucketNum) { @@ -720,11 +722,20 @@ public class PTableImpl implements PTable { @Override public void delete() { newMutations(); - Delete delete = new Delete(key); - for (PColumnFamily colFamily : families) { - delete.deleteFamily(colFamily.getName().getBytes(), ts); + if (PTableImpl.this.isTransactional()) { + Put delete = new Put(key); + for (PColumnFamily colFamily : families) { + delete.add(colFamily.getName().getBytes(), TxConstants.FAMILY_DELETE_QUALIFIER, ts, + HConstants.EMPTY_BYTE_ARRAY); + } + deleteRow = delete; + } else { + Delete delete = new Delete(key); + for (PColumnFamily colFamily : families) { + delete.deleteFamily(colFamily.getName().getBytes(), ts); + } + deleteRow = delete; } - deleteRow = delete; // No need to write to the WAL for indexes if (PTableImpl.this.getType() == PTableType.INDEX) { deleteRow.setDurability(Durability.SKIP_WAL);
