PHOENIX-4214 Scans which write should not block region split or close (Vincent Poon)


Project: http://git-wip-us.apache.org/repos/asf/phoenix/repo
Commit: http://git-wip-us.apache.org/repos/asf/phoenix/commit/519273b8
Tree: http://git-wip-us.apache.org/repos/asf/phoenix/tree/519273b8
Diff: http://git-wip-us.apache.org/repos/asf/phoenix/diff/519273b8

Branch: refs/heads/4.x-HBase-1.2
Commit: 519273b85dbc7f2d8d3fe71fd434bb8a396d3b04
Parents: 64ef10b
Author: James Taylor <[email protected]>
Authored: Fri Sep 29 12:26:41 2017 -0700
Committer: James Taylor <[email protected]>
Committed: Fri Sep 29 12:36:06 2017 -0700

----------------------------------------------------------------------
 .../UpsertSelectOverlappingBatchesIT.java       | 245 +++++++++++++++----
 .../UngroupedAggregateRegionObserver.java       |  16 +-
 2 files changed, 210 insertions(+), 51 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/phoenix/blob/519273b8/phoenix-core/src/it/java/org/apache/phoenix/execute/UpsertSelectOverlappingBatchesIT.java
----------------------------------------------------------------------
diff --git 
a/phoenix-core/src/it/java/org/apache/phoenix/execute/UpsertSelectOverlappingBatchesIT.java
 
b/phoenix-core/src/it/java/org/apache/phoenix/execute/UpsertSelectOverlappingBatchesIT.java
index 53346b9..dc9de81 100644
--- 
a/phoenix-core/src/it/java/org/apache/phoenix/execute/UpsertSelectOverlappingBatchesIT.java
+++ 
b/phoenix-core/src/it/java/org/apache/phoenix/execute/UpsertSelectOverlappingBatchesIT.java
@@ -18,6 +18,7 @@
 package org.apache.phoenix.execute;
 
 import static org.apache.phoenix.util.TestUtil.TEST_PROPERTIES;
+import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertTrue;
 
 import java.sql.Connection;
@@ -32,35 +33,59 @@ import java.util.concurrent.ExecutorCompletionService;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
 import java.util.concurrent.Future;
+import java.util.concurrent.TimeUnit;
 
+import org.apache.commons.lang.exception.ExceptionUtils;
 import org.apache.hadoop.hbase.HBaseIOException;
+import org.apache.hadoop.hbase.HBaseTestingUtility;
+import org.apache.hadoop.hbase.HConstants;
+import org.apache.hadoop.hbase.HRegionInfo;
+import org.apache.hadoop.hbase.NotServingRegionException;
+import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.Waiter;
+import org.apache.hadoop.hbase.client.HBaseAdmin;
 import org.apache.hadoop.hbase.client.Mutation;
 import org.apache.hadoop.hbase.coprocessor.ObserverContext;
 import org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment;
 import org.apache.hadoop.hbase.coprocessor.SimpleRegionObserver;
+import org.apache.hadoop.hbase.regionserver.HRegionServer;
 import org.apache.hadoop.hbase.regionserver.MiniBatchOperationInProgress;
+import org.apache.hadoop.hbase.util.Bytes;
 import org.apache.phoenix.end2end.BaseUniqueNamesOwnClusterIT;
 import org.apache.phoenix.query.QueryServices;
 import org.apache.phoenix.util.PropertiesUtil;
 import org.apache.phoenix.util.ReadOnlyProps;
+import org.junit.AfterClass;
+import org.junit.Before;
 import org.junit.BeforeClass;
 import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 import com.google.common.collect.Lists;
 import com.google.common.collect.Maps;
 
 public class UpsertSelectOverlappingBatchesIT extends 
