Repository: phoenix Updated Branches: refs/heads/4.x-HBase-0.98 6bcee5776 -> 31fca07f1
PHOENIX-3796 LocalIndexes apply the entire batch for each mutation in a batch (Lars Hofhansl) Project: http://git-wip-us.apache.org/repos/asf/phoenix/repo Commit: http://git-wip-us.apache.org/repos/asf/phoenix/commit/dbd11ab8 Tree: http://git-wip-us.apache.org/repos/asf/phoenix/tree/dbd11ab8 Diff: http://git-wip-us.apache.org/repos/asf/phoenix/diff/dbd11ab8 Branch: refs/heads/4.x-HBase-0.98 Commit: dbd11ab83b2d7e45cd4522044e74daca6dae4e71 Parents: d174440 Author: James Taylor <[email protected]> Authored: Wed Apr 19 11:28:31 2017 -0700 Committer: James Taylor <[email protected]> Committed: Wed Apr 19 11:28:31 2017 -0700 ---------------------------------------------------------------------- .../phoenix/end2end/index/LocalIndexIT.java | 30 ++++++++ .../org/apache/phoenix/hbase/index/Indexer.java | 73 +++++++------------- 2 files changed, 56 insertions(+), 47 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/phoenix/blob/dbd11ab8/phoenix-core/src/it/java/org/apache/phoenix/end2end/index/LocalIndexIT.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/it/java/org/apache/phoenix/end2end/index/LocalIndexIT.java b/phoenix-core/src/it/java/org/apache/phoenix/end2end/index/LocalIndexIT.java index a7d0028..8d3316b 100644 --- a/phoenix-core/src/it/java/org/apache/phoenix/end2end/index/LocalIndexIT.java +++ b/phoenix-core/src/it/java/org/apache/phoenix/end2end/index/LocalIndexIT.java @@ -617,6 +617,36 @@ public class LocalIndexIT extends BaseLocalIndexIT { } } + @Test + public void testLocalGlobalIndexMix() throws Exception { + if (isNamespaceMapped) { return; } + String tableName = generateUniqueName(); + Connection conn1 = DriverManager.getConnection(getUrl()); + String ddl = "CREATE TABLE " + tableName + " (t_id VARCHAR NOT NULL,\n" + + "k1 INTEGER NOT NULL,\n" + + "k2 INTEGER NOT NULL,\n" + + "k3 INTEGER,\n" + + "v1 VARCHAR,\n" + + "v2 VARCHAR,\n" + + "CONSTRAINT pk PRIMARY KEY (t_id, k1, k2))\n"; + conn1.createStatement().execute(ddl); + conn1.createStatement().execute("CREATE LOCAL INDEX LV1 ON " + tableName + "(v1)"); + conn1.createStatement().execute("CREATE INDEX GV2 ON " + tableName + "(v2)"); + + conn1.createStatement().execute("UPSERT INTO " + tableName + " values('b',1,2,4,'z','3')"); + conn1.createStatement().execute("UPSERT INTO " + tableName + " values('f',1,2,3,'a','0')"); + conn1.createStatement().execute("UPSERT INTO " + tableName + " values('j',2,4,2,'a','2')"); + conn1.createStatement().execute("UPSERT INTO " + tableName + " values('q',3,1,1,'c','1')"); + conn1.commit(); + ResultSet rs = conn1.createStatement().executeQuery("SELECT COUNT(*) FROM " + tableName + " WHERE v1 = 'c'"); + assertTrue(rs.next()); + assertEquals(1, rs.getInt(1)); + rs = conn1.createStatement().executeQuery("SELECT COUNT(*) FROM " + tableName + " WHERE v2 = '2'"); + assertTrue(rs.next()); + assertEquals(1, rs.getInt(1)); + conn1.close(); + } + private void copyLocalIndexHFiles(Configuration conf, HRegionInfo fromRegion, HRegionInfo toRegion, boolean move) throws IOException { Path root = FSUtils.getRootDir(conf); http://git-wip-us.apache.org/repos/asf/phoenix/blob/dbd11ab8/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/Indexer.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/Indexer.java b/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/Indexer.java index 36b0ffe..8c5c733 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/Indexer.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/Indexer.java @@ -372,7 +372,7 @@ public class Indexer extends BaseRegionObserver { super.postPut(e, put, edit, durability); return; } - doPost(edit, put, durability, true, false); + doPost(edit, put, durability); } @Override @@ -382,29 +382,10 @@ public class Indexer extends BaseRegionObserver { super.postDelete(e, delete, edit, durability); return; } - doPost(edit, delete, durability, true, false); + doPost(edit, delete, durability); } @Override - public void postBatchMutate(ObserverContext<RegionCoprocessorEnvironment> c, - MiniBatchOperationInProgress<Mutation> miniBatchOp) throws IOException { - if (this.disabled) { - super.postBatchMutate(c, miniBatchOp); - return; - } - WALEdit edit = miniBatchOp.getWalEdit(0); - if (edit != null) { - IndexedKeyValue ikv = getFirstIndexedKeyValue(edit); - if (ikv != null) { - // This will prevent the postPut and postDelete hooks from doing anything - // We need to do this now, as the postBatchMutateIndispensably (where the - // actual index writing gets done) is called after the postPut and postDelete. - ikv.markBatchFinished(); - } - } - } - - @Override public void postBatchMutateIndispensably(ObserverContext<RegionCoprocessorEnvironment> c, MiniBatchOperationInProgress<Mutation> miniBatchOp, final boolean success) throws IOException { if (this.disabled) { @@ -417,17 +398,13 @@ public class Indexer extends BaseRegionObserver { //each batch operation, only the first one will have anything useful, so we can just grab that Mutation mutation = miniBatchOp.getOperation(0); WALEdit edit = miniBatchOp.getWalEdit(0); - // We're forcing the index writes here because we've marked the index batch as "finished" - // to prevent postPut and postDelete from doing anything, but hold off on writing them - // until now so we're outside of the MVCC lock (see PHOENIX-3789). Without this hacky - // forceWrite flag, we'd ignore them again here too. - doPost(edit, mutation, mutation.getDurability(), false, true); + doPost(edit, mutation, mutation.getDurability()); } } - private void doPost(WALEdit edit, Mutation m, final Durability durability, boolean allowLocalUpdates, boolean forceWrite) throws IOException { + private void doPost(WALEdit edit, Mutation m, final Durability durability) throws IOException { try { - doPostWithExceptions(edit, m, durability, allowLocalUpdates, forceWrite); + doPostWithExceptions(edit, m, durability); return; } catch (Throwable e) { rethrowIndexingException(e); @@ -436,7 +413,7 @@ public class Indexer extends BaseRegionObserver { "Somehow didn't complete the index update, but didn't return succesfully either!"); } - private void doPostWithExceptions(WALEdit edit, Mutation m, final Durability durability, boolean allowLocalUpdates, boolean forceWrite) + private void doPostWithExceptions(WALEdit edit, Mutation m, final Durability durability) throws Exception { //short circuit, if we don't need to do any work if (durability == Durability.SKIP_WAL || !this.builder.isEnabled(m) || edit == null) { @@ -470,30 +447,32 @@ public class Indexer extends BaseRegionObserver { * once (this hook gets called with the same WALEdit for each Put/Delete in a batch, which can * lead to writing all the index updates for each Put/Delete). */ - if ((!ikv.getBatchFinished() || forceWrite) || allowLocalUpdates) { + if (!ikv.getBatchFinished()) { Collection<Pair<Mutation, byte[]>> indexUpdates = extractIndexUpdate(edit); // the WAL edit is kept in memory and we already specified the factory when we created the // references originally - therefore, we just pass in a null factory here and use the ones // already specified on each reference try { - if (!ikv.getBatchFinished() || forceWrite) { - current.addTimelineAnnotation("Actually doing index update for first time"); - writer.writeAndKillYourselfOnFailure(indexUpdates, allowLocalUpdates); - } else if (allowLocalUpdates) { - Collection<Pair<Mutation, byte[]>> localUpdates = - new ArrayList<Pair<Mutation, byte[]>>(); - current.addTimelineAnnotation("Actually doing local index update for first time"); - for (Pair<Mutation, byte[]> mutation : indexUpdates) { - if (Bytes.toString(mutation.getSecond()).equals( - environment.getRegion().getTableDesc().getNameAsString())) { - localUpdates.add(mutation); - } - } - if(!localUpdates.isEmpty()) { - writer.writeAndKillYourselfOnFailure(localUpdates, allowLocalUpdates); - } - } + current.addTimelineAnnotation("Actually doing index update for first time"); + Collection<Pair<Mutation, byte[]>> localUpdates = + new ArrayList<Pair<Mutation, byte[]>>(); + Collection<Pair<Mutation, byte[]>> remoteUpdates = + new ArrayList<Pair<Mutation, byte[]>>(); + for (Pair<Mutation, byte[]> mutation : indexUpdates) { + if (Bytes.toString(mutation.getSecond()).equals( + environment.getRegion().getTableDesc().getNameAsString())) { + localUpdates.add(mutation); + } else { + remoteUpdates.add(mutation); + } + } + if(!remoteUpdates.isEmpty()) { + writer.writeAndKillYourselfOnFailure(remoteUpdates, false); + } + if(!localUpdates.isEmpty()) { + writer.writeAndKillYourselfOnFailure(localUpdates, true); + } } finally { // With a custom kill policy, we may throw instead of kill the server. // Without doing this in a finally block (at least with the mini cluster), // the region server never goes down.
