Repository: phoenix
Updated Branches:
  refs/heads/4.x-HBase-0.98 6bcee5776 -> 31fca07f1


PHOENIX-3796 LocalIndexes apply the entire batch for each mutation in a batch 
(Lars Hofhansl)


Project: http://git-wip-us.apache.org/repos/asf/phoenix/repo
Commit: http://git-wip-us.apache.org/repos/asf/phoenix/commit/dbd11ab8
Tree: http://git-wip-us.apache.org/repos/asf/phoenix/tree/dbd11ab8
Diff: http://git-wip-us.apache.org/repos/asf/phoenix/diff/dbd11ab8

Branch: refs/heads/4.x-HBase-0.98
Commit: dbd11ab83b2d7e45cd4522044e74daca6dae4e71
Parents: d174440
Author: James Taylor <[email protected]>
Authored: Wed Apr 19 11:28:31 2017 -0700
Committer: James Taylor <[email protected]>
Committed: Wed Apr 19 11:28:31 2017 -0700

----------------------------------------------------------------------
 .../phoenix/end2end/index/LocalIndexIT.java     | 30 ++++++++
 .../org/apache/phoenix/hbase/index/Indexer.java | 73 +++++++-------------
 2 files changed, 56 insertions(+), 47 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/phoenix/blob/dbd11ab8/phoenix-core/src/it/java/org/apache/phoenix/end2end/index/LocalIndexIT.java
----------------------------------------------------------------------
diff --git 
a/phoenix-core/src/it/java/org/apache/phoenix/end2end/index/LocalIndexIT.java 
b/phoenix-core/src/it/java/org/apache/phoenix/end2end/index/LocalIndexIT.java
index a7d0028..8d3316b 100644
--- 
a/phoenix-core/src/it/java/org/apache/phoenix/end2end/index/LocalIndexIT.java
+++ 
b/phoenix-core/src/it/java/org/apache/phoenix/end2end/index/LocalIndexIT.java
@@ -617,6 +617,36 @@ public class LocalIndexIT extends BaseLocalIndexIT {
         }
     }
 
+    @Test
+    public void testLocalGlobalIndexMix() throws Exception {
+        if (isNamespaceMapped) { return; }
+        String tableName = generateUniqueName();
+        Connection conn1 = DriverManager.getConnection(getUrl());
+        String ddl = "CREATE TABLE " + tableName + " (t_id VARCHAR NOT 
NULL,\n" +
+                "k1 INTEGER NOT NULL,\n" +
+                "k2 INTEGER NOT NULL,\n" +
+                "k3 INTEGER,\n" +
+                "v1 VARCHAR,\n" +
+                "v2 VARCHAR,\n" +
+                "CONSTRAINT pk PRIMARY KEY (t_id, k1, k2))\n";
+        conn1.createStatement().execute(ddl);
+        conn1.createStatement().execute("CREATE LOCAL INDEX LV1 ON " + 
tableName + "(v1)");
+        conn1.createStatement().execute("CREATE INDEX GV2 ON " + tableName + 
"(v2)");
+
+        conn1.createStatement().execute("UPSERT INTO " + tableName + " 
values('b',1,2,4,'z','3')");
+        conn1.createStatement().execute("UPSERT INTO " + tableName + " 
values('f',1,2,3,'a','0')");
+        conn1.createStatement().execute("UPSERT INTO " + tableName + " 
values('j',2,4,2,'a','2')");
+        conn1.createStatement().execute("UPSERT INTO " + tableName + " 
values('q',3,1,1,'c','1')");
+        conn1.commit();
+        ResultSet rs = conn1.createStatement().executeQuery("SELECT COUNT(*) 
FROM " + tableName + " WHERE v1 = 'c'");
+        assertTrue(rs.next());
+        assertEquals(1, rs.getInt(1));
+        rs = conn1.createStatement().executeQuery("SELECT COUNT(*) FROM " + 
tableName + " WHERE v2 = '2'");
+        assertTrue(rs.next());
+        assertEquals(1, rs.getInt(1));
+        conn1.close();
+    }
+
     private void copyLocalIndexHFiles(Configuration conf, HRegionInfo 
fromRegion, HRegionInfo toRegion, boolean move)
             throws IOException {
         Path root = FSUtils.getRootDir(conf);

http://git-wip-us.apache.org/repos/asf/phoenix/blob/dbd11ab8/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/Indexer.java
----------------------------------------------------------------------
diff --git 
a/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/Indexer.java 
b/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/Indexer.java
index 36b0ffe..8c5c733 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/Indexer.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/Indexer.java
@@ -372,7 +372,7 @@ public class Indexer extends BaseRegionObserver {
       super.postPut(e, put, edit, durability);
           return;
         }
-    doPost(edit, put, durability, true, false);
+    doPost(edit, put, durability);
   }
 
   @Override
@@ -382,29 +382,10 @@ public class Indexer extends BaseRegionObserver {
       super.postDelete(e, delete, edit, durability);
           return;
         }
-    doPost(edit, delete, durability, true, false);
+    doPost(edit, delete, durability);
   }
 
   @Override
-  public void postBatchMutate(ObserverContext<RegionCoprocessorEnvironment> c,
-      MiniBatchOperationInProgress<Mutation> miniBatchOp) throws IOException {
-      if (this.disabled) {
-        super.postBatchMutate(c, miniBatchOp);
-        return;
-      }
-      WALEdit edit = miniBatchOp.getWalEdit(0);
-      if (edit != null) {
-        IndexedKeyValue ikv = getFirstIndexedKeyValue(edit);
-        if (ikv != null) {
-          // This will prevent the postPut and postDelete hooks from doing 
anything
-          // We need to do this now, as the postBatchMutateIndispensably 
(where the
-          // actual index writing gets done) is called after the postPut and 
postDelete.
-          ikv.markBatchFinished();
-        }
-      }
-  }
-  
-  @Override
   public void 
postBatchMutateIndispensably(ObserverContext<RegionCoprocessorEnvironment> c,
       MiniBatchOperationInProgress<Mutation> miniBatchOp, final boolean 
success) throws IOException {
       if (this.disabled) {
@@ -417,17 +398,13 @@ public class Indexer extends BaseRegionObserver {
         //each batch operation, only the first one will have anything useful, 
so we can just grab that
         Mutation mutation = miniBatchOp.getOperation(0);
         WALEdit edit = miniBatchOp.getWalEdit(0);
-        // We're forcing the index writes here because we've marked the index 
batch as "finished"
-        // to prevent postPut and postDelete from doing anything, but hold off 
on writing them
-        // until now so we're outside of the MVCC lock (see PHOENIX-3789). 
Without this hacky
-        // forceWrite flag, we'd ignore them again here too.
-        doPost(edit, mutation, mutation.getDurability(), false, true);
+        doPost(edit, mutation, mutation.getDurability());
     }
   }
 
-  private void doPost(WALEdit edit, Mutation m, final Durability durability, 
boolean allowLocalUpdates, boolean forceWrite) throws IOException {
+  private void doPost(WALEdit edit, Mutation m, final Durability durability) 
throws IOException {
     try {
-      doPostWithExceptions(edit, m, durability, allowLocalUpdates, forceWrite);
+      doPostWithExceptions(edit, m, durability);
       return;
     } catch (Throwable e) {
       rethrowIndexingException(e);
@@ -436,7 +413,7 @@ public class Indexer extends BaseRegionObserver {
         "Somehow didn't complete the index update, but didn't return 
succesfully either!");
   }
 
-  private void doPostWithExceptions(WALEdit edit, Mutation m, final Durability 
durability, boolean allowLocalUpdates, boolean forceWrite)
+  private void doPostWithExceptions(WALEdit edit, Mutation m, final Durability 
durability)
           throws Exception {
       //short circuit, if we don't need to do any work
       if (durability == Durability.SKIP_WAL || !this.builder.isEnabled(m) || 
edit == null) {
@@ -470,30 +447,32 @@ public class Indexer extends BaseRegionObserver {
            * once (this hook gets called with the same WALEdit for each 
Put/Delete in a batch, which can
            * lead to writing all the index updates for each Put/Delete).
            */
-          if ((!ikv.getBatchFinished() || forceWrite) || allowLocalUpdates) {
+          if (!ikv.getBatchFinished()) {
               Collection<Pair<Mutation, byte[]>> indexUpdates = 
extractIndexUpdate(edit);
 
               // the WAL edit is kept in memory and we already specified the 
factory when we created the
               // references originally - therefore, we just pass in a null 
factory here and use the ones
               // already specified on each reference
               try {
-                 if (!ikv.getBatchFinished() || forceWrite) {
-                         current.addTimelineAnnotation("Actually doing index 
update for first time");
-                         writer.writeAndKillYourselfOnFailure(indexUpdates, 
allowLocalUpdates);
-                 } else if (allowLocalUpdates) {
-                         Collection<Pair<Mutation, byte[]>> localUpdates =
-                                         new ArrayList<Pair<Mutation, 
byte[]>>();
-                         current.addTimelineAnnotation("Actually doing local 
index update for first time");
-                         for (Pair<Mutation, byte[]> mutation : indexUpdates) {
-                                 if 
(Bytes.toString(mutation.getSecond()).equals(
-                                                 
environment.getRegion().getTableDesc().getNameAsString())) {
-                                         localUpdates.add(mutation);
-                                 }
-                         }
-                      if(!localUpdates.isEmpty()) {
-                         writer.writeAndKillYourselfOnFailure(localUpdates, 
allowLocalUpdates);
-                      }
-                 }
+                         current.addTimelineAnnotation("Actually doing index 
update for first time");
+                  Collection<Pair<Mutation, byte[]>> localUpdates =
+                          new ArrayList<Pair<Mutation, byte[]>>();
+                  Collection<Pair<Mutation, byte[]>> remoteUpdates =
+                          new ArrayList<Pair<Mutation, byte[]>>();
+                         for (Pair<Mutation, byte[]> mutation : indexUpdates) {
+                                 if 
(Bytes.toString(mutation.getSecond()).equals(
+                                                 
environment.getRegion().getTableDesc().getNameAsString())) {
+                                         localUpdates.add(mutation);
+                                 } else {
+                          remoteUpdates.add(mutation);
+                                 }
+                         }
+                  if(!remoteUpdates.isEmpty()) {
+                      writer.writeAndKillYourselfOnFailure(remoteUpdates, 
false);
+                  }
+                  if(!localUpdates.isEmpty()) {
+                      writer.writeAndKillYourselfOnFailure(localUpdates, true);
+                  }
               } finally {                  // With a custom kill policy, we 
may throw instead of kill the server.
                   // Without doing this in a finally block (at least with the 
mini cluster),
                   // the region server never goes down.

Reply via email to