PHOENIX-4214 Scans which write should not block region split or close (Vincent Poon)
Project: http://git-wip-us.apache.org/repos/asf/phoenix/repo Commit: http://git-wip-us.apache.org/repos/asf/phoenix/commit/519273b8 Tree: http://git-wip-us.apache.org/repos/asf/phoenix/tree/519273b8 Diff: http://git-wip-us.apache.org/repos/asf/phoenix/diff/519273b8 Branch: refs/heads/4.x-HBase-1.2 Commit: 519273b85dbc7f2d8d3fe71fd434bb8a396d3b04 Parents: 64ef10b Author: James Taylor <[email protected]> Authored: Fri Sep 29 12:26:41 2017 -0700 Committer: James Taylor <[email protected]> Committed: Fri Sep 29 12:36:06 2017 -0700 ---------------------------------------------------------------------- .../UpsertSelectOverlappingBatchesIT.java | 245 +++++++++++++++---- .../UngroupedAggregateRegionObserver.java | 16 +- 2 files changed, 210 insertions(+), 51 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/phoenix/blob/519273b8/phoenix-core/src/it/java/org/apache/phoenix/execute/UpsertSelectOverlappingBatchesIT.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/it/java/org/apache/phoenix/execute/UpsertSelectOverlappingBatchesIT.java b/phoenix-core/src/it/java/org/apache/phoenix/execute/UpsertSelectOverlappingBatchesIT.java index 53346b9..dc9de81 100644 --- a/phoenix-core/src/it/java/org/apache/phoenix/execute/UpsertSelectOverlappingBatchesIT.java +++ b/phoenix-core/src/it/java/org/apache/phoenix/execute/UpsertSelectOverlappingBatchesIT.java @@ -18,6 +18,7 @@ package org.apache.phoenix.execute; import static org.apache.phoenix.util.TestUtil.TEST_PROPERTIES; +import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertTrue; import java.sql.Connection; @@ -32,35 +33,59 @@ import java.util.concurrent.ExecutorCompletionService; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import java.util.concurrent.Future; +import java.util.concurrent.TimeUnit; +import org.apache.commons.lang.exception.ExceptionUtils; import org.apache.hadoop.hbase.HBaseIOException; +import org.apache.hadoop.hbase.HBaseTestingUtility; +import org.apache.hadoop.hbase.HConstants; +import org.apache.hadoop.hbase.HRegionInfo; +import org.apache.hadoop.hbase.NotServingRegionException; +import org.apache.hadoop.hbase.TableName; +import org.apache.hadoop.hbase.Waiter; +import org.apache.hadoop.hbase.client.HBaseAdmin; import org.apache.hadoop.hbase.client.Mutation; import org.apache.hadoop.hbase.coprocessor.ObserverContext; import org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment; import org.apache.hadoop.hbase.coprocessor.SimpleRegionObserver; +import org.apache.hadoop.hbase.regionserver.HRegionServer; import org.apache.hadoop.hbase.regionserver.MiniBatchOperationInProgress; +import org.apache.hadoop.hbase.util.Bytes; import org.apache.phoenix.end2end.BaseUniqueNamesOwnClusterIT; import org.apache.phoenix.query.QueryServices; import org.apache.phoenix.util.PropertiesUtil; import org.apache.phoenix.util.ReadOnlyProps; +import org.junit.AfterClass; +import org.junit.Before; import org.junit.BeforeClass; import org.junit.Test; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; import com.google.common.collect.Lists; import com.google.common.collect.Maps; public class UpsertSelectOverlappingBatchesIT extends BaseUniqueNamesOwnClusterIT { - + private static final Logger logger = LoggerFactory.getLogger(UpsertSelectOverlappingBatchesIT.class); + private Properties props; + private static volatile String dataTable; + private String index; + @BeforeClass public static void doSetup() throws Exception { Map<String, String> serverProps = Maps.newHashMapWithExpectedSize(3); serverProps.put("hbase.coprocessor.region.classes", SlowBatchRegionObserver.class.getName()); serverProps.put("hbase.rowlock.wait.duration", "5000"); serverProps.put(QueryServices.MUTATE_BATCH_SIZE_ATTRIB, "100"); - Map<String,String> clientProps = Maps.newHashMapWithExpectedSize(1); - setUpTestDriver(new ReadOnlyProps(serverProps.entrySet().iterator()), new ReadOnlyProps(clientProps.entrySet().iterator())); + setUpTestDriver(new ReadOnlyProps(serverProps.entrySet().iterator())); } - + + @AfterClass + public static void tearDownClass() throws Exception { + SlowBatchRegionObserver.SLOW_MUTATE = false; + getUtility().shutdownMiniCluster(); + } + private class UpsertSelectRunner implements Callable<Boolean> { private final String dataTable; private final int minIndex; @@ -89,58 +114,186 @@ public class UpsertSelectOverlappingBatchesIT extends BaseUniqueNamesOwnClusterI return true; } } - } - + + private static class UpsertSelectLooper implements Runnable { + private UpsertSelectRunner runner; + public UpsertSelectLooper(UpsertSelectRunner runner) { + this.runner = runner; + } + @Override + public void run() { + while (true) { + try { + runner.call(); + } + catch (Exception e) { + if (ExceptionUtils.indexOfThrowable(e, InterruptedException.class) != -1) { + logger.info("Interrupted, exiting", e); + Thread.currentThread().interrupt(); + return; + } + logger.error("Hit exception while writing", e); + } + } + }}; + + @Before + public void setup() throws Exception { + SlowBatchRegionObserver.SLOW_MUTATE = false; + props = PropertiesUtil.deepCopy(TEST_PROPERTIES); + props.put(HConstants.HBASE_CLIENT_RETRIES_NUMBER, 2); + dataTable = generateUniqueName(); + index = "IDX_" + dataTable; + try (Connection conn = driver.connect(url, props)) { + conn.createStatement().execute("CREATE TABLE " + dataTable + + " (k INTEGER NOT NULL PRIMARY KEY, v1 VARCHAR, v2 VARCHAR)"); + // create the index and ensure its empty as well + conn.createStatement().execute("CREATE INDEX " + index + " ON " + dataTable + " (v1)"); + PreparedStatement stmt = conn.prepareStatement("UPSERT INTO " + dataTable + " VALUES(?,?,?)"); + conn.setAutoCommit(false); + for (int i = 0; i < 100; i++) { + stmt.setInt(1, i); + stmt.setString(2, "v1" + i); + stmt.setString(3, "v2" + i); + stmt.execute(); + } + conn.commit(); + } + } + @Test public void testUpsertSelectSameBatchConcurrently() throws Exception { - final String dataTable = generateUniqueName(); - final String index = "IDX_" + dataTable; - // create the table and ensure its empty - Properties props = PropertiesUtil.deepCopy(TEST_PROPERTIES); - Connection conn = driver.connect(url, props); - conn.createStatement() - .execute("CREATE TABLE " + dataTable + " (k INTEGER NOT NULL PRIMARY KEY, v1 VARCHAR, v2 VARCHAR)"); - // create the index and ensure its empty as well - conn.createStatement().execute("CREATE INDEX " + index + " ON " + dataTable + " (v1)"); - - conn = DriverManager.getConnection(getUrl(), props); - PreparedStatement stmt = conn.prepareStatement("UPSERT INTO " + dataTable + " VALUES(?,?,?)"); - conn.setAutoCommit(false); - for (int i = 0; i < 100; i++) { - stmt.setInt(1, i); - stmt.setString(2, "v1" + i); - stmt.setString(3, "v2" + i); - stmt.execute(); - } - conn.commit(); - - int numUpsertSelectRunners = 5; - ExecutorService exec = Executors.newFixedThreadPool(numUpsertSelectRunners); - CompletionService<Boolean> completionService = new ExecutorCompletionService<Boolean>(exec); - List<Future<Boolean>> futures = Lists.newArrayListWithExpectedSize(numUpsertSelectRunners); - // run one UPSERT SELECT for 100 rows (that locks the rows for a long time) - futures.add(completionService.submit(new UpsertSelectRunner(dataTable, 0, 105, 1))); - // run four UPSERT SELECTS for 5 rows (that overlap with slow running UPSERT SELECT) - for (int i = 0; i < 100; i += 25) { - futures.add(completionService.submit(new UpsertSelectRunner(dataTable, i, i+25, 5))); - } - int received = 0; - while (received < futures.size()) { - Future<Boolean> resultFuture = completionService.take(); - Boolean result = resultFuture.get(); - received++; - assertTrue(result); + try (Connection conn = driver.connect(url, props)) { + int numUpsertSelectRunners = 5; + ExecutorService exec = Executors.newFixedThreadPool(numUpsertSelectRunners); + CompletionService<Boolean> completionService = new ExecutorCompletionService<Boolean>(exec); + List<Future<Boolean>> futures = Lists.newArrayListWithExpectedSize(numUpsertSelectRunners); + // run one UPSERT SELECT for 100 rows (that locks the rows for a long time) + futures.add(completionService.submit(new UpsertSelectRunner(dataTable, 0, 105, 1))); + // run four UPSERT SELECTS for 5 rows (that overlap with slow running UPSERT SELECT) + for (int i = 0; i < 100; i += 25) { + futures.add(completionService.submit(new UpsertSelectRunner(dataTable, i, i+25, 5))); + } + int received = 0; + while (received < futures.size()) { + Future<Boolean> resultFuture = completionService.take(); + Boolean result = resultFuture.get(); + received++; + assertTrue(result); + } + exec.shutdownNow(); } - exec.shutdownNow(); - conn.close(); } + + /** + * Tests that splitting a region is not blocked indefinitely by UPSERT SELECT load + */ + @Test + public void testSplitDuringUpsertSelect() throws Exception { + int numUpsertSelectRunners = 4; + ExecutorService exec = Executors.newFixedThreadPool(numUpsertSelectRunners); + try (Connection conn = driver.connect(url, props)) { + final UpsertSelectRunner upsertSelectRunner = + new UpsertSelectRunner(dataTable, 0, 105, 1); + // keep running slow upsert selects + SlowBatchRegionObserver.SLOW_MUTATE = true; + for (int i = 0; i < numUpsertSelectRunners; i++) { + exec.submit(new UpsertSelectLooper(upsertSelectRunner)); + Thread.sleep(300); + } + + // keep trying to split the region + final HBaseTestingUtility utility = getUtility(); + final HBaseAdmin admin = utility.getHBaseAdmin(); + final TableName dataTN = TableName.valueOf(dataTable); + assertEquals(1, utility.getHBaseCluster().getRegions(dataTN).size()); + utility.waitFor(60000L, 1000, new Waiter.Predicate<Exception>() { + @Override + public boolean evaluate() throws Exception { + try { + List<HRegionInfo> regions = admin.getTableRegions(dataTN); + if (regions.size() > 1) { + logger.info("Found region was split"); + return true; + } + if (regions.size() == 0) { + // This happens when region in transition or closed + logger.info("No region returned"); + return false; + } + ; + HRegionInfo hRegion = regions.get(0); + logger.info("Attempting to split region"); + admin.splitRegion(hRegion.getRegionName(), Bytes.toBytes(2)); + return false; + } catch (NotServingRegionException nsre) { + // during split + return false; + } + } + }); + } finally { + SlowBatchRegionObserver.SLOW_MUTATE = false; + exec.shutdownNow(); + exec.awaitTermination(60, TimeUnit.SECONDS); + } + } + + /** + * Tests that UPSERT SELECT doesn't indefinitely block region closes + */ + @Test + public void testRegionCloseDuringUpsertSelect() throws Exception { + int numUpsertSelectRunners = 4; + ExecutorService exec = Executors.newFixedThreadPool(numUpsertSelectRunners); + try (Connection conn = driver.connect(url, props)) { + final UpsertSelectRunner upsertSelectRunner = + new UpsertSelectRunner(dataTable, 0, 105, 1); + // keep running slow upsert selects + SlowBatchRegionObserver.SLOW_MUTATE = true; + for (int i = 0; i < numUpsertSelectRunners; i++) { + exec.submit(new UpsertSelectLooper(upsertSelectRunner)); + Thread.sleep(300); + } + + final HBaseTestingUtility utility = getUtility(); + // try to close the region while UPSERT SELECTs are happening, + final HRegionServer dataRs = utility.getHBaseCluster().getRegionServer(0); + final HBaseAdmin admin = utility.getHBaseAdmin(); + final HRegionInfo dataRegion = + admin.getTableRegions(TableName.valueOf(dataTable)).get(0); + logger.info("Closing data table region"); + admin.closeRegion(dataRs.getServerName(), dataRegion); + // make sure the region is offline + utility.waitFor(60000L, 1000, new Waiter.Predicate<Exception>() { + @Override + public boolean evaluate() throws Exception { + List<HRegionInfo> onlineRegions = + admin.getOnlineRegions(dataRs.getServerName()); + for (HRegionInfo onlineRegion : onlineRegions) { + if (onlineRegion.equals(dataRegion)) { + logger.info("Data region still online"); + return false; + } + } + logger.info("Region is no longer online"); + return true; + } + }); + } finally { + SlowBatchRegionObserver.SLOW_MUTATE = false; + exec.shutdownNow(); + exec.awaitTermination(60, TimeUnit.SECONDS); + } + } public static class SlowBatchRegionObserver extends SimpleRegionObserver { + public static volatile boolean SLOW_MUTATE = false; @Override public void preBatchMutate(ObserverContext<RegionCoprocessorEnvironment> c, MiniBatchOperationInProgress<Mutation> miniBatchOp) throws HBaseIOException { // model a slow batch that takes a long time - if (miniBatchOp.size()==100) { + if ((miniBatchOp.size()==100 || SLOW_MUTATE) && c.getEnvironment().getRegionInfo().getTable().getNameAsString().equals(dataTable)) { try { Thread.sleep(6000); } catch (InterruptedException e) { http://git-wip-us.apache.org/repos/asf/phoenix/blob/519273b8/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/UngroupedAggregateRegionObserver.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/UngroupedAggregateRegionObserver.java b/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/UngroupedAggregateRegionObserver.java index 30f89cb..c3024a7 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/UngroupedAggregateRegionObserver.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/UngroupedAggregateRegionObserver.java @@ -164,7 +164,7 @@ public class UngroupedAggregateRegionObserver extends BaseScannerRegionObserver /** * This lock used for synchronizing the state of * {@link UngroupedAggregateRegionObserver#scansReferenceCount}, - * {@link UngroupedAggregateRegionObserver#isRegionClosing} variables used to avoid possible + * {@link UngroupedAggregateRegionObserver#isRegionClosingOrSplitting} variables used to avoid possible * dead lock situation in case below steps: * 1. We get read lock when we start writing local indexes, deletes etc.. * 2. when memstore reach threshold, flushes happen. Since they use read (shared) lock they @@ -191,7 +191,7 @@ public class UngroupedAggregateRegionObserver extends BaseScannerRegionObserver @GuardedBy("lock") private int scansReferenceCount = 0; @GuardedBy("lock") - private boolean isRegionClosing = false; + private boolean isRegionClosingOrSplitting = false; private static final Logger logger = LoggerFactory.getLogger(UngroupedAggregateRegionObserver.class); private KeyValueBuilder kvBuilder; private Configuration upsertSelectConfig; @@ -285,7 +285,7 @@ public class UngroupedAggregateRegionObserver extends BaseScannerRegionObserver */ private void checkForRegionClosing() throws IOException { synchronized (lock) { - if(isRegionClosing) { + if(isRegionClosingOrSplitting) { lock.notifyAll(); throw new IOException("Region is getting closed. Not allowing to write to avoid possible deadlock."); } @@ -499,10 +499,15 @@ public class UngroupedAggregateRegionObserver extends BaseScannerRegionObserver useIndexProto = false; } boolean acquiredLock = false; + boolean incrScanRefCount = false; try { if(needToWrite) { synchronized (lock) { + if (isRegionClosingOrSplitting) { + throw new IOException("Temporarily unable to write from scan because region is closing or splitting"); + } scansReferenceCount++; + incrScanRefCount = true; lock.notifyAll(); } } @@ -755,7 +760,7 @@ public class UngroupedAggregateRegionObserver extends BaseScannerRegionObserver } } } finally { - if (needToWrite) { + if (needToWrite && incrScanRefCount) { synchronized (lock) { scansReferenceCount--; if (scansReferenceCount < 0) { @@ -1295,6 +1300,7 @@ public class UngroupedAggregateRegionObserver extends BaseScannerRegionObserver // Don't allow splitting if operations need read and write to same region are going on in the // the coprocessors to avoid dead lock scenario. See PHOENIX-3111. synchronized (lock) { + isRegionClosingOrSplitting = true; if (scansReferenceCount > 0) { throw new IOException("Operations like local index building/delete/upsert select" + " might be going on so not allowing to split."); @@ -1319,7 +1325,7 @@ public class UngroupedAggregateRegionObserver extends BaseScannerRegionObserver public void preClose(ObserverContext<RegionCoprocessorEnvironment> c, boolean abortRequested) throws IOException { synchronized (lock) { - isRegionClosing = true; + isRegionClosingOrSplitting = true; while (scansReferenceCount > 0) { try { lock.wait(1000);
