PHOENIX-3825 Mutable Index rebuild does not write an index version for each data row version (Vincent Poon)
Project: http://git-wip-us.apache.org/repos/asf/phoenix/repo Commit: http://git-wip-us.apache.org/repos/asf/phoenix/commit/d81d9e0d Tree: http://git-wip-us.apache.org/repos/asf/phoenix/tree/d81d9e0d Diff: http://git-wip-us.apache.org/repos/asf/phoenix/diff/d81d9e0d Branch: refs/heads/4.x-HBase-0.98 Commit: d81d9e0d498b06fcc827ef2b43b1cddd61e812ae Parents: cd5ab4f Author: James Taylor <[email protected]> Authored: Thu May 11 17:02:26 2017 -0700 Committer: James Taylor <[email protected]> Committed: Thu May 11 18:05:50 2017 -0700 ---------------------------------------------------------------------- .../hbase/index/covered/IndexMetaData.java | 9 +++++++- .../hbase/index/covered/NonTxIndexBuilder.java | 2 +- .../covered/example/CoveredColumnIndexer.java | 2 +- .../covered/update/IndexUpdateManager.java | 14 ++++++++++++- .../index/covered/TestNonTxIndexBuilder.java | 22 ++++++++++++++++++-- .../covered/update/TestIndexUpdateManager.java | 19 ++++++++++++----- 6 files changed, 57 insertions(+), 11 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/phoenix/blob/d81d9e0d/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/covered/IndexMetaData.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/covered/IndexMetaData.java b/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/covered/IndexMetaData.java index 5420013..04e2523 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/covered/IndexMetaData.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/covered/IndexMetaData.java @@ -23,7 +23,14 @@ public interface IndexMetaData { @Override public boolean isImmutableRows() { return false; + } + + @Override + public boolean ignoreNewerMutations() { + return false; }}; - + public boolean isImmutableRows(); + + public boolean ignoreNewerMutations(); } http://git-wip-us.apache.org/repos/asf/phoenix/blob/d81d9e0d/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/covered/NonTxIndexBuilder.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/covered/NonTxIndexBuilder.java b/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/covered/NonTxIndexBuilder.java index e5cebbf..a308a5d 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/covered/NonTxIndexBuilder.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/covered/NonTxIndexBuilder.java @@ -66,7 +66,7 @@ public class NonTxIndexBuilder extends BaseIndexBuilder { // create a state manager, so we can manage each batch LocalTableState state = new LocalTableState(env, localTable, mutation); // build the index updates for each group - IndexUpdateManager manager = new IndexUpdateManager(); + IndexUpdateManager manager = new IndexUpdateManager(indexMetaData); batchMutationAndAddUpdates(manager, state, mutation, indexMetaData); http://git-wip-us.apache.org/repos/asf/phoenix/blob/d81d9e0d/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/covered/example/CoveredColumnIndexer.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/covered/example/CoveredColumnIndexer.java b/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/covered/example/CoveredColumnIndexer.java index f4a9af9..718a15e 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/covered/example/CoveredColumnIndexer.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/covered/example/CoveredColumnIndexer.java @@ -122,7 +122,7 @@ public class CoveredColumnIndexer extends NonTxIndexBuilder { public Collection<Pair<Mutation, byte[]>> getIndexUpdateForFilteredRows( Collection<KeyValue> filtered, IndexMetaData indexMetaData) throws IOException { // stores all the return values - IndexUpdateManager updateMap = new IndexUpdateManager(); + IndexUpdateManager updateMap = new IndexUpdateManager(indexMetaData); // batch the updates by row to make life easier and ordered Collection<Batch> batches = batchByRow(filtered); http://git-wip-us.apache.org/repos/asf/phoenix/blob/d81d9e0d/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/covered/update/IndexUpdateManager.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/covered/update/IndexUpdateManager.java b/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/covered/update/IndexUpdateManager.java index 5f6020a..2784f0b 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/covered/update/IndexUpdateManager.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/covered/update/IndexUpdateManager.java @@ -31,6 +31,7 @@ import org.apache.hadoop.hbase.client.Mutation; import org.apache.hadoop.hbase.client.Put; import org.apache.hadoop.hbase.util.Bytes; import org.apache.hadoop.hbase.util.Pair; +import org.apache.phoenix.hbase.index.covered.IndexMetaData; import org.apache.phoenix.hbase.index.util.ImmutableBytesPtr; import com.google.common.collect.Lists; @@ -99,6 +100,12 @@ public class IndexUpdateManager { protected final Map<ImmutableBytesPtr, Collection<Mutation>> map = new HashMap<ImmutableBytesPtr, Collection<Mutation>>(); + private IndexMetaData indexMetaData; + + public IndexUpdateManager(IndexMetaData indexMetaData) { + this.indexMetaData = indexMetaData; + + } /** * Add an index update. Keeps the latest {@link Put} for a given timestamp @@ -113,7 +120,12 @@ public class IndexUpdateManager { updates = new TreeSet<Mutation>(COMPARATOR); map.put(key, updates); } - fixUpCurrentUpdates(updates, m); + if (indexMetaData.ignoreNewerMutations()) { + // if we're replaying mutations, we don't need to worry about out-of-order updates + updates.add(m); + } else { + fixUpCurrentUpdates(updates, m); + } } /** http://git-wip-us.apache.org/repos/asf/phoenix/blob/d81d9e0d/phoenix-core/src/test/java/org/apache/phoenix/hbase/index/covered/TestNonTxIndexBuilder.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/test/java/org/apache/phoenix/hbase/index/covered/TestNonTxIndexBuilder.java b/phoenix-core/src/test/java/org/apache/phoenix/hbase/index/covered/TestNonTxIndexBuilder.java index 5cfe617..795f89d 100644 --- a/phoenix-core/src/test/java/org/apache/phoenix/hbase/index/covered/TestNonTxIndexBuilder.java +++ b/phoenix-core/src/test/java/org/apache/phoenix/hbase/index/covered/TestNonTxIndexBuilder.java @@ -98,6 +98,7 @@ public class TestNonTxIndexBuilder extends BaseConnectionlessQueryTest { private static final byte[] VALUE_1 = Bytes.toBytes(111); private static final byte[] VALUE_2 = Bytes.toBytes(222); private static final byte[] VALUE_3 = Bytes.toBytes(333); + private static final byte[] VALUE_4 = Bytes.toBytes(444); private static final byte PUT_TYPE = KeyValue.Type.Put.getCode(); private NonTxIndexBuilder indexBuilder; @@ -217,6 +218,8 @@ public class TestNonTxIndexBuilder extends BaseConnectionlessQueryTest { /** * Tests a partial rebuild of a row with multiple versions. 3 versions of the row in data table, * and we rebuild the index starting from time t=2 + * + * There should be one index row version per data row version. */ @Test public void testRebuildMultipleVersionRow() throws IOException { @@ -229,11 +232,15 @@ public class TestNonTxIndexBuilder extends BaseConnectionlessQueryTest { Cell currentCell1 = CellUtil.createCell(ROW, FAM, INDEXED_QUALIFIER, 1, PUT_TYPE, VALUE_1); Cell currentCell2 = CellUtil.createCell(ROW, FAM, INDEXED_QUALIFIER, 2, PUT_TYPE, VALUE_2); Cell currentCell3 = CellUtil.createCell(ROW, FAM, INDEXED_QUALIFIER, 3, PUT_TYPE, VALUE_3); - setCurrentRowState(Arrays.asList(currentCell3, currentCell2, currentCell1)); + Cell currentCell4 = CellUtil.createCell(ROW, FAM, INDEXED_QUALIFIER, 4, PUT_TYPE, VALUE_4); + setCurrentRowState(Arrays.asList(currentCell4, currentCell3, currentCell2, currentCell1)); // rebuilder replays mutations starting from t=2 MultiMutation mutation = new MultiMutation(new ImmutableBytesPtr(ROW)); Put put = new Put(ROW); + put.addImmutable(FAM, INDEXED_QUALIFIER, 4, VALUE_4); + mutation.addAll(put); + put = new Put(ROW); put.addImmutable(FAM, INDEXED_QUALIFIER, 3, VALUE_3); mutation.addAll(put); put = new Put(ROW); @@ -242,11 +249,22 @@ public class TestNonTxIndexBuilder extends BaseConnectionlessQueryTest { Collection<Pair<Mutation, byte[]>> indexUpdates = indexBuilder.getIndexUpdate(mutation, mockIndexMetaData); - assertEquals(2, indexUpdates.size()); + // 3 puts and 3 deletes (one to hide existing index row for VALUE_1, and two to hide index + // rows for VALUE_2, VALUE_3) + assertEquals(6, indexUpdates.size()); + assertContains(indexUpdates, 2, ROW, KeyValue.Type.DeleteFamily, FAM, new byte[0] /* qual not needed */, 2); assertContains(indexUpdates, ColumnTracker.NO_NEWER_PRIMARY_TABLE_ENTRY_TIMESTAMP, ROW, + KeyValue.Type.Put, FAM, QueryConstants.EMPTY_COLUMN_BYTES, 2); + assertContains(indexUpdates, 3, ROW, KeyValue.Type.DeleteFamily, FAM, + new byte[0] /* qual not needed */, 3); + assertContains(indexUpdates, ColumnTracker.NO_NEWER_PRIMARY_TABLE_ENTRY_TIMESTAMP, ROW, KeyValue.Type.Put, FAM, QueryConstants.EMPTY_COLUMN_BYTES, 3); + assertContains(indexUpdates, 4, ROW, KeyValue.Type.DeleteFamily, FAM, + new byte[0] /* qual not needed */, 4); + assertContains(indexUpdates, ColumnTracker.NO_NEWER_PRIMARY_TABLE_ENTRY_TIMESTAMP, ROW, + KeyValue.Type.Put, FAM, QueryConstants.EMPTY_COLUMN_BYTES, 4); } /** http://git-wip-us.apache.org/repos/asf/phoenix/blob/d81d9e0d/phoenix-core/src/test/java/org/apache/phoenix/hbase/index/covered/update/TestIndexUpdateManager.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/test/java/org/apache/phoenix/hbase/index/covered/update/TestIndexUpdateManager.java b/phoenix-core/src/test/java/org/apache/phoenix/hbase/index/covered/update/TestIndexUpdateManager.java index a0592f3..9e50615 100644 --- a/phoenix-core/src/test/java/org/apache/phoenix/hbase/index/covered/update/TestIndexUpdateManager.java +++ b/phoenix-core/src/test/java/org/apache/phoenix/hbase/index/covered/update/TestIndexUpdateManager.java @@ -30,8 +30,10 @@ import org.apache.hadoop.hbase.client.Mutation; import org.apache.hadoop.hbase.client.Put; import org.apache.hadoop.hbase.util.Bytes; import org.apache.hadoop.hbase.util.Pair; +import org.junit.Before; import org.junit.Test; - +import org.mockito.Mockito; +import org.apache.phoenix.hbase.index.covered.IndexMetaData; import org.apache.phoenix.hbase.index.covered.update.IndexUpdateManager; public class TestIndexUpdateManager { @@ -39,10 +41,17 @@ public class TestIndexUpdateManager { private static final byte[] row = Bytes.toBytes("row"); private static final String TABLE_NAME = "table"; private static final byte[] table = Bytes.toBytes(TABLE_NAME); + private IndexMetaData mockIndexMetaData; + + @Before + public void setup() { + mockIndexMetaData = Mockito.mock(IndexMetaData.class); + Mockito.when(mockIndexMetaData.ignoreNewerMutations()).thenReturn(false); + } @Test public void testMutationComparator() throws Exception { - IndexUpdateManager manager = new IndexUpdateManager(); + IndexUpdateManager manager = new IndexUpdateManager(mockIndexMetaData); Comparator<Mutation> comparator = manager.COMPARATOR; Put p = new Put(row, 10); // lexigraphically earlier should sort earlier @@ -79,7 +88,7 @@ public class TestIndexUpdateManager { */ @Test public void testCancelingUpdates() throws Exception { - IndexUpdateManager manager = new IndexUpdateManager(); + IndexUpdateManager manager = new IndexUpdateManager(mockIndexMetaData); long ts1 = 10, ts2 = 11; // at different timestamps, so both should be retained @@ -106,13 +115,13 @@ public class TestIndexUpdateManager { validate(manager, pending); // if there is just a put and a delete at the same ts, no pending updates should be returned - manager = new IndexUpdateManager(); + manager = new IndexUpdateManager(mockIndexMetaData); manager.addIndexUpdate(table, d2); manager.addIndexUpdate(table, p); validate(manager, Collections.<Mutation> emptyList()); // different row insertions can be tricky too, if you don't get the base cases right - manager = new IndexUpdateManager(); + manager = new IndexUpdateManager(mockIndexMetaData); manager.addIndexUpdate(table, p); // this row definitely sorts after the current row byte[] row1 = Bytes.toBytes("row1");
