Repository: phoenix
Updated Branches:
  refs/heads/master a0f47c2be -> 813c1c8aa


PHOENIX-4109 Ensure mutations are processed in batches with same time stamp during partial rebuild


Project: http://git-wip-us.apache.org/repos/asf/phoenix/repo
Commit: http://git-wip-us.apache.org/repos/asf/phoenix/commit/813c1c8a
Tree: http://git-wip-us.apache.org/repos/asf/phoenix/tree/813c1c8a
Diff: http://git-wip-us.apache.org/repos/asf/phoenix/diff/813c1c8a

Branch: refs/heads/master
Commit: 813c1c8aa0f44f0b38ef5bb4c9ed418f693b5c0d
Parents: a0f47c2
Author: James Taylor <[email protected]>
Authored: Sun Aug 20 16:57:44 2017 -0700
Committer: James Taylor <[email protected]>
Committed: Sun Aug 20 16:57:44 2017 -0700

----------------------------------------------------------------------
 .../end2end/index/PartialIndexRebuilderIT.java  | 174 ++++++++++++++++---
 .../org/apache/phoenix/hbase/index/Indexer.java |   2 +-
 .../hbase/index/covered/NonTxIndexBuilder.java  |   4 +-
 .../org/apache/phoenix/util/MetaDataUtil.java   |   2 +-
 4 files changed, 156 insertions(+), 26 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/phoenix/blob/813c1c8a/phoenix-core/src/it/java/org/apache/phoenix/end2end/index/PartialIndexRebuilderIT.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/it/java/org/apache/phoenix/end2end/index/PartialIndexRebuilderIT.java b/phoenix-core/src/it/java/org/apache/phoenix/end2end/index/PartialIndexRebuilderIT.java
index 0dc9a35..f42d2c9 100644
--- a/phoenix-core/src/it/java/org/apache/phoenix/end2end/index/PartialIndexRebuilderIT.java
+++ b/phoenix-core/src/it/java/org/apache/phoenix/end2end/index/PartialIndexRebuilderIT.java
@@ -18,9 +18,9 @@
 package org.apache.phoenix.end2end.index;
 
 import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
 import static org.junit.Assert.assertTrue;
 
-import java.io.IOException;
 import java.sql.Connection;
 import java.sql.DriverManager;
 import java.sql.SQLException;
@@ -29,8 +29,8 @@ import java.util.Random;
 import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.TimeUnit;
 
+import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hbase.HBaseIOException;
-import org.apache.hadoop.hbase.HTableDescriptor;
 import org.apache.hadoop.hbase.client.HBaseAdmin;
 import org.apache.hadoop.hbase.client.HTableInterface;
 import org.apache.hadoop.hbase.client.Mutation;
@@ -38,13 +38,10 @@ import org.apache.hadoop.hbase.coprocessor.ObserverContext;
 import org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment;
 import org.apache.hadoop.hbase.coprocessor.SimpleRegionObserver;
 import org.apache.hadoop.hbase.regionserver.MiniBatchOperationInProgress;
-import org.apache.hadoop.hbase.util.Bytes;
 import org.apache.phoenix.end2end.BaseUniqueNamesOwnClusterIT;
 import org.apache.phoenix.jdbc.PhoenixConnection;
 import org.apache.phoenix.jdbc.PhoenixDatabaseMetaData;
-import org.apache.phoenix.query.ConnectionQueryServices;
 import org.apache.phoenix.query.QueryServices;
-import org.apache.phoenix.query.QueryServicesOptions;
 import org.apache.phoenix.schema.PIndexState;
 import org.apache.phoenix.schema.PMetaData;
 import org.apache.phoenix.schema.PTable;
@@ -54,6 +51,7 @@ import org.apache.phoenix.util.EnvironmentEdge;
 import org.apache.phoenix.util.EnvironmentEdgeManager;
 import org.apache.phoenix.util.IndexScrutiny;
 import org.apache.phoenix.util.IndexUtil;
