Repository: phoenix
Updated Branches:
  refs/heads/4.x-HBase-1.2 72dbbb38d -> 519273b85


Revert "PHOENIX-4214 Scans which write should not block region split or close 
(Vincent Poon)"

This reverts commit 6f65a7935b640969e570b870de9fa59e2a5bca67.


Project: http://git-wip-us.apache.org/repos/asf/phoenix/repo
Commit: http://git-wip-us.apache.org/repos/asf/phoenix/commit/64ef10b7
Tree: http://git-wip-us.apache.org/repos/asf/phoenix/tree/64ef10b7
Diff: http://git-wip-us.apache.org/repos/asf/phoenix/diff/64ef10b7

Branch: refs/heads/4.x-HBase-1.2
Commit: 64ef10b7d823db8a4f625a26304dded8e2f7df76
Parents: 72dbbb3
Author: James Taylor <[email protected]>
Authored: Fri Sep 29 12:35:38 2017 -0700
Committer: James Taylor <[email protected]>
Committed: Fri Sep 29 12:35:38 2017 -0700

----------------------------------------------------------------------
 .../UpsertSelectOverlappingBatchesIT.java       | 239 ++++---------------
 .../UngroupedAggregateRegionObserver.java       |  23 +-
 2 files changed, 53 insertions(+), 209 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/phoenix/blob/64ef10b7/phoenix-core/src/it/java/org/apache/phoenix/execute/UpsertSelectOverlappingBatchesIT.java
----------------------------------------------------------------------
diff --git 
a/phoenix-core/src/it/java/org/apache/phoenix/execute/UpsertSelectOverlappingBatchesIT.java
 
b/phoenix-core/src/it/java/org/apache/phoenix/execute/UpsertSelectOverlappingBatchesIT.java
index fbf3231..53346b9 100644
--- 
a/phoenix-core/src/it/java/org/apache/phoenix/execute/UpsertSelectOverlappingBatchesIT.java
+++ 
b/phoenix-core/src/it/java/org/apache/phoenix/execute/UpsertSelectOverlappingBatchesIT.java
@@ -18,7 +18,6 @@
 package org.apache.phoenix.execute;
 
 import static org.apache.phoenix.util.TestUtil.TEST_PROPERTIES;
-import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertTrue;
 
 import java.sql.Connection;
@@ -33,43 +32,25 @@ import java.util.concurrent.ExecutorCompletionService;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
 import java.util.concurrent.Future;
-import java.util.concurrent.TimeUnit;
 
-import org.apache.commons.lang.exception.ExceptionUtils;
 import org.apache.hadoop.hbase.HBaseIOException;
-import org.apache.hadoop.hbase.HBaseTestingUtility;
-import org.apache.hadoop.hbase.HRegionInfo;
-import org.apache.hadoop.hbase.NotServingRegionException;
-import org.apache.hadoop.hbase.TableName;
-import org.apache.hadoop.hbase.Waiter;
-import org.apache.hadoop.hbase.client.HBaseAdmin;
 import org.apache.hadoop.hbase.client.Mutation;
 import org.apache.hadoop.hbase.coprocessor.ObserverContext;
 import org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment;
 import org.apache.hadoop.hbase.coprocessor.SimpleRegionObserver;
-import org.apache.hadoop.hbase.regionserver.HRegionServer;
 import org.apache.hadoop.hbase.regionserver.MiniBatchOperationInProgress;
-import org.apache.hadoop.hbase.util.Bytes;
 import org.apache.phoenix.end2end.BaseUniqueNamesOwnClusterIT;
 import org.apache.phoenix.query.QueryServices;
 import org.apache.phoenix.util.PropertiesUtil;
 import org.apache.phoenix.util.ReadOnlyProps;
-import org.junit.AfterClass;
-import org.junit.Before;
 import org.junit.BeforeClass;
 import org.junit.Test;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
 
 import com.google.common.collect.Lists;
 import com.google.common.collect.Maps;
 
 public class UpsertSelectOverlappingBatchesIT extends 
BaseUniqueNamesOwnClusterIT {
-    private static final Logger logger = 
LoggerFactory.getLogger(UpsertSelectOverlappingBatchesIT.class);
-    private Properties props;
-    private static volatile String dataTable;
-    private String index;
-
+    
     @BeforeClass
     public static void doSetup() throws Exception {
         Map<String, String> serverProps = Maps.newHashMapWithExpectedSize(3);
@@ -79,12 +60,7 @@ public class UpsertSelectOverlappingBatchesIT extends 
BaseUniqueNamesOwnClusterI
         Map<String,String> clientProps = Maps.newHashMapWithExpectedSize(1);
         setUpTestDriver(new ReadOnlyProps(serverProps.entrySet().iterator()), 
new ReadOnlyProps(clientProps.entrySet().iterator()));
     }
-
-    @AfterClass
-    public static void tearDownClass() throws Exception {
-        getUtility().shutdownMiniCluster();
-    }
-
+    
     private class UpsertSelectRunner implements Callable<Boolean> {
        private final String dataTable;
        private final int minIndex;
@@ -113,185 +89,58 @@ public class UpsertSelectOverlappingBatchesIT extends 
BaseUniqueNamesOwnClusterI
                                return true;
                        }
                }
+       
     }
-
-    private static class UpsertSelectLooper implements Runnable {
-        private UpsertSelectRunner runner;
-        public UpsertSelectLooper(UpsertSelectRunner runner) {
-            this.runner = runner;
-        }
-        @Override
-        public void run() {
-            while (true) {
-                try {
-                    runner.call();
-                }
-                catch (Exception e) {
-                    if (ExceptionUtils.indexOfThrowable(e, 
InterruptedException.class) != -1) {
-                        logger.info("Interrupted, exiting", e);
-                        Thread.currentThread().interrupt();
-                        return;
-                    }
-                    logger.error("Hit exception while writing", e);
-                }
-            }
-        }};
-
-    @Before
-    public void setup() throws Exception {
-        SlowBatchRegionObserver.SLOW_MUTATE = false;
-        props = PropertiesUtil.deepCopy(TEST_PROPERTIES);
-        dataTable = generateUniqueName();
-        index = "IDX_" + dataTable;
-        try (Connection conn = driver.connect(url, props)) {
-            conn.createStatement().execute("CREATE TABLE " + dataTable
-                    + " (k INTEGER NOT NULL PRIMARY KEY, v1 VARCHAR, v2 
VARCHAR)");
-            // create the index and ensure its empty as well
-            conn.createStatement().execute("CREATE INDEX " + index + " ON " + 
dataTable + " (v1)");
-            PreparedStatement stmt = conn.prepareStatement("UPSERT INTO " + 
dataTable + " VALUES(?,?,?)");
-            conn.setAutoCommit(false);
-            for (int i = 0; i < 100; i++) {
-                stmt.setInt(1, i);
-                stmt.setString(2, "v1" + i);
-                stmt.setString(3, "v2" + i);
-                stmt.execute();
-            }
-            conn.commit();
-        }
-    }
-
+    
        @Test
        public void testUpsertSelectSameBatchConcurrently() throws Exception {
-               try (Connection conn = driver.connect(url, props)) {
-                       int numUpsertSelectRunners = 5;
-                       ExecutorService exec = 
Executors.newFixedThreadPool(numUpsertSelectRunners);
-                       CompletionService<Boolean> completionService = new 
ExecutorCompletionService<Boolean>(exec);
-                       List<Future<Boolean>> futures = 
Lists.newArrayListWithExpectedSize(numUpsertSelectRunners);
-                       // run one UPSERT SELECT for 100 rows (that locks the 
rows for a long time)
-                       futures.add(completionService.submit(new 
UpsertSelectRunner(dataTable, 0, 105, 1)));
-                       // run four UPSERT SELECTS for 5 rows (that overlap 
with slow running UPSERT SELECT)
-                       for (int i = 0; i < 100; i += 25) {
-                           futures.add(completionService.submit(new 
UpsertSelectRunner(dataTable, i, i+25, 5)));
-                       }
-                       int received = 0;
-                       while (received < futures.size()) {
-                           Future<Boolean> resultFuture = 
completionService.take(); 
-                           Boolean result = resultFuture.get();
-                           received++;
-                           assertTrue(result);
-                       }
-                       exec.shutdownNow();
+               final String dataTable = generateUniqueName();
+               final String index = "IDX_" + dataTable;
+               // create the table and ensure its empty
+               Properties props = PropertiesUtil.deepCopy(TEST_PROPERTIES);
+               Connection conn = driver.connect(url, props);
+               conn.createStatement()
+                               .execute("CREATE TABLE " + dataTable + " (k 
INTEGER NOT NULL PRIMARY KEY, v1 VARCHAR, v2 VARCHAR)");
+               // create the index and ensure its empty as well
+               conn.createStatement().execute("CREATE INDEX " + index + " ON " 
+ dataTable + " (v1)");
+
+               conn = DriverManager.getConnection(getUrl(), props);
+               PreparedStatement stmt = conn.prepareStatement("UPSERT INTO " + 
dataTable + " VALUES(?,?,?)");
+               conn.setAutoCommit(false);
+               for (int i = 0; i < 100; i++) {
+                       stmt.setInt(1, i);
+                       stmt.setString(2, "v1" + i);
+                       stmt.setString(3, "v2" + i);
+                       stmt.execute();
+               }
+               conn.commit();
+
+               int numUpsertSelectRunners = 5;
+               ExecutorService exec = 
Executors.newFixedThreadPool(numUpsertSelectRunners);
+               CompletionService<Boolean> completionService = new 
ExecutorCompletionService<Boolean>(exec);
+               List<Future<Boolean>> futures = 
Lists.newArrayListWithExpectedSize(numUpsertSelectRunners);
+               // run one UPSERT SELECT for 100 rows (that locks the rows for 
a long time)
+               futures.add(completionService.submit(new 
UpsertSelectRunner(dataTable, 0, 105, 1)));
+               // run four UPSERT SELECTS for 5 rows (that overlap with slow 
running UPSERT SELECT)
+               for (int i = 0; i < 100; i += 25) {
+                       futures.add(completionService.submit(new 
UpsertSelectRunner(dataTable, i, i+25, 5)));
+               }
+               int received = 0;
+               while (received < futures.size()) {
+                       Future<Boolean> resultFuture = 
completionService.take(); 
+                       Boolean result = resultFuture.get();
+                       received++;
+                       assertTrue(result);
                }