BaseUniqueNamesOwnClusterIT {
-    
+    private static final Logger logger = 
LoggerFactory.getLogger(UpsertSelectOverlappingBatchesIT.class);
+    private Properties props;
+    private static volatile String dataTable;
+    private String index;
+
     @BeforeClass
     public static void doSetup() throws Exception {
         Map<String, String> serverProps = Maps.newHashMapWithExpectedSize(3);
         serverProps.put("hbase.coprocessor.region.classes", 
SlowBatchRegionObserver.class.getName());
         serverProps.put("hbase.rowlock.wait.duration", "5000");
         serverProps.put(QueryServices.MUTATE_BATCH_SIZE_ATTRIB, "100");
-        Map<String,String> clientProps = Maps.newHashMapWithExpectedSize(1);
-        setUpTestDriver(new ReadOnlyProps(serverProps.entrySet().iterator()), 
new ReadOnlyProps(clientProps.entrySet().iterator()));
+        setUpTestDriver(new ReadOnlyProps(serverProps.entrySet().iterator()));
     }
-    
+
+    @AfterClass
+    public static void tearDownClass() throws Exception {
+        SlowBatchRegionObserver.SLOW_MUTATE = false;
+        getUtility().shutdownMiniCluster();
+    }
+
     private class UpsertSelectRunner implements Callable<Boolean> {
        private final String dataTable;
        private final int minIndex;
@@ -89,58 +114,186 @@ public class UpsertSelectOverlappingBatchesIT extends 
BaseUniqueNamesOwnClusterI
                                return true;
                        }
                }
-       
     }