+import org.apache.phoenix.util.MetaDataUtil;
 import org.apache.phoenix.util.ReadOnlyProps;
 import org.apache.phoenix.util.Repeat;
 import org.apache.phoenix.util.RunUntilFailure;
@@ -131,7 +129,7 @@ public class PartialIndexRebuilderIT extends 
BaseUniqueNamesOwnClusterIT {
         }
     }
     @Test
-    @Repeat(3)
+    @Repeat(5)
     public void testConcurrentUpsertsWithRebuild() throws Throwable {
         int nThreads = 5;
         final int batchSize = 200;
@@ -145,7 +143,6 @@ public class PartialIndexRebuilderIT extends 
BaseUniqueNamesOwnClusterIT {
         Connection conn = DriverManager.getConnection(getUrl());
         HTableInterface metaTable = 
conn.unwrap(PhoenixConnection.class).getQueryServices().getTable(PhoenixDatabaseMetaData.SYSTEM_CATALOG_NAME_BYTES);
         conn.createStatement().execute("CREATE TABLE " + fullTableName + "(k1 
INTEGER NOT NULL, k2 INTEGER NOT NULL, v1 INTEGER, CONSTRAINT pk PRIMARY KEY 
(k1,k2)) STORE_NULLS=true, VERSIONS=1");
-        //addDelayingCoprocessor(conn, tableName);
         conn.createStatement().execute("CREATE INDEX " + indexName + " ON " + 
fullTableName + "(v1)");
         
         final CountDownLatch doneSignal1 = new CountDownLatch(nThreads);
@@ -177,6 +174,16 @@ public class PartialIndexRebuilderIT extends 
BaseUniqueNamesOwnClusterIT {
         return false;
     }
     
+    private static boolean hasDisabledIndex(PMetaData metaCache, PTableKey 
key) throws TableNotFoundException {
+        PTable table = metaCache.getTableRef(key).getTable();
+        for (PTable index : table.getIndexes()) {
+            if (index.getIndexState() == PIndexState.DISABLE) {
+                return true;
+            }
+        }
+        return false;
+    }
+
     private static boolean mutateRandomly(Connection conn, String 
fullTableName, int nRows, boolean checkForInactive, String fullIndexName) 
throws SQLException, InterruptedException {
         PTableKey key = new PTableKey(null,fullTableName);
         PMetaData metaCache = 
conn.unwrap(PhoenixConnection.class).getMetaDataCache();
@@ -238,7 +245,7 @@ public class PartialIndexRebuilderIT extends 
BaseUniqueNamesOwnClusterIT {
     }
     
     @Test
-    @Repeat(3)
+    @Repeat(5)
     public void testDeleteAndUpsertAfterFailure() throws Throwable {
         final int nRows = 10;
         String schemaName = generateUniqueName();
@@ -261,19 +268,6 @@ public class PartialIndexRebuilderIT extends 
BaseUniqueNamesOwnClusterIT {
        }
     }
     
-    private static void addDelayingCoprocessor(Connection conn, String 
tableName) throws SQLException, IOException {
-        int priority = QueryServicesOptions.DEFAULT_COPROCESSOR_PRIORITY + 100;
-        ConnectionQueryServices services = 
conn.unwrap(PhoenixConnection.class).getQueryServices();
-        HTableDescriptor descriptor = 
services.getTableDescriptor(Bytes.toBytes(tableName));
-        descriptor.addCoprocessor(DelayingRegionObserver.class.getName(), 
null, priority, null);
-        HBaseAdmin admin = services.getAdmin();
-        try {
-            admin.modifyTable(Bytes.toBytes(tableName), descriptor);
-        } finally {
-            admin.close();
-        }
-    }
-    
     @Test
     public void testWriteWhileRebuilding() throws Throwable {
         final int nRows = 10;
@@ -498,6 +492,110 @@ public class PartialIndexRebuilderIT extends 
BaseUniqueNamesOwnClusterIT {
     }
     
     @Test
+    public void testSeparateTimeBatchesRequired() throws Throwable {
+        String schemaName = generateUniqueName();
+        String tableName = generateUniqueName();
+        String indexName = generateUniqueName();
+        final String fullTableName = SchemaUtil.getTableName(schemaName, 
tableName);
+        final String fullIndexName = SchemaUtil.getTableName(schemaName, 
indexName);
+        PTableKey key = new PTableKey(null,fullTableName);
+        final MyClock clock = new MyClock(1000);
+        EnvironmentEdgeManager.injectEdge(clock);
+        try (Connection conn = DriverManager.getConnection(getUrl())) {
+            PMetaData metaCache = 
conn.unwrap(PhoenixConnection.class).getMetaDataCache();
+            conn.createStatement().execute("CREATE TABLE " + fullTableName + 
"(k VARCHAR PRIMARY KEY, v1 VARCHAR, v2 VARCHAR) COLUMN_ENCODED_BYTES = 0, 
STORE_NULLS=true");
+            clock.time += 100;
+            conn.createStatement().execute("CREATE INDEX " + indexName + " ON 
" + fullTableName + " (v1, v2)");
+            clock.time += 100;
+            conn.createStatement().execute("UPSERT INTO " + fullTableName + " 
VALUES('a','a','0')");
+            conn.commit();
+            clock.time += 100;
+            HTableInterface metaTable = 
conn.unwrap(PhoenixConnection.class).getQueryServices().getTable(PhoenixDatabaseMetaData.SYSTEM_CATALOG_NAME_BYTES);
+            IndexUtil.updateIndexState(fullIndexName, 0L, metaTable, 
PIndexState.DISABLE);
+            clock.time += 100;
+            long disableTime = clock.currentTime();
+            conn.createStatement().execute("UPSERT INTO " + fullTableName + " 
VALUES('b','bb', '11')");
+            conn.commit();
+            clock.time += 100;
+            assertTrue(hasDisabledIndex(metaCache, key));
+            assertEquals(2,TestUtil.getRowCount(conn, fullTableName));
+            assertEquals(1,TestUtil.getRowCount(conn, fullIndexName));
+            conn.createStatement().execute("UPSERT INTO " + fullTableName + " 
VALUES('a','ccc','0')");
+            conn.commit();
+            clock.time += 100;
+            conn.createStatement().execute("UPSERT INTO " + fullTableName + " 
VALUES('a','a')");
+            conn.commit();
+            clock.time += 100;
+            IndexUtil.updateIndexState(fullIndexName, disableTime, metaTable, 
PIndexState.DISABLE);
+            clock.time += 100;
+            advanceClockUntilPartialRebuildStarts(fullIndexName, clock);
+            TestUtil.waitForIndexRebuild(conn, fullIndexName, 
PIndexState.ACTIVE);
+            clock.time += 100;
+            IndexScrutiny.scrutinizeIndex(conn, fullTableName, fullIndexName);
+        } finally {
+            EnvironmentEdgeManager.injectEdge(null);
+        }
+    }
+    
+    @Test
+    public void testMultiValuesWhenDisableAndInactive() throws Throwable {
+        String schemaName = generateUniqueName();
+        String tableName = generateUniqueName();
+        String indexName = generateUniqueName();
+        final String fullTableName = SchemaUtil.getTableName(schemaName, 
tableName);
+        final String fullIndexName = SchemaUtil.getTableName(schemaName, 
indexName);
+        PTableKey key = new PTableKey(null,fullTableName);
+        final MyClock clock = new MyClock(1000);
+        EnvironmentEdgeManager.injectEdge(clock);
+        try (Connection conn = DriverManager.getConnection(getUrl())) {
+            PMetaData metaCache = 
conn.unwrap(PhoenixConnection.class).getMetaDataCache();
+            conn.createStatement().execute("CREATE TABLE " + fullTableName + 
"(k VARCHAR PRIMARY KEY, v1 VARCHAR, v2 VARCHAR, v3 VARCHAR) 
COLUMN_ENCODED_BYTES = 0, STORE_NULLS=true");
+            clock.time += 100;
+            conn.createStatement().execute("CREATE INDEX " + indexName + " ON 
" + fullTableName + " (v1, v2) INCLUDE (v3)");
+            clock.time += 100;
+            conn.createStatement().execute("UPSERT INTO " + fullTableName + " 
VALUES('a','a','0','x')");
+            conn.commit();
+            clock.time += 100;
+            try (HTableInterface metaTable = 
conn.unwrap(PhoenixConnection.class).getQueryServices().getTable(PhoenixDatabaseMetaData.SYSTEM_CATALOG_NAME_BYTES))
 {
+                // By using an INDEX_DISABLE_TIMESTAMP of 0, we prevent the 
partial index rebuilder from triggering
+                IndexUtil.updateIndexState(fullIndexName, 0L, metaTable, 
PIndexState.DISABLE);
+                clock.time += 100;
+                long disableTime = clock.currentTime();
+                // Set some values while index disabled
+                conn.createStatement().execute("UPSERT INTO " + fullTableName 
+ " VALUES('b','bb', '11','yy')");
+                conn.commit();
+                clock.time += 100;
+                assertTrue(hasDisabledIndex(metaCache, key));
+                conn.createStatement().execute("UPSERT INTO " + fullTableName 
+ " VALUES('a','ccc','222','zzz')");
+                conn.commit();
+                clock.time += 100;
+                conn.createStatement().execute("UPSERT INTO " + fullTableName 
+ " VALUES('a','dddd','3333','zzzz')");
+                conn.commit();
+                clock.time += 100;
+                // Will cause partial index rebuilder to be triggered
+                IndexUtil.updateIndexState(fullIndexName, disableTime, 
metaTable, PIndexState.DISABLE);
+            }
+            final CountDownLatch doneSignal = new CountDownLatch(1);
+            advanceClockUntilPartialRebuildStarts(fullIndexName, clock, 
doneSignal);
+            doneSignal.await(30, TimeUnit.SECONDS);
+            // Set some values while index is in INACTIVE state
+            clock.time += 100;
+            conn.createStatement().execute("UPSERT INTO " + fullTableName + " 
VALUES('a','eeeee','44444','zzzzz')");
+            conn.commit();
+            clock.time += 100;
+            conn.createStatement().execute("UPSERT INTO " + fullTableName + " 
VALUES('a','fffff','55555','zzzzzz')");
+            conn.commit();
+            clock.time += WAIT_AFTER_DISABLED;
+            // Enough time has passed, so rebuild will start now
+            TestUtil.waitForIndexRebuild(conn, fullIndexName, 
PIndexState.ACTIVE);
+            clock.time += 100;
+            IndexScrutiny.scrutinizeIndex(conn, fullTableName, fullIndexName);
+        } finally {
+            EnvironmentEdgeManager.injectEdge(null);
+        }
+    }
+
+    @Test
     public void testDeleteAndUpsertValuesAtSameTS1() throws Throwable {
         String schemaName = generateUniqueName();
         String tableName = generateUniqueName();
@@ -563,7 +661,37 @@ public class PartialIndexRebuilderIT extends 
BaseUniqueNamesOwnClusterIT {
         }
     }
 
-    private static void advanceClockUntilPartialRebuildStarts(final String 
fullIndexName, final MyClock clock) {
+    @Test
+    public void testRegionsOnlineCheck() throws Throwable {
+        String schemaName = generateUniqueName();
+        String tableName = generateUniqueName();
+        final String fullTableName = SchemaUtil.getTableName(schemaName, 
tableName);
+        PTableKey key = new PTableKey(null,fullTableName);
+        try (Connection conn = DriverManager.getConnection(getUrl())) {
+            PMetaData metaCache = 
conn.unwrap(PhoenixConnection.class).getMetaDataCache();
+            conn.createStatement().execute("CREATE TABLE " + fullTableName + 
"(k VARCHAR PRIMARY KEY)");
+            conn.createStatement().execute("UPSERT INTO " + fullTableName + " 
VALUES('a')");
+            conn.commit();
+            Configuration conf = 
conn.unwrap(PhoenixConnection.class).getQueryServices().getConfiguration();
+            PTable table = metaCache.getTableRef(key).getTable();
+            assertTrue(MetaDataUtil.tableRegionsOnline(conf, table));
+            try (HBaseAdmin admin = 
conn.unwrap(PhoenixConnection.class).getQueryServices().getAdmin()) {
+                admin.disableTable(fullTableName);
+                assertFalse(MetaDataUtil.tableRegionsOnline(conf, table));
+                admin.enableTable(fullTableName);
+            }
+            assertTrue(MetaDataUtil.tableRegionsOnline(conf, table));
+        }
+    }
+    
+    private static void advanceClockUntilPartialRebuildStarts(final String 
fullIndexName, final MyClock clock) throws InterruptedException {
+        final CountDownLatch doneSignal = new CountDownLatch(1);
+        advanceClockUntilPartialRebuildStarts(fullIndexName, clock, 
doneSignal);
+        clock.time += WAIT_AFTER_DISABLED + 1000;
+        doneSignal.await(30, TimeUnit.SECONDS);
+    }
+    
+    private static void advanceClockUntilPartialRebuildStarts(final String 
fullIndexName, final MyClock clock, final CountDownLatch doneSignal) {
         Runnable r = new Runnable() {
             @Override
             public void run() {
@@ -573,7 +701,7 @@ public class PartialIndexRebuilderIT extends 
BaseUniqueNamesOwnClusterIT {
                         Thread.sleep(1000);
                         clock.time += 1000;
                     }
-                    clock.time += WAIT_AFTER_DISABLED + 1000;
+                    doneSignal.countDown();
                 } catch (Exception e) {
                     throw new RuntimeException(e);
                 }

http://git-wip-us.apache.org/repos/asf/phoenix/blob/813c1c8a/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/Indexer.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/Indexer.java b/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/Indexer.java
index e19e619..05e2ae7 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/Indexer.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/Indexer.java
@@ -521,7 +521,7 @@ public class Indexer extends BaseRegionObserver {
           miniBatchOp.setWalEdit(0, edit);
       }
   
-      if (copyMutations) {
+      if (copyMutations || replayWrite != null) {
           mutations = 
IndexManagementUtil.flattenMutationsByTimestamp(mutations);
       }
 

http://git-wip-us.apache.org/repos/asf/phoenix/blob/813c1c8a/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/covered/NonTxIndexBuilder.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/covered/NonTxIndexBuilder.java b/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/covered/NonTxIndexBuilder.java
index c013b5b..50e2c3f 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/covered/NonTxIndexBuilder.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/covered/NonTxIndexBuilder.java
@@ -93,7 +93,9 @@ public class NonTxIndexBuilder extends BaseIndexBuilder {
             List<KeyValue> kvs = KeyValueUtil.ensureKeyValues(family);
             for (KeyValue kv : kvs) {
                 batch.add(kv);
-                assert(ts == kv.getTimestamp());
+                if(ts != kv.getTimestamp()) {
+                    throw new IllegalStateException("Time stamps must match 
for all cells in a batch");
+                }
             }
         }
 

http://git-wip-us.apache.org/repos/asf/phoenix/blob/813c1c8a/phoenix-core/src/main/java/org/apache/phoenix/util/MetaDataUtil.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/util/MetaDataUtil.java b/phoenix-core/src/main/java/org/apache/phoenix/util/MetaDataUtil.java
index b52cb79..502ef37 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/util/MetaDataUtil.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/util/MetaDataUtil.java
@@ -533,7 +533,7 @@ public class MetaDataUtil {
         try {
             hcon = HConnectionManager.getConnection(conf);
             List<HRegionLocation> locations = hcon.locateRegions(
-                
org.apache.hadoop.hbase.TableName.valueOf(table.getTableName().getBytes()));
+                
org.apache.hadoop.hbase.TableName.valueOf(table.getPhysicalName().getBytes()));
 
             for (HRegionLocation loc : locations) {
                 try {

Reply via email to