Repository: phoenix Updated Branches: refs/heads/4.x-HBase-1.2 72dbbb38d -> 519273b85
Revert "PHOENIX-4214 Scans which write should not block region split or close (Vincent Poon)" This reverts commit 6f65a7935b640969e570b870de9fa59e2a5bca67. Project: http://git-wip-us.apache.org/repos/asf/phoenix/repo Commit: http://git-wip-us.apache.org/repos/asf/phoenix/commit/64ef10b7 Tree: http://git-wip-us.apache.org/repos/asf/phoenix/tree/64ef10b7 Diff: http://git-wip-us.apache.org/repos/asf/phoenix/diff/64ef10b7 Branch: refs/heads/4.x-HBase-1.2 Commit: 64ef10b7d823db8a4f625a26304dded8e2f7df76 Parents: 72dbbb3 Author: James Taylor <[email protected]> Authored: Fri Sep 29 12:35:38 2017 -0700 Committer: James Taylor <[email protected]> Committed: Fri Sep 29 12:35:38 2017 -0700 ---------------------------------------------------------------------- .../UpsertSelectOverlappingBatchesIT.java | 239 ++++--------------- .../UngroupedAggregateRegionObserver.java | 23 +- 2 files changed, 53 insertions(+), 209 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/phoenix/blob/64ef10b7/phoenix-core/src/it/java/org/apache/phoenix/execute/UpsertSelectOverlappingBatchesIT.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/it/java/org/apache/phoenix/execute/UpsertSelectOverlappingBatchesIT.java b/phoenix-core/src/it/java/org/apache/phoenix/execute/UpsertSelectOverlappingBatchesIT.java index fbf3231..53346b9 100644 --- a/phoenix-core/src/it/java/org/apache/phoenix/execute/UpsertSelectOverlappingBatchesIT.java +++ b/phoenix-core/src/it/java/org/apache/phoenix/execute/UpsertSelectOverlappingBatchesIT.java @@ -18,7 +18,6 @@ package org.apache.phoenix.execute; import static org.apache.phoenix.util.TestUtil.TEST_PROPERTIES; -import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertTrue; import java.sql.Connection; @@ -33,43 +32,25 @@ import java.util.concurrent.ExecutorCompletionService; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import java.util.concurrent.Future; -import java.util.concurrent.TimeUnit; -import org.apache.commons.lang.exception.ExceptionUtils; import org.apache.hadoop.hbase.HBaseIOException; -import org.apache.hadoop.hbase.HBaseTestingUtility; -import org.apache.hadoop.hbase.HRegionInfo; -import org.apache.hadoop.hbase.NotServingRegionException; -import org.apache.hadoop.hbase.TableName; -import org.apache.hadoop.hbase.Waiter; -import org.apache.hadoop.hbase.client.HBaseAdmin; import org.apache.hadoop.hbase.client.Mutation; import org.apache.hadoop.hbase.coprocessor.ObserverContext; import org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment; import org.apache.hadoop.hbase.coprocessor.SimpleRegionObserver; -import org.apache.hadoop.hbase.regionserver.HRegionServer; import org.apache.hadoop.hbase.regionserver.MiniBatchOperationInProgress; -import org.apache.hadoop.hbase.util.Bytes; import org.apache.phoenix.end2end.BaseUniqueNamesOwnClusterIT; import org.apache.phoenix.query.QueryServices; import org.apache.phoenix.util.PropertiesUtil; import org.apache.phoenix.util.ReadOnlyProps; -import org.junit.AfterClass; -import org.junit.Before; import org.junit.BeforeClass; import org.junit.Test; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; import com.google.common.collect.Lists; import com.google.common.collect.Maps; public class UpsertSelectOverlappingBatchesIT extends BaseUniqueNamesOwnClusterIT { - private static final Logger logger = LoggerFactory.getLogger(UpsertSelectOverlappingBatchesIT.class); - private Properties props; - private static volatile String dataTable; - private String index; - + @BeforeClass public static void doSetup() throws Exception { Map<String, String> serverProps = Maps.newHashMapWithExpectedSize(3); @@ -79,12 +60,7 @@ public class UpsertSelectOverlappingBatchesIT extends BaseUniqueNamesOwnClusterI Map<String,String> clientProps = Maps.newHashMapWithExpectedSize(1); setUpTestDriver(new ReadOnlyProps(serverProps.entrySet().iterator()), new ReadOnlyProps(clientProps.entrySet().iterator())); } - - @AfterClass - public static void tearDownClass() throws Exception { - getUtility().shutdownMiniCluster(); - } - + private class UpsertSelectRunner implements Callable<Boolean> { private final String dataTable; private final int minIndex; @@ -113,185 +89,58 @@ public class UpsertSelectOverlappingBatchesIT extends BaseUniqueNamesOwnClusterI return true; } } + } - - private static class UpsertSelectLooper implements Runnable { - private UpsertSelectRunner runner; - public UpsertSelectLooper(UpsertSelectRunner runner) { - this.runner = runner; - } - @Override - public void run() { - while (true) { - try { - runner.call(); - } - catch (Exception e) { - if (ExceptionUtils.indexOfThrowable(e, InterruptedException.class) != -1) { - logger.info("Interrupted, exiting", e); - Thread.currentThread().interrupt(); - return; - } - logger.error("Hit exception while writing", e); - } - } - }}; - - @Before - public void setup() throws Exception { - SlowBatchRegionObserver.SLOW_MUTATE = false; - props = PropertiesUtil.deepCopy(TEST_PROPERTIES); - dataTable = generateUniqueName(); - index = "IDX_" + dataTable; - try (Connection conn = driver.connect(url, props)) { - conn.createStatement().execute("CREATE TABLE " + dataTable - + " (k INTEGER NOT NULL PRIMARY KEY, v1 VARCHAR, v2 VARCHAR)"); - // create the index and ensure its empty as well - conn.createStatement().execute("CREATE INDEX " + index + " ON " + dataTable + " (v1)"); - PreparedStatement stmt = conn.prepareStatement("UPSERT INTO " + dataTable + " VALUES(?,?,?)"); - conn.setAutoCommit(false); - for (int i = 0; i < 100; i++) { - stmt.setInt(1, i); - stmt.setString(2, "v1" + i); - stmt.setString(3, "v2" + i); - stmt.execute(); - } - conn.commit(); - } - } - + @Test public void testUpsertSelectSameBatchConcurrently() throws Exception { - try (Connection conn = driver.connect(url, props)) { - int numUpsertSelectRunners = 5; - ExecutorService exec = Executors.newFixedThreadPool(numUpsertSelectRunners); - CompletionService<Boolean> completionService = new ExecutorCompletionService<Boolean>(exec); - List<Future<Boolean>> futures = Lists.newArrayListWithExpectedSize(numUpsertSelectRunners); - // run one UPSERT SELECT for 100 rows (that locks the rows for a long time) - futures.add(completionService.submit(new UpsertSelectRunner(dataTable, 0, 105, 1))); - // run four UPSERT SELECTS for 5 rows (that overlap with slow running UPSERT SELECT) - for (int i = 0; i < 100; i += 25) { - futures.add(completionService.submit(new UpsertSelectRunner(dataTable, i, i+25, 5))); - } - int received = 0; - while (received < futures.size()) { - Future<Boolean> resultFuture = completionService.take(); - Boolean result = resultFuture.get(); - received++; - assertTrue(result); - } - exec.shutdownNow(); + final String dataTable = generateUniqueName(); + final String index = "IDX_" + dataTable; + // create the table and ensure its empty + Properties props = PropertiesUtil.deepCopy(TEST_PROPERTIES); + Connection conn = driver.connect(url, props); + conn.createStatement() + .execute("CREATE TABLE " + dataTable + " (k INTEGER NOT NULL PRIMARY KEY, v1 VARCHAR, v2 VARCHAR)"); + // create the index and ensure its empty as well + conn.createStatement().execute("CREATE INDEX " + index + " ON " + dataTable + " (v1)"); + + conn = DriverManager.getConnection(getUrl(), props); + PreparedStatement stmt = conn.prepareStatement("UPSERT INTO " + dataTable + " VALUES(?,?,?)"); + conn.setAutoCommit(false); + for (int i = 0; i < 100; i++) { + stmt.setInt(1, i); + stmt.setString(2, "v1" + i); + stmt.setString(3, "v2" + i); + stmt.execute(); + } + conn.commit(); + + int numUpsertSelectRunners = 5; + ExecutorService exec = Executors.newFixedThreadPool(numUpsertSelectRunners); + CompletionService<Boolean> completionService = new ExecutorCompletionService<Boolean>(exec); + List<Future<Boolean>> futures = Lists.newArrayListWithExpectedSize(numUpsertSelectRunners); + // run one UPSERT SELECT for 100 rows (that locks the rows for a long time) + futures.add(completionService.submit(new UpsertSelectRunner(dataTable, 0, 105, 1))); + // run four UPSERT SELECTS for 5 rows (that overlap with slow running UPSERT SELECT) + for (int i = 0; i < 100; i += 25) { + futures.add(completionService.submit(new UpsertSelectRunner(dataTable, i, i+25, 5))); + } + int received = 0; + while (received < futures.size()) { + Future<Boolean> resultFuture = completionService.take(); + Boolean result = resultFuture.get(); + received++; + assertTrue(result); } + exec.shutdownNow(); + conn.close(); } - - /** - * Tests that splitting a region is not blocked indefinitely by UPSERT SELECT load - */ - @Test - public void testSplitDuringUpsertSelect() throws Exception { - int numUpsertSelectRunners = 4; - ExecutorService exec = Executors.newFixedThreadPool(numUpsertSelectRunners); - try (Connection conn = driver.connect(url, props)) { - final UpsertSelectRunner upsertSelectRunner = - new UpsertSelectRunner(dataTable, 0, 105, 1); - // keep running slow upsert selects - SlowBatchRegionObserver.SLOW_MUTATE = true; - for (int i = 0; i < numUpsertSelectRunners; i++) { - exec.submit(new UpsertSelectLooper(upsertSelectRunner)); - Thread.sleep(300); - } - - // keep trying to split the region - final HBaseTestingUtility utility = getUtility(); - final HBaseAdmin admin = utility.getHBaseAdmin(); - final TableName dataTN = TableName.valueOf(dataTable); - assertEquals(1, utility.getHBaseCluster().getRegions(dataTN).size()); - utility.waitFor(30000L, 1000, new Waiter.Predicate<Exception>() { - @Override - public boolean evaluate() throws Exception { - try { - List<HRegionInfo> regions = admin.getTableRegions(dataTN); - if (regions.size() > 1) { - logger.info("Found region was split"); - return true; - } - if (regions.size() == 0) { - // This happens when region in transition or closed - logger.info("No region returned"); - return false; - } - ; - HRegionInfo hRegion = regions.get(0); - logger.info("Attempting to split region"); - admin.splitRegion(hRegion.getRegionName(), Bytes.toBytes(2)); - return false; - } catch (NotServingRegionException nsre) { - // during split - return false; - } - } - }); - } finally { - SlowBatchRegionObserver.SLOW_MUTATE = false; - exec.shutdownNow(); - exec.awaitTermination(60, TimeUnit.SECONDS); - } - } - - /** - * Tests that UPSERT SELECT doesn't indefinitely block region closes - */ - @Test - public void testRegionCloseDuringUpsertSelect() throws Exception { - int numUpsertSelectRunners = 4; - ExecutorService exec = Executors.newFixedThreadPool(numUpsertSelectRunners); - try (Connection conn = driver.connect(url, props)) { - final UpsertSelectRunner upsertSelectRunner = - new UpsertSelectRunner(dataTable, 0, 105, 1); - // keep running slow upsert selects - SlowBatchRegionObserver.SLOW_MUTATE = true; - for (int i = 0; i < numUpsertSelectRunners; i++) { - exec.submit(new UpsertSelectLooper(upsertSelectRunner)); - Thread.sleep(300); - } - - final HBaseTestingUtility utility = getUtility(); - // try to close the region while UPSERT SELECTs are happening, - final HRegionServer dataRs = utility.getHBaseCluster().getRegionServer(0); - final HBaseAdmin admin = utility.getHBaseAdmin(); - final HRegionInfo dataRegion = - admin.getTableRegions(TableName.valueOf(dataTable)).get(0); - logger.info("Closing data table region"); - admin.closeRegion(dataRs.getServerName(), dataRegion); - // make sure the region is offline - utility.waitFor(30000L, 1000, new Waiter.Predicate<Exception>() { - @Override - public boolean evaluate() throws Exception { - List<HRegionInfo> onlineRegions = - admin.getOnlineRegions(dataRs.getServerName()); - for (HRegionInfo onlineRegion : onlineRegions) { - if (onlineRegion.equals(dataRegion)) { - logger.info("Data region still online"); - return false; - } - } - logger.info("Region is no longer online"); - return true; - } - }); - } finally { - SlowBatchRegionObserver.SLOW_MUTATE = false; - exec.shutdownNow(); - exec.awaitTermination(60, TimeUnit.SECONDS); - } - } public static class SlowBatchRegionObserver extends SimpleRegionObserver { - public static volatile boolean SLOW_MUTATE = false; @Override public void preBatchMutate(ObserverContext<RegionCoprocessorEnvironment> c, MiniBatchOperationInProgress<Mutation> miniBatchOp) throws HBaseIOException { // model a slow batch that takes a long time - if ((miniBatchOp.size()==100 || SLOW_MUTATE) && c.getEnvironment().getRegionInfo().getTable().getNameAsString().equals(dataTable)) { + if (miniBatchOp.size()==100) { try { Thread.sleep(6000); } catch (InterruptedException e) { http://git-wip-us.apache.org/repos/asf/phoenix/blob/64ef10b7/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/UngroupedAggregateRegionObserver.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/UngroupedAggregateRegionObserver.java b/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/UngroupedAggregateRegionObserver.java index 582e606..30f89cb 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/UngroupedAggregateRegionObserver.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/UngroupedAggregateRegionObserver.java @@ -164,7 +164,7 @@ public class UngroupedAggregateRegionObserver extends BaseScannerRegionObserver /** * This lock used for synchronizing the state of * {@link UngroupedAggregateRegionObserver#scansReferenceCount}, - * {@link UngroupedAggregateRegionObserver#isRegionClosingOrSplitting} variables used to avoid possible + * {@link UngroupedAggregateRegionObserver#isRegionClosing} variables used to avoid possible * dead lock situation in case below steps: * 1. We get read lock when we start writing local indexes, deletes etc.. * 2. when memstore reach threshold, flushes happen. Since they use read (shared) lock they @@ -191,7 +191,7 @@ public class UngroupedAggregateRegionObserver extends BaseScannerRegionObserver @GuardedBy("lock") private int scansReferenceCount = 0; @GuardedBy("lock") - private boolean isRegionClosingOrSplitting = false; + private boolean isRegionClosing = false; private static final Logger logger = LoggerFactory.getLogger(UngroupedAggregateRegionObserver.class); private KeyValueBuilder kvBuilder; private Configuration upsertSelectConfig; @@ -285,7 +285,7 @@ public class UngroupedAggregateRegionObserver extends BaseScannerRegionObserver */ private void checkForRegionClosing() throws IOException { synchronized (lock) { - if(isRegionClosingOrSplitting) { + if(isRegionClosing) { lock.notifyAll(); throw new IOException("Region is getting closed. Not allowing to write to avoid possible deadlock."); } @@ -499,16 +499,13 @@ public class UngroupedAggregateRegionObserver extends BaseScannerRegionObserver useIndexProto = false; } boolean acquiredLock = false; - if(needToWrite) { - synchronized (lock) { - if (isRegionClosingOrSplitting) { - throw new IOException("Temporarily unable to write from scan because region is closing or splitting"); + try { + if(needToWrite) { + synchronized (lock) { + scansReferenceCount++; + lock.notifyAll(); } - scansReferenceCount++; - lock.notifyAll(); } - } - try { region.startRegionOperation(); acquiredLock = true; synchronized (innerScanner) { @@ -1298,7 +1295,6 @@ public class UngroupedAggregateRegionObserver extends BaseScannerRegionObserver // Don't allow splitting if operations need read and write to same region are going on in the // the coprocessors to avoid dead lock scenario. See PHOENIX-3111. synchronized (lock) { - isRegionClosingOrSplitting = true; if (scansReferenceCount > 0) { throw new IOException("Operations like local index building/delete/upsert select" + " might be going on so not allowing to split."); @@ -1323,13 +1319,12 @@ public class UngroupedAggregateRegionObserver extends BaseScannerRegionObserver public void preClose(ObserverContext<RegionCoprocessorEnvironment> c, boolean abortRequested) throws IOException { synchronized (lock) { - isRegionClosingOrSplitting = true; + isRegionClosing = true; while (scansReferenceCount > 0) { try { lock.wait(1000); } catch (InterruptedException e) { Thread.currentThread().interrupt(); - throw new IOException("Interrupted while waiting for completion of operations like local index building/delete/upsert select"); } } }