-    
+
+    private static class UpsertSelectLooper implements Runnable {
+        private UpsertSelectRunner runner;
+        public UpsertSelectLooper(UpsertSelectRunner runner) {
+            this.runner = runner;
+        }
+        @Override
+        public void run() {
+            while (true) {
+                try {
+                    runner.call();
+                }
+                catch (Exception e) {
+                    if (ExceptionUtils.indexOfThrowable(e, 
InterruptedException.class) != -1) {
+                        logger.info("Interrupted, exiting", e);
+                        Thread.currentThread().interrupt();
+                        return;
+                    }
+                    logger.error("Hit exception while writing", e);
+                }
+            }
+        }};
+
+    @Before
+    public void setup() throws Exception {
+        SlowBatchRegionObserver.SLOW_MUTATE = false;
+        props = PropertiesUtil.deepCopy(TEST_PROPERTIES);
+        props.put(HConstants.HBASE_CLIENT_RETRIES_NUMBER, 2);
+        dataTable = generateUniqueName();
+        index = "IDX_" + dataTable;
+        try (Connection conn = driver.connect(url, props)) {
+            conn.createStatement().execute("CREATE TABLE " + dataTable
+                    + " (k INTEGER NOT NULL PRIMARY KEY, v1 VARCHAR, v2 
VARCHAR)");
+            // create the index and ensure its empty as well
+            conn.createStatement().execute("CREATE INDEX " + index + " ON " + 
dataTable + " (v1)");
+            PreparedStatement stmt = conn.prepareStatement("UPSERT INTO " + 
dataTable + " VALUES(?,?,?)");
+            conn.setAutoCommit(false);
+            for (int i = 0; i < 100; i++) {
+                stmt.setInt(1, i);
+                stmt.setString(2, "v1" + i);
+                stmt.setString(3, "v2" + i);
+                stmt.execute();
+            }
+            conn.commit();
+        }
+    }
+
        @Test
        public void testUpsertSelectSameBatchConcurrently() throws Exception {
-               final String dataTable = generateUniqueName();
-               final String index = "IDX_" + dataTable;
-               // create the table and ensure its empty
-               Properties props = PropertiesUtil.deepCopy(TEST_PROPERTIES);
-               Connection conn = driver.connect(url, props);
-               conn.createStatement()
-                               .execute("CREATE TABLE " + dataTable + " (k 
INTEGER NOT NULL PRIMARY KEY, v1 VARCHAR, v2 VARCHAR)");
-               // create the index and ensure its empty as well
-               conn.createStatement().execute("CREATE INDEX " + index + " ON " 
+ dataTable + " (v1)");
-
-               conn = DriverManager.getConnection(getUrl(), props);
-               PreparedStatement stmt = conn.prepareStatement("UPSERT INTO " + 
dataTable + " VALUES(?,?,?)");
-               conn.setAutoCommit(false);
-               for (int i = 0; i < 100; i++) {
-                       stmt.setInt(1, i);
-                       stmt.setString(2, "v1" + i);
-                       stmt.setString(3, "v2" + i);
-                       stmt.execute();
-               }
-               conn.commit();
-
-               int numUpsertSelectRunners = 5;
-               ExecutorService exec = 
Executors.newFixedThreadPool(numUpsertSelectRunners);
-               CompletionService<Boolean> completionService = new 
ExecutorCompletionService<Boolean>(exec);
-               List<Future<Boolean>> futures = 
Lists.newArrayListWithExpectedSize(numUpsertSelectRunners);
-               // run one UPSERT SELECT for 100 rows (that locks the rows for 
a long time)
-               futures.add(completionService.submit(new 
UpsertSelectRunner(dataTable, 0, 105, 1)));
-               // run four UPSERT SELECTS for 5 rows (that overlap with slow 
running UPSERT SELECT)
-               for (int i = 0; i < 100; i += 25) {
-                       futures.add(completionService.submit(new 
UpsertSelectRunner(dataTable, i, i+25, 5)));
-               }
-               int received = 0;
-               while (received < futures.size()) {
-                       Future<Boolean> resultFuture = 
completionService.take(); 
-                       Boolean result = resultFuture.get();
-                       received++;
-                       assertTrue(result);
+               try (Connection conn = driver.connect(url, props)) {
+                       int numUpsertSelectRunners = 5;
+                       ExecutorService exec = 
Executors.newFixedThreadPool(numUpsertSelectRunners);
+                       CompletionService<Boolean> completionService = new 
ExecutorCompletionService<Boolean>(exec);
+                       List<Future<Boolean>> futures = 
Lists.newArrayListWithExpectedSize(numUpsertSelectRunners);
+                       // run one UPSERT SELECT for 100 rows (that locks the 
rows for a long time)
+                       futures.add(completionService.submit(new 
UpsertSelectRunner(dataTable, 0, 105, 1)));
+                       // run four UPSERT SELECTS for 5 rows (that overlap 
with slow running UPSERT SELECT)
+                       for (int i = 0; i < 100; i += 25) {
+                           futures.add(completionService.submit(new 
UpsertSelectRunner(dataTable, i, i+25, 5)));
+                       }
+                       int received = 0;
+                       while (received < futures.size()) {
+                           Future<Boolean> resultFuture = 
completionService.take();
+                           Boolean result = resultFuture.get();
+                           received++;
+                           assertTrue(result);
+                       }
+                       exec.shutdownNow();
                }
-               exec.shutdownNow();
-               conn.close();
        }