+               exec.shutdownNow();
+               conn.close();
        }
-
-    /**
-     * Tests that splitting a region is not blocked indefinitely by UPSERT 
SELECT load
-     */
-       @Test
-    public void testSplitDuringUpsertSelect() throws Exception {
-        int numUpsertSelectRunners = 4;
-        ExecutorService exec = 
Executors.newFixedThreadPool(numUpsertSelectRunners);
-        try (Connection conn = driver.connect(url, props)) {
-            final UpsertSelectRunner upsertSelectRunner =
-                    new UpsertSelectRunner(dataTable, 0, 105, 1);
-            // keep running slow upsert selects
-            SlowBatchRegionObserver.SLOW_MUTATE = true;
-            for (int i = 0; i < numUpsertSelectRunners; i++) {
-                exec.submit(new UpsertSelectLooper(upsertSelectRunner));
-                Thread.sleep(300);
-            }
-
-            // keep trying to split the region
-            final HBaseTestingUtility utility = getUtility();
-            final HBaseAdmin admin = utility.getHBaseAdmin();
-            final TableName dataTN = TableName.valueOf(dataTable);
-            assertEquals(1, 
utility.getHBaseCluster().getRegions(dataTN).size());
-            utility.waitFor(30000L, 1000, new Waiter.Predicate<Exception>() {
-                @Override
-                public boolean evaluate() throws Exception {
-                    try {
-                        List<HRegionInfo> regions = 
admin.getTableRegions(dataTN);
-                        if (regions.size() > 1) {
-                            logger.info("Found region was split");
-                            return true;
-                        }
-                        if (regions.size() == 0) {
-                            // This happens when region in transition or closed
-                            logger.info("No region returned");
-                            return false;
-                        }
-                        ;
-                        HRegionInfo hRegion = regions.get(0);
-                        logger.info("Attempting to split region");
-                        admin.splitRegion(hRegion.getRegionName(), 
Bytes.toBytes(2));
-                        return false;
-                    } catch (NotServingRegionException nsre) {
-                        // during split
-                        return false;
-                    }
-                }
-            });
-        } finally {
-            SlowBatchRegionObserver.SLOW_MUTATE = false;
-            exec.shutdownNow();
-            exec.awaitTermination(60, TimeUnit.SECONDS);
-        }
-    }
-
-    /**
-     * Tests that UPSERT SELECT doesn't indefinitely block region closes
-     */
-    @Test
-    public void testRegionCloseDuringUpsertSelect() throws Exception {
-        int numUpsertSelectRunners = 4;
-        ExecutorService exec = 
Executors.newFixedThreadPool(numUpsertSelectRunners);
-        try (Connection conn = driver.connect(url, props)) {
-            final UpsertSelectRunner upsertSelectRunner =
-                    new UpsertSelectRunner(dataTable, 0, 105, 1);
-            // keep running slow upsert selects
-            SlowBatchRegionObserver.SLOW_MUTATE = true;
-            for (int i = 0; i < numUpsertSelectRunners; i++) {
-                exec.submit(new UpsertSelectLooper(upsertSelectRunner));
-                Thread.sleep(300);
-            }
-
-            final HBaseTestingUtility utility = getUtility();
-            // try to close the region while UPSERT SELECTs are happening,
-            final HRegionServer dataRs = 
utility.getHBaseCluster().getRegionServer(0);
-            final HBaseAdmin admin = utility.getHBaseAdmin();
-            final HRegionInfo dataRegion =
-                    admin.getTableRegions(TableName.valueOf(dataTable)).get(0);
-            logger.info("Closing data table region");
-            admin.closeRegion(dataRs.getServerName(), dataRegion);
-            // make sure the region is offline
-            utility.waitFor(30000L, 1000, new Waiter.Predicate<Exception>() {
-                @Override
-                public boolean evaluate() throws Exception {
-                    List<HRegionInfo> onlineRegions =
-                            admin.getOnlineRegions(dataRs.getServerName());
-                    for (HRegionInfo onlineRegion : onlineRegions) {
-                        if (onlineRegion.equals(dataRegion)) {
-                            logger.info("Data region still online");
-                            return false;
-                        }
-                    }
-                    logger.info("Region is no longer online");
-                    return true;
-                }
-            });
-        } finally {
-            SlowBatchRegionObserver.SLOW_MUTATE = false;
-            exec.shutdownNow();
-            exec.awaitTermination(60, TimeUnit.SECONDS);
-        }
-    }
     
     public static class SlowBatchRegionObserver extends SimpleRegionObserver {
-        public static volatile boolean SLOW_MUTATE = false;
         @Override
         public void 
preBatchMutate(ObserverContext<RegionCoprocessorEnvironment> c, 
MiniBatchOperationInProgress<Mutation> miniBatchOp) throws HBaseIOException {
                // model a slow batch that takes a long time
-            if ((miniBatchOp.size()==100 || SLOW_MUTATE) && 
c.getEnvironment().getRegionInfo().getTable().getNameAsString().equals(dataTable))
 {
+            if (miniBatchOp.size()==100) {
                try {
                                        Thread.sleep(6000);
                                } catch (InterruptedException e) {

http://git-wip-us.apache.org/repos/asf/phoenix/blob/64ef10b7/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/UngroupedAggregateRegionObserver.java
----------------------------------------------------------------------
diff --git 
a/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/UngroupedAggregateRegionObserver.java
 
b/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/UngroupedAggregateRegionObserver.java
index 582e606..30f89cb 100644
--- 
a/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/UngroupedAggregateRegionObserver.java
+++ 
b/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/UngroupedAggregateRegionObserver.java
@@ -164,7 +164,7 @@ public class UngroupedAggregateRegionObserver extends 
BaseScannerRegionObserver
     /**
      * This lock used for synchronizing the state of
      * {@link UngroupedAggregateRegionObserver#scansReferenceCount},
-     * {@link UngroupedAggregateRegionObserver#isRegionClosingOrSplitting} 
variables used to avoid possible
+     * {@link UngroupedAggregateRegionObserver#isRegionClosing} variables used 
to avoid possible
      * dead lock situation in case below steps: 
      * 1. We get read lock when we start writing local indexes, deletes etc.. 
      * 2. when memstore reach threshold, flushes happen. Since they use read 
(shared) lock they 
@@ -191,7 +191,7 @@ public class UngroupedAggregateRegionObserver extends 
BaseScannerRegionObserver
     @GuardedBy("lock")
     private int scansReferenceCount = 0;
     @GuardedBy("lock")
-    private boolean isRegionClosingOrSplitting = false;
+    private boolean isRegionClosing = false;
     private static final Logger logger = 
LoggerFactory.getLogger(UngroupedAggregateRegionObserver.class);
     private KeyValueBuilder kvBuilder;
     private Configuration upsertSelectConfig;
@@ -285,7 +285,7 @@ public class UngroupedAggregateRegionObserver extends 
BaseScannerRegionObserver
      */
     private void checkForRegionClosing() throws IOException {
         synchronized (lock) {
-            if(isRegionClosingOrSplitting) {
+            if(isRegionClosing) {
                 lock.notifyAll();
                 throw new IOException("Region is getting closed. Not allowing 
to write to avoid possible deadlock.");
             }
@@ -499,16 +499,13 @@ public class UngroupedAggregateRegionObserver extends 
BaseScannerRegionObserver
             useIndexProto = false;
         }
         boolean acquiredLock = false;
-        if(needToWrite) {
-            synchronized (lock) {
-                if (isRegionClosingOrSplitting) {
-                    throw new IOException("Temporarily unable to write from 
scan because region is closing or splitting");
+        try {
+            if(needToWrite) {
+                synchronized (lock) {
+                    scansReferenceCount++;
+                    lock.notifyAll();
                 }
-                scansReferenceCount++;
-                lock.notifyAll();
             }
-        }
-        try {
             region.startRegionOperation();
             acquiredLock = true;
             synchronized (innerScanner) {
@@ -1298,7 +1295,6 @@ public class UngroupedAggregateRegionObserver extends 
BaseScannerRegionObserver
         // Don't allow splitting if operations need read and write to same 
region are going on in the
         // the coprocessors to avoid dead lock scenario. See PHOENIX-3111.
         synchronized (lock) {
-            isRegionClosingOrSplitting = true;
             if (scansReferenceCount > 0) {
                 throw new IOException("Operations like local index 
building/delete/upsert select"
                         + " might be going on so not allowing to split.");
@@ -1323,13 +1319,12 @@ public class UngroupedAggregateRegionObserver extends 
BaseScannerRegionObserver
     public void preClose(ObserverContext<RegionCoprocessorEnvironment> c, 
boolean abortRequested)
             throws IOException {
         synchronized (lock) {
-            isRegionClosingOrSplitting = true;
+            isRegionClosing = true;
             while (scansReferenceCount > 0) {
                 try {
                     lock.wait(1000);
                 } catch (InterruptedException e) {
                     Thread.currentThread().interrupt();
-                    throw new IOException("Interrupted while waiting for 
completion of operations like local index building/delete/upsert select");
                 }
             }
         }

Reply via email to