PHOENIX-3824 Mutable Index partial rebuild should add only one index row per updated data row.
Signed-off-by: Lars Hofhansl <[email protected]> Project: http://git-wip-us.apache.org/repos/asf/phoenix/repo Commit: http://git-wip-us.apache.org/repos/asf/phoenix/commit/85e344fd Tree: http://git-wip-us.apache.org/repos/asf/phoenix/tree/85e344fd Diff: http://git-wip-us.apache.org/repos/asf/phoenix/diff/85e344fd Branch: refs/heads/omid Commit: 85e344fdfcc65d4992336eb52868d7ba78ba55d1 Parents: a1d3c16 Author: Vincent Poon <[email protected]> Authored: Mon May 8 17:18:54 2017 -0700 Committer: Lars Hofhansl <[email protected]> Committed: Mon May 8 17:18:54 2017 -0700 ---------------------------------------------------------------------- .../hbase/index/covered/data/LocalTable.java | 22 +- .../index/covered/TestNonTxIndexBuilder.java | 317 +++++++++++++++++++ .../index/covered/data/TestLocalTable.java | 63 ++++ 3 files changed, 401 insertions(+), 1 deletion(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/phoenix/blob/85e344fd/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/covered/data/LocalTable.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/covered/data/LocalTable.java b/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/covered/data/LocalTable.java index 003df2a..85c54ce 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/covered/data/LocalTable.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/covered/data/LocalTable.java @@ -33,6 +33,10 @@ import org.apache.hadoop.hbase.regionserver.RegionScanner; import org.apache.phoenix.hbase.index.covered.update.ColumnReference; import org.apache.phoenix.hbase.index.util.IndexManagementUtil; +import com.google.common.collect.Iterables; +import com.google.common.collect.Ordering; +import com.google.common.primitives.Longs; + /** * Wrapper around a lazily instantiated, local HTable. 
* <p> @@ -61,7 +65,8 @@ public class LocalTable implements LocalHBaseState { if (ignoreNewerMutations) { // Provides a means of client indicating that newer cells should not be considered, // enabling mutations to be replayed to partially rebuild the index when a write fails. - long ts = m.getFamilyCellMap().firstEntry().getValue().get(0).getTimestamp(); + // When replaying mutations we want the oldest timestamp (as anything newer will be replayed) + long ts = getOldestTimestamp(m.getFamilyCellMap().values()); s.setTimeRange(0,ts); } Region region = this.env.getRegion(); @@ -74,4 +79,19 @@ public class LocalTable implements LocalHBaseState { scanner.close(); return r; } + + // Returns the smallest timestamp in the given cell lists. + // It is assumed that the lists have cells ordered from largest to smallest timestamp + protected long getOldestTimestamp(Collection<List<Cell>> cellLists) { + Ordering<List<Cell>> cellListOrdering = new Ordering<List<Cell>>() { + @Override + public int compare(List<Cell> left, List<Cell> right) { + // compare the last element of each list, since that is the smallest in that list + return Longs.compare(Iterables.getLast(left).getTimestamp(), + Iterables.getLast(right).getTimestamp()); + } + }; + List<Cell> minList = cellListOrdering.min(cellLists); + return Iterables.getLast(minList).getTimestamp(); + } } http://git-wip-us.apache.org/repos/asf/phoenix/blob/85e344fd/phoenix-core/src/test/java/org/apache/phoenix/hbase/index/covered/TestNonTxIndexBuilder.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/test/java/org/apache/phoenix/hbase/index/covered/TestNonTxIndexBuilder.java b/phoenix-core/src/test/java/org/apache/phoenix/hbase/index/covered/TestNonTxIndexBuilder.java new file mode 100644 index 0000000..d4d69b4 --- /dev/null +++ b/phoenix-core/src/test/java/org/apache/phoenix/hbase/index/covered/TestNonTxIndexBuilder.java @@ -0,0 +1,317 @@ +/* + * Licensed to the Apache Software 
Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.phoenix.hbase.index.covered; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertNotEquals; +import static org.junit.Assert.assertTrue; + +import java.io.IOException; +import java.sql.Connection; +import java.sql.DriverManager; +import java.util.Arrays; +import java.util.Collection; +import java.util.Collections; +import java.util.List; +import java.util.NavigableMap; +import java.util.Properties; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hbase.Cell; +import org.apache.hadoop.hbase.CellUtil; +import org.apache.hadoop.hbase.HRegionInfo; +import org.apache.hadoop.hbase.KeyValue; +import org.apache.hadoop.hbase.KeyValue.Type; +import org.apache.hadoop.hbase.client.Mutation; +import org.apache.hadoop.hbase.client.Put; +import org.apache.hadoop.hbase.client.Scan; +import org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment; +import org.apache.hadoop.hbase.io.ImmutableBytesWritable; +import org.apache.hadoop.hbase.io.TimeRange; +import org.apache.hadoop.hbase.regionserver.Region; +import org.apache.hadoop.hbase.regionserver.RegionScanner; +import org.apache.hadoop.hbase.util.Bytes; +import 
org.apache.hadoop.hbase.util.Pair; +import org.apache.phoenix.coprocessor.BaseRegionScanner; +import org.apache.phoenix.hbase.index.MultiMutation; +import org.apache.phoenix.hbase.index.covered.data.LocalTable; +import org.apache.phoenix.hbase.index.covered.update.ColumnTracker; +import org.apache.phoenix.hbase.index.util.GenericKeyValueBuilder; +import org.apache.phoenix.hbase.index.util.ImmutableBytesPtr; +import org.apache.phoenix.index.IndexMaintainer; +import org.apache.phoenix.index.PhoenixIndexCodec; +import org.apache.phoenix.index.PhoenixIndexMetaData; +import org.apache.phoenix.jdbc.PhoenixConnection; +import org.apache.phoenix.query.BaseConnectionlessQueryTest; +import org.apache.phoenix.query.QueryConstants; +import org.apache.phoenix.query.QueryServices; +import org.apache.phoenix.schema.PTable; +import org.apache.phoenix.schema.PTableKey; +import org.apache.phoenix.util.PropertiesUtil; +import org.apache.phoenix.util.TestUtil; +import org.junit.Before; +import org.junit.Test; +import org.mockito.Mockito; +import org.mockito.invocation.InvocationOnMock; +import org.mockito.stubbing.Answer; + +import com.google.common.base.Optional; +import com.google.common.base.Predicate; +import com.google.common.collect.Iterables; + +public class TestNonTxIndexBuilder extends BaseConnectionlessQueryTest { + private static final String TEST_TABLE_STRING = "TEST_TABLE"; + private static final String TEST_TABLE_DDL = "CREATE TABLE IF NOT EXISTS " + + TEST_TABLE_STRING + " (\n" + + " ORGANIZATION_ID CHAR(4) NOT NULL,\n" + + " ENTITY_ID CHAR(7) NOT NULL,\n" + + " SCORE INTEGER,\n" + + " LAST_UPDATE_TIME TIMESTAMP\n" + + " CONSTRAINT TEST_TABLE_PK PRIMARY KEY (\n" + + " ORGANIZATION_ID,\n" + + " ENTITY_ID\n" + + " )\n" + + ") VERSIONS=1, MULTI_TENANT=TRUE"; + private static final String TEST_TABLE_INDEX_STRING = "TEST_TABLE_SCORE"; + private static final String TEST_TABLE_INDEX_DDL = "CREATE INDEX IF NOT EXISTS " + + TEST_TABLE_INDEX_STRING + + " ON " + TEST_TABLE_STRING 
+ " (SCORE DESC, ENTITY_ID DESC)"; + private static final byte[] ROW = Bytes.toBytes("org1entity1"); //length 4 + 7 (see ddl) + private static final String FAM_STRING = QueryConstants.DEFAULT_COLUMN_FAMILY; + private static final byte[] FAM = Bytes.toBytes(FAM_STRING); + private static final byte[] INDEXED_QUALIFIER = Bytes.toBytes("SCORE"); + private static final byte[] VALUE_1 = Bytes.toBytes(111); + private static final byte[] VALUE_2 = Bytes.toBytes(222); + private static final byte[] VALUE_3 = Bytes.toBytes(333); + private static final byte PUT_TYPE = KeyValue.Type.Put.getCode(); + + private NonTxIndexBuilder indexBuilder; + private PhoenixIndexMetaData mockIndexMetaData; + // Put your current row state in here - the index builder will read from this in LocalTable + // to determine whether the index has changed. + // Whatever we return here should match the table DDL (e.g. length of column value) + private List<Cell> currentRowCells; + + /** + * Test setup so that {@link NonTxIndexBuilder#getIndexUpdate(Mutation, IndexMetaData)} can be + * called, where any read requests to + * {@link LocalTable#getCurrentRowState(Mutation, Collection, boolean)} are read from our test + * field 'currentRowCells' + */ + @Before + public void setup() throws Exception { + RegionCoprocessorEnvironment env = Mockito.mock(RegionCoprocessorEnvironment.class); + Configuration conf = new Configuration(false); + conf.set(NonTxIndexBuilder.CODEC_CLASS_NAME_KEY, PhoenixIndexCodec.class.getName()); + Mockito.when(env.getConfiguration()).thenReturn(conf); + + // the following is used by LocalTable#getCurrentRowState() + Region mockRegion = Mockito.mock(Region.class); + Mockito.when(env.getRegion()).thenReturn(mockRegion); + + Mockito.when(mockRegion.getScanner(Mockito.any(Scan.class))) + .thenAnswer(new Answer<RegionScanner>() { + @Override + public RegionScanner answer(InvocationOnMock invocation) throws Throwable { + Scan sArg = (Scan) invocation.getArguments()[0]; + TimeRange timeRange = 
sArg.getTimeRange(); + return getMockTimeRangeRegionScanner(timeRange); + } + }); + + // the following is called by PhoenixIndexCodec#getIndexUpserts() , getIndexDeletes() + HRegionInfo mockRegionInfo = Mockito.mock(HRegionInfo.class); + Mockito.when(mockRegion.getRegionInfo()).thenReturn(mockRegionInfo); + Mockito.when(mockRegionInfo.getStartKey()).thenReturn(Bytes.toBytes("a")); + Mockito.when(mockRegionInfo.getEndKey()).thenReturn(Bytes.toBytes("z")); + + mockIndexMetaData = Mockito.mock(PhoenixIndexMetaData.class); + Mockito.when(mockIndexMetaData.isImmutableRows()).thenReturn(false); + Mockito.when(mockIndexMetaData.getIndexMaintainers()) + .thenReturn(Collections.singletonList(getTestIndexMaintainer())); + + indexBuilder = new NonTxIndexBuilder(); + indexBuilder.setup(env); + } + + // returns a RegionScanner which filters currentRowCells using the given TimeRange. + // This is called from LocalTable#getCurrentRowState() + // If mockIndexMetaData.ignoreNewerMutations() is not set, default TimeRange is 0 to + // Long.MAX_VALUE + private RegionScanner getMockTimeRangeRegionScanner(final TimeRange timeRange) { + return new BaseRegionScanner(Mockito.mock(RegionScanner.class)) { + @Override + public boolean next(List<Cell> results) throws IOException { + for (Cell cell : currentRowCells) { + if (cell.getTimestamp() >= timeRange.getMin() + && cell.getTimestamp() < timeRange.getMax()) { + results.add(cell); + } + } + return false; // indicate no more results + } + }; + } + + private IndexMaintainer getTestIndexMaintainer() throws Exception { + Properties props = PropertiesUtil.deepCopy(TestUtil.TEST_PROPERTIES); + // disable column encoding, makes debugging easier + props.put(QueryServices.DEFAULT_COLUMN_ENCODED_BYTES_ATRRIB, "0"); + Connection conn = DriverManager.getConnection(getUrl(), props); + try { + conn.setAutoCommit(true); + conn.createStatement().execute(TEST_TABLE_DDL); + conn.createStatement().execute(TEST_TABLE_INDEX_DDL); + PhoenixConnection pconn = 
conn.unwrap(PhoenixConnection.class); + PTable table = pconn.getTable(new PTableKey(pconn.getTenantId(), TEST_TABLE_STRING)); + ImmutableBytesWritable ptr = new ImmutableBytesWritable(); + table.getIndexMaintainers(ptr, pconn); + List<IndexMaintainer> indexMaintainerList = + IndexMaintainer.deserialize(ptr, GenericKeyValueBuilder.INSTANCE, true); + assertEquals(1, indexMaintainerList.size()); + IndexMaintainer indexMaintainer = indexMaintainerList.get(0); + return indexMaintainer; + } finally { + conn.close(); + } + } + + /** + * Tests that updating an indexed column results in a DeleteFamily (prior index cell) and a Put + * (new index cell) + */ + @Test + public void testGetMutableIndexUpdate() throws IOException { + setCurrentRowState(FAM, INDEXED_QUALIFIER, 1, VALUE_1); + + // update ts and value + Put put = new Put(ROW); + put.addImmutable(FAM, INDEXED_QUALIFIER, 2, VALUE_2); + MultiMutation mutation = new MultiMutation(new ImmutableBytesPtr(ROW)); + mutation.addAll(put); + + Collection<Pair<Mutation, byte[]>> indexUpdates = + indexBuilder.getIndexUpdate(mutation, mockIndexMetaData); + assertEquals(2, indexUpdates.size()); + assertContains(indexUpdates, 2, ROW, KeyValue.Type.DeleteFamily, FAM, + new byte[0] /* qual not needed */, 2); + assertContains(indexUpdates, ColumnTracker.NO_NEWER_PRIMARY_TABLE_ENTRY_TIMESTAMP, ROW, + KeyValue.Type.Put, FAM, QueryConstants.EMPTY_COLUMN_BYTES, 2); + } + + /** + * Tests a partial rebuild of a row with multiple versions. 
3 versions of the row in data table, + * and we rebuild the index starting from time t=2 + */ + @Test + public void testRebuildMultipleVersionRow() throws IOException { + // when doing a rebuild, we are replaying mutations so we want to ignore newer mutations + // see LocalTable#getCurrentRowState() + Mockito.when(mockIndexMetaData.ignoreNewerMutations()).thenReturn(true); + + // the current row state has 3 versions, but if we rebuild as of t=2, scanner in LocalTable + // should only return first + Cell currentCell1 = CellUtil.createCell(ROW, FAM, INDEXED_QUALIFIER, 1, PUT_TYPE, VALUE_1); + Cell currentCell2 = CellUtil.createCell(ROW, FAM, INDEXED_QUALIFIER, 2, PUT_TYPE, VALUE_2); + Cell currentCell3 = CellUtil.createCell(ROW, FAM, INDEXED_QUALIFIER, 3, PUT_TYPE, VALUE_3); + setCurrentRowState(Arrays.asList(currentCell3, currentCell2, currentCell1)); + + // rebuilder replays mutations starting from t=2 + MultiMutation mutation = new MultiMutation(new ImmutableBytesPtr(ROW)); + Put put = new Put(ROW); + put.addImmutable(FAM, INDEXED_QUALIFIER, 3, VALUE_3); + mutation.addAll(put); + put = new Put(ROW); + put.addImmutable(FAM, INDEXED_QUALIFIER, 2, VALUE_2); + mutation.addAll(put); + + Collection<Pair<Mutation, byte[]>> indexUpdates = + indexBuilder.getIndexUpdate(mutation, mockIndexMetaData); + assertEquals(2, indexUpdates.size()); + assertContains(indexUpdates, 2, ROW, KeyValue.Type.DeleteFamily, FAM, + new byte[0] /* qual not needed */, 2); + assertContains(indexUpdates, ColumnTracker.NO_NEWER_PRIMARY_TABLE_ENTRY_TIMESTAMP, ROW, + KeyValue.Type.Put, FAM, QueryConstants.EMPTY_COLUMN_BYTES, 3); + } + + /** + * Tests getting an index update for a mutation with 200 versions. Before, the issue PHOENIX-3807 + * was causing this test to take >90 seconds, so here we set a timeout of 5 seconds + */ + @Test(timeout = 5000) + public void testManyVersions() throws IOException { + // when doing a rebuild, we are replaying mutations so we want to ignore newer mutations + // see 
LocalTable#getCurrentRowState() + Mockito.when(mockIndexMetaData.ignoreNewerMutations()).thenReturn(true); + MultiMutation mutation = getMultipleVersionMutation(200); + currentRowCells = mutation.getFamilyCellMap().get(FAM); + + Collection<Pair<Mutation, byte[]>> indexUpdates = + indexBuilder.getIndexUpdate(mutation, mockIndexMetaData); + assertNotEquals(0, indexUpdates.size()); + } + + // Assert that the given collection of indexUpdates contains the given cell + private void assertContains(Collection<Pair<Mutation, byte[]>> indexUpdates, + final long mutationTs, final byte[] row, final Type cellType, final byte[] fam, + final byte[] qual, final long cellTs) { + Predicate<Pair<Mutation, byte[]>> hasCellPredicate = + new Predicate<Pair<Mutation, byte[]>>() { + @Override + public boolean apply(Pair<Mutation, byte[]> input) { + assertEquals(TEST_TABLE_INDEX_STRING, Bytes.toString(input.getSecond())); + Mutation mutation = input.getFirst(); + if (mutationTs == mutation.getTimeStamp()) { + NavigableMap<byte[], List<Cell>> familyCellMap = + mutation.getFamilyCellMap(); + Cell updateCell = familyCellMap.get(fam).get(0); + if (cellType == KeyValue.Type.codeToType(updateCell.getTypeByte()) + && Bytes.compareTo(fam, CellUtil.cloneFamily(updateCell)) == 0 + && Bytes.compareTo(qual, + CellUtil.cloneQualifier(updateCell)) == 0 + && cellTs == updateCell.getTimestamp()) { + return true; + } + } + return false; + } + }; + Optional<Pair<Mutation, byte[]>> tryFind = + Iterables.tryFind(indexUpdates, hasCellPredicate); + assertTrue(tryFind.isPresent()); + } + + private void setCurrentRowState(byte[] fam2, byte[] indexedQualifier, int i, byte[] value1) { + Cell cell = CellUtil.createCell(ROW, FAM, INDEXED_QUALIFIER, 1, PUT_TYPE, VALUE_1); + currentRowCells = Collections.singletonList(cell); + } + + private void setCurrentRowState(List<Cell> cells) { + currentRowCells = cells; + } + + private MultiMutation getMultipleVersionMutation(int versions) { + MultiMutation mutation = new 
MultiMutation(new ImmutableBytesPtr(ROW)); + for (int i = versions - 1; i >= 0; i--) { + Put put = new Put(ROW); + put.addImmutable(FAM, INDEXED_QUALIFIER, i, Bytes.toBytes(i)); + mutation.addAll(put); + } + return mutation; + } +} http://git-wip-us.apache.org/repos/asf/phoenix/blob/85e344fd/phoenix-core/src/test/java/org/apache/phoenix/hbase/index/covered/data/TestLocalTable.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/test/java/org/apache/phoenix/hbase/index/covered/data/TestLocalTable.java b/phoenix-core/src/test/java/org/apache/phoenix/hbase/index/covered/data/TestLocalTable.java new file mode 100644 index 0000000..b11ac8d --- /dev/null +++ b/phoenix-core/src/test/java/org/apache/phoenix/hbase/index/covered/data/TestLocalTable.java @@ -0,0 +1,63 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.phoenix.hbase.index.covered.data; + +import static org.junit.Assert.assertEquals; + +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Collections; +import java.util.List; + +import org.apache.hadoop.hbase.Cell; +import org.apache.hadoop.hbase.KeyValue; +import org.apache.hadoop.hbase.util.Bytes; +import org.junit.Test; + +public class TestLocalTable { + private static final byte[] ROW = Bytes.toBytes("test_row"); + + @Test + public void testGetOldestTimestamp() { + LocalTable localTable = new LocalTable(null); + + List<Cell> cellList1 = getCellList(new KeyValue(ROW, 5L), new KeyValue(ROW, 4L)); + assertEquals(4L, localTable.getOldestTimestamp(Collections.singletonList(cellList1))); + + List<Cell> cellList2 = getCellList(new KeyValue(ROW, 5L), new KeyValue(ROW, 2L)); + List<List<Cell>> set1 = new ArrayList<>(Arrays.asList(cellList1, cellList2)); + assertEquals(2L, localTable.getOldestTimestamp(set1)); + + List<Cell> cellList3 = getCellList(new KeyValue(ROW, 1L)); + set1.add(cellList3); + assertEquals(1L, localTable.getOldestTimestamp(set1)); + + List<Cell> cellList4 = + getCellList(new KeyValue(ROW, 3L), new KeyValue(ROW, 1L), new KeyValue(ROW, 0L)); + set1.add(cellList4); + assertEquals(0L, localTable.getOldestTimestamp(set1)); + } + + private List<Cell> getCellList(KeyValue... kvs) { + List<Cell> cellList = new ArrayList<>(); + for (KeyValue kv : kvs) { + cellList.add(kv); + } + return cellList; + } +}