+
+    /**
+     * Tests that splitting a region is not blocked indefinitely by UPSERT 
SELECT load
+     */
+       @Test
+    public void testSplitDuringUpsertSelect() throws Exception {
+        int numUpsertSelectRunners = 4;
+        ExecutorService exec = 
Executors.newFixedThreadPool(numUpsertSelectRunners);
+        try (Connection conn = driver.connect(url, props)) {
+            final UpsertSelectRunner upsertSelectRunner =
+                    new UpsertSelectRunner(dataTable, 0, 105, 1);
+            // keep running slow upsert selects
+            SlowBatchRegionObserver.SLOW_MUTATE = true;
+            for (int i = 0; i < numUpsertSelectRunners; i++) {
+                exec.submit(new UpsertSelectLooper(upsertSelectRunner));
+                Thread.sleep(300);
+            }
+
+            // keep trying to split the region
+            final HBaseTestingUtility utility = getUtility();
+            final HBaseAdmin admin = utility.getHBaseAdmin();
+            final TableName dataTN = TableName.valueOf(dataTable);
+            assertEquals(1, 
utility.getHBaseCluster().getRegions(dataTN).size());
+            utility.waitFor(60000L, 1000, new Waiter.Predicate<Exception>() {
+                @Override
+                public boolean evaluate() throws Exception {
+                    try {
+                        List<HRegionInfo> regions = 
admin.getTableRegions(dataTN);
+                        if (regions.size() > 1) {
+                            logger.info("Found region was split");
+                            return true;
+                        }
+                        if (regions.size() == 0) {
+                            // This happens when region in transition or closed
+                            logger.info("No region returned");
+                            return false;
+                        }
+                        ;
+                        HRegionInfo hRegion = regions.get(0);
+                        logger.info("Attempting to split region");
+                        admin.splitRegion(hRegion.getRegionName(), 
Bytes.toBytes(2));
+                        return false;
+                    } catch (NotServingRegionException nsre) {
+                        // during split
+                        return false;
+                    }
+                }
+            });
+        } finally {
+            SlowBatchRegionObserver.SLOW_MUTATE = false;
+            exec.shutdownNow();
+            exec.awaitTermination(60, TimeUnit.SECONDS);
+        }
+    }
+
+    /**
+     * Tests that UPSERT SELECT doesn't indefinitely block region closes
+     */
+    @Test
+    public void testRegionCloseDuringUpsertSelect() throws Exception {
+        int numUpsertSelectRunners = 4;
+        ExecutorService exec = 
Executors.newFixedThreadPool(numUpsertSelectRunners);
+        try (Connection conn = driver.connect(url, props)) {
+            final UpsertSelectRunner upsertSelectRunner =
+                    new UpsertSelectRunner(dataTable, 0, 105, 1);
+            // keep running slow upsert selects
+            SlowBatchRegionObserver.SLOW_MUTATE = true;
+            for (int i = 0; i < numUpsertSelectRunners; i++) {
+                exec.submit(new UpsertSelectLooper(upsertSelectRunner));
+                Thread.sleep(300);
+            }
+
+            final HBaseTestingUtility utility = getUtility();
+            // try to close the region while UPSERT SELECTs are happening,
+            final HRegionServer dataRs = 
utility.getHBaseCluster().getRegionServer(0);
+            final HBaseAdmin admin = utility.getHBaseAdmin();
+            final HRegionInfo dataRegion =
+                    admin.getTableRegions(TableName.valueOf(dataTable)).get(0);
+            logger.info("Closing data table region");
+            admin.closeRegion(dataRs.getServerName(), dataRegion);
+            // make sure the region is offline
+            utility.waitFor(60000L, 1000, new Waiter.Predicate<Exception>() {
+                @Override
+                public boolean evaluate() throws Exception {
+                    List<HRegionInfo> onlineRegions =
+                            admin.getOnlineRegions(dataRs.getServerName());
+                    for (HRegionInfo onlineRegion : onlineRegions) {
+                        if (onlineRegion.equals(dataRegion)) {
+                            logger.info("Data region still online");
+                            return false;
+                        }
+                    }
+                    logger.info("Region is no longer online");
+                    return true;
+                }
+            });
+        } finally {
+            SlowBatchRegionObserver.SLOW_MUTATE = false;
+            exec.shutdownNow();
+            exec.awaitTermination(60, TimeUnit.SECONDS);
+        }
+    }
     
     public static class SlowBatchRegionObserver extends SimpleRegionObserver {
+        public static volatile boolean SLOW_MUTATE = false;
         @Override
         public void 
preBatchMutate(ObserverContext<RegionCoprocessorEnvironment> c, 
MiniBatchOperationInProgress<Mutation> miniBatchOp) throws HBaseIOException {
                // model a slow batch that takes a long time
-            if (miniBatchOp.size()==100) {
+            if ((miniBatchOp.size()==100 || SLOW_MUTATE) && 
c.getEnvironment().getRegionInfo().getTable().getNameAsString().equals(dataTable))
 {
                try {
                                        Thread.sleep(6000);
                                } catch (InterruptedException e) {

http://git-wip-us.apache.org/repos/asf/phoenix/blob/519273b8/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/UngroupedAggregateRegionObserver.java
----------------------------------------------------------------------
diff --git 
a/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/UngroupedAggregateRegionObserver.java
 
b/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/UngroupedAggregateRegionObserver.java
index 30f89cb..c3024a7 100644
--- 
a/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/UngroupedAggregateRegionObserver.java
+++ 
b/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/UngroupedAggregateRegionObserver.java
@@ -164,7 +164,7 @@ public class UngroupedAggregateRegionObserver extends 
BaseScannerRegionObserver
     /**
      * This lock used for synchronizing the state of
      * {@link UngroupedAggregateRegionObserver#scansReferenceCount},
-     * {@link UngroupedAggregateRegionObserver#isRegionClosing} variables used 
to avoid possible
+     * {@link UngroupedAggregateRegionObserver#isRegionClosingOrSplitting} 
variables used to avoid possible
      * dead lock situation in case below steps: 
      * 1. We get read lock when we start writing local indexes, deletes etc.. 
      * 2. when memstore reach threshold, flushes happen. Since they use read 
(shared) lock they 
@@ -191,7 +191,7 @@ public class UngroupedAggregateRegionObserver extends 
BaseScannerRegionObserver
     @GuardedBy("lock")
     private int scansReferenceCount = 0;
     @GuardedBy("lock")
-    private boolean isRegionClosing = false;
+    private boolean isRegionClosingOrSplitting = false;
     private static final Logger logger = 
LoggerFactory.getLogger(UngroupedAggregateRegionObserver.class);
     private KeyValueBuilder kvBuilder;
     private Configuration upsertSelectConfig;
@@ -285,7 +285,7 @@ public class UngroupedAggregateRegionObserver extends 
BaseScannerRegionObserver
      */
     private void checkForRegionClosing() throws IOException {
         synchronized (lock) {
-            if(isRegionClosing) {
+            if(isRegionClosingOrSplitting) {
                 lock.notifyAll();
                 throw new IOException("Region is getting closed. Not allowing 
to write to avoid possible deadlock.");
             }
@@ -499,10 +499,15 @@ public class UngroupedAggregateRegionObserver extends 
BaseScannerRegionObserver
             useIndexProto = false;
         }
         boolean acquiredLock = false;
+        boolean incrScanRefCount = false;
         try {
             if(needToWrite) {
                 synchronized (lock) {
+                    if (isRegionClosingOrSplitting) {
+                        throw new IOException("Temporarily unable to write 
from scan because region is closing or splitting");
+                    }
                     scansReferenceCount++;
+                    incrScanRefCount = true;
                     lock.notifyAll();
                 }
             }
@@ -755,7 +760,7 @@ public class UngroupedAggregateRegionObserver extends 
BaseScannerRegionObserver
                 }
             }
         } finally {
-            if (needToWrite) {
+            if (needToWrite && incrScanRefCount) {
                 synchronized (lock) {
                     scansReferenceCount--;
                     if (scansReferenceCount < 0) {
@@ -1295,6 +1300,7 @@ public class UngroupedAggregateRegionObserver extends 
BaseScannerRegionObserver
         // Don't allow splitting if operations need read and write to same 
region are going on in the
         // the coprocessors to avoid dead lock scenario. See PHOENIX-3111.
         synchronized (lock) {
+            isRegionClosingOrSplitting = true;
             if (scansReferenceCount > 0) {
                 throw new IOException("Operations like local index 
building/delete/upsert select"
                         + " might be going on so not allowing to split.");
@@ -1319,7 +1325,7 @@ public class UngroupedAggregateRegionObserver extends 
BaseScannerRegionObserver
     public void preClose(ObserverContext<RegionCoprocessorEnvironment> c, 
boolean abortRequested)
             throws IOException {
         synchronized (lock) {
-            isRegionClosing = true;
+            isRegionClosingOrSplitting = true;
             while (scansReferenceCount > 0) {
                 try {
                     lock.wait(1000);

Reply via email to