virajjasani commented on a change in pull request #2574:
URL: https://github.com/apache/hbase/pull/2574#discussion_r512006608
##########
File path:
hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestHRegion.java
##########
@@ -7364,4 +7366,157 @@ protected HStoreForTesting(final HRegion region,
return super.doCompaction(cr, filesToCompact, user, compactionStartTime,
newFiles);
}
}
+
+ @Test
+ public void testCloseNoInterrupt() throws Exception {
+ byte[] cf1 = Bytes.toBytes("CF1");
+ byte[][] families = { cf1 };
+
+ Configuration conf = new Configuration(CONF);
+ // Disable close thread interrupt and server abort behavior
+ conf.setBoolean(HRegion.CLOSE_WAIT_ABORT, false);
+ region = initHRegion(tableName, method, conf, families);
+
+ final CountDownLatch latch = new CountDownLatch(1);
+ final AtomicBoolean holderInterrupted = new AtomicBoolean();
+ Thread holder = new Thread(new Runnable() {
+ @Override
+ public void run() {
+ try {
+ LOG.info("Starting region operation holder");
+ region.startRegionOperation(Operation.SCAN);
+ latch.countDown();
+ try {
+ Thread.sleep(10*1000);
+ } catch (InterruptedException e) {
+ LOG.info("Interrupted");
+ holderInterrupted.set(true);
+ }
+ } catch (Exception e) {
+ throw new RuntimeException(e);
+ } finally {
+ try {
+ region.closeRegionOperation();
+ } catch (IOException e) {
+ }
+ LOG.info("Stopped region operation holder");
+ }
+ }
+ });
+
+ holder.start();
+ latch.await();
+ region.close();
+ holder.join();
+ region = null;
+
+ assertFalse("Region lock holder should not have been interrupted",
holderInterrupted.get());
+ }
+
+ @Test
+ public void testCloseInterrupt() throws Exception {
+ byte[] cf1 = Bytes.toBytes("CF1");
+ byte[][] families = { cf1 };
+
+ Configuration conf = new Configuration(CONF);
+ // Enable close thread interrupt and server abort behavior
+ conf.setBoolean(HRegion.CLOSE_WAIT_ABORT, true);
+ region = initHRegion(tableName, method, conf, families);
+
+ final CountDownLatch latch = new CountDownLatch(1);
+ final AtomicBoolean holderInterrupted = new AtomicBoolean();
+ Thread holder = new Thread(new Runnable() {
+ @Override
+ public void run() {
+ try {
+ LOG.info("Starting region operation holder");
+ region.startRegionOperation(Operation.SCAN);
+ latch.countDown();
+ try {
+ Thread.sleep(10*1000);
+ } catch (InterruptedException e) {
+ LOG.info("Interrupted");
+ holderInterrupted.set(true);
+ }
+ } catch (Exception e) {
+ throw new RuntimeException(e);
+ } finally {
+ try {
+ region.closeRegionOperation();
+ } catch (IOException e) {
+ }
+ LOG.info("Stopped region operation holder");
+ }
+ }
+ });
+
+ holder.start();
+ latch.await();
+ region.close();
+ holder.join();
+ region = null;
+
+ assertTrue("Region lock holder was not interrupted",
holderInterrupted.get());
+ }
+
+ @Test
+ public void testCloseAbort() throws Exception {
+ byte[] cf1 = Bytes.toBytes("CF1");
+ byte[][] families = { cf1 };
+
+ Configuration conf = new Configuration(CONF);
+ // Enable close thread interrupt and server abort behavior
+ // Set the close lock acquisition wait time to 5 seconds
+ conf.setBoolean(HRegion.CLOSE_WAIT_ABORT, true);
+ conf.setInt(HRegion.CLOSE_WAIT_TIME, 5*1000);
+ region = initHRegion(tableName, method, conf, families);
+ RegionServerServices rsServices = mock(RegionServerServices.class);
+ region.rsServices = rsServices;
+
+ final CountDownLatch latch = new CountDownLatch(1);
+ Thread holder = new Thread(new Runnable() {
+ @Override
+ public void run() {
+ try {
+ LOG.info("Starting region operation holder");
+ region.startRegionOperation(Operation.SCAN);
+ latch.countDown();
+ // Hold the lock for 10 seconds no matter how many times we are
interrupted
+ int timeRemaining = 10 * 1000;
+ while (timeRemaining > 0) {
+ long start = EnvironmentEdgeManager.currentTime();
Review comment:
nit: if the only usage is to find the difference between `end` and `start`,
directly using `System.currentTimeMillis()` might be the preferred option?
##########
File path:
hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestHRegion.java
##########
@@ -7364,4 +7366,157 @@ protected HStoreForTesting(final HRegion region,
return super.doCompaction(cr, filesToCompact, user, compactionStartTime,
newFiles);
}
}
+
+ @Test
+ public void testCloseNoInterrupt() throws Exception {
+ byte[] cf1 = Bytes.toBytes("CF1");
+ byte[][] families = { cf1 };
+
+ Configuration conf = new Configuration(CONF);
+ // Disable close thread interrupt and server abort behavior
+ conf.setBoolean(HRegion.CLOSE_WAIT_ABORT, false);
+ region = initHRegion(tableName, method, conf, families);
+
+ final CountDownLatch latch = new CountDownLatch(1);
+ final AtomicBoolean holderInterrupted = new AtomicBoolean();
+ Thread holder = new Thread(new Runnable() {
+ @Override
+ public void run() {
+ try {
+ LOG.info("Starting region operation holder");
+ region.startRegionOperation(Operation.SCAN);
+ latch.countDown();
+ try {
+ Thread.sleep(10*1000);
+ } catch (InterruptedException e) {
+ LOG.info("Interrupted");
+ holderInterrupted.set(true);
Review comment:
nit: Instead of using `holderInterrupted`, maybe we can throw an
`AssertionError` with a message that the holder should not have been interrupted?
##########
File path:
hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java
##########
@@ -1679,22 +1688,82 @@ public void setTimeoutForWriteLock(long
timeoutForWriteLock) {
}
}
- if (timeoutForWriteLock == null
- || timeoutForWriteLock == Long.MAX_VALUE) {
- // block waiting for the lock for closing
- lock.writeLock().lock(); // FindBugs: Complains
UL_UNRELEASED_LOCK_EXCEPTION_PATH but seems fine
+ // Set the closing flag
+ // From this point new arrivals at the region lock will get NSRE.
+
+ this.closing.set(true);
+ LOG.info("Closing region {}", this);
+
+ // Acquire the close lock
+
+ // The configuration parameter CLOSE_WAIT_ABORT is overloaded to enable
both
+ // the new regionserver abort condition and interrupts for running
requests.
+ // If CLOSE_WAIT_ABORT is not enabled there is no change from earlier
behavior,
+ // we will not attempt to interrupt threads servicing requests nor crash
out
+ // the regionserver if something remains stubborn.
+
+ boolean canAbort = conf.getBoolean(CLOSE_WAIT_ABORT,
DEFAULT_CLOSE_WAIT_ABORT);
+ boolean useTimedWait = false;
+ if (timeoutForWriteLock == null || timeoutForWriteLock == Long.MAX_VALUE) {
+ if (canAbort) {
+ timeoutForWriteLock = conf.getLong(CLOSE_WAIT_TIME,
DEFAULT_CLOSE_WAIT_TIME);
+ useTimedWait = true;
+ }
} else {
+ // convert legacy use of timeoutForWriteLock in seconds to new use in
millis
+ timeoutForWriteLock = TimeUnit.SECONDS.toMillis(timeoutForWriteLock);
+ useTimedWait = true;
+ }
+ if (LOG.isDebugEnabled()) {
+ LOG.debug((useTimedWait ? "Time limited wait" : "Waiting") + " for close
lock on " + this);
Review comment:
maybe `Waiting` can be replaced with `Waiting without time limit` ?
##########
File path:
hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java
##########
@@ -1679,22 +1688,82 @@ public void setTimeoutForWriteLock(long
timeoutForWriteLock) {
}
}
- if (timeoutForWriteLock == null
- || timeoutForWriteLock == Long.MAX_VALUE) {
- // block waiting for the lock for closing
- lock.writeLock().lock(); // FindBugs: Complains
UL_UNRELEASED_LOCK_EXCEPTION_PATH but seems fine
+ // Set the closing flag
+ // From this point new arrivals at the region lock will get NSRE.
+
+ this.closing.set(true);
+ LOG.info("Closing region {}", this);
+
+ // Acquire the close lock
+
+ // The configuration parameter CLOSE_WAIT_ABORT is overloaded to enable
both
+ // the new regionserver abort condition and interrupts for running
requests.
+ // If CLOSE_WAIT_ABORT is not enabled there is no change from earlier
behavior,
+ // we will not attempt to interrupt threads servicing requests nor crash
out
+ // the regionserver if something remains stubborn.
+
+ boolean canAbort = conf.getBoolean(CLOSE_WAIT_ABORT,
DEFAULT_CLOSE_WAIT_ABORT);
+ boolean useTimedWait = false;
+ if (timeoutForWriteLock == null || timeoutForWriteLock == Long.MAX_VALUE) {
+ if (canAbort) {
+ timeoutForWriteLock = conf.getLong(CLOSE_WAIT_TIME,
DEFAULT_CLOSE_WAIT_TIME);
+ useTimedWait = true;
+ }
} else {
+ // convert legacy use of timeoutForWriteLock in seconds to new use in
millis
+ timeoutForWriteLock = TimeUnit.SECONDS.toMillis(timeoutForWriteLock);
Review comment:
Although this is not an atomic operation on a volatile, we are just
converting its own value from sec to ms, hence we should be good here.
##########
File path:
hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java
##########
@@ -688,14 +688,17 @@ void sawNoSuchFamily() {
// Last flush time for each Store. Useful when we are flushing for each
column
private final ConcurrentMap<HStore, Long> lastStoreFlushTimeMap = new
ConcurrentHashMap<>();
- final RegionServerServices rsServices;
+ protected RegionServerServices rsServices;
Review comment:
Similar to how we passed `conf` all the way from HBaseTestingUtility
methods to the actual `HRegion` instance, we can also pass the `rsServices`
instance in the same methods, and then we won't have to worry about changing
this to non-final. Not a strong opinion if this means too many args in the
testing utility methods, so it's up to you.
##########
File path:
hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestRegionInterrupt.java
##########
@@ -0,0 +1,356 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hbase.regionserver;
+
+import static org.junit.Assert.assertTrue;
+
+import java.io.IOException;
+import java.io.InterruptedIOException;
+import java.util.Optional;
+import java.util.concurrent.atomic.AtomicBoolean;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hbase.Cell;
+import org.apache.hadoop.hbase.HBaseClassTestRule;
+import org.apache.hadoop.hbase.HBaseTestingUtility;
+import org.apache.hadoop.hbase.HConstants;
+import org.apache.hadoop.hbase.NotServingRegionException;
+import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.TableNameTestRule;
+import org.apache.hadoop.hbase.Waiter;
+import org.apache.hadoop.hbase.client.Admin;
+import org.apache.hadoop.hbase.client.Append;
+import org.apache.hadoop.hbase.client.BufferedMutator;
+import org.apache.hadoop.hbase.client.ColumnFamilyDescriptorBuilder;
+import org.apache.hadoop.hbase.client.Delete;
+import org.apache.hadoop.hbase.client.Durability;
+import org.apache.hadoop.hbase.client.Increment;
+import org.apache.hadoop.hbase.client.Put;
+import org.apache.hadoop.hbase.client.RegionInfo;
+import org.apache.hadoop.hbase.client.Result;
+import org.apache.hadoop.hbase.client.ResultScanner;
+import org.apache.hadoop.hbase.client.Scan;
+import org.apache.hadoop.hbase.client.Table;
+import org.apache.hadoop.hbase.client.TableDescriptor;
+import org.apache.hadoop.hbase.client.TableDescriptorBuilder;
+import org.apache.hadoop.hbase.coprocessor.ObserverContext;
+import org.apache.hadoop.hbase.coprocessor.RegionCoprocessor;
+import org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment;
+import org.apache.hadoop.hbase.coprocessor.RegionObserver;
+import org.apache.hadoop.hbase.exceptions.DeserializationException;
+import org.apache.hadoop.hbase.filter.FilterBase;
+import org.apache.hadoop.hbase.testclassification.LargeTests;
+import org.apache.hadoop.hbase.testclassification.RegionServerTests;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.hbase.wal.WAL;
+import org.apache.hadoop.hbase.wal.WALEdit;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.BeforeClass;
+import org.junit.ClassRule;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+@Category({RegionServerTests.class, LargeTests.class})
+public class TestRegionInterrupt {
+
+ @ClassRule
+ public static final HBaseClassTestRule CLASS_RULE =
+ HBaseClassTestRule.forClass(TestRegionInterrupt.class);
+
+ private static HBaseTestingUtility TEST_UTIL = new HBaseTestingUtility();
+ private static final Logger LOG =
LoggerFactory.getLogger(TestRegionInterrupt.class);
+
+ static final int SLEEP_TIME = 10 * 1000;
+ static final byte[] FAMILY = Bytes.toBytes("info");
+
+ @Rule
+ public TableNameTestRule name = new TableNameTestRule();
+
+ @BeforeClass
+ public static void setUpBeforeClass() throws Exception {
+ Configuration conf = TEST_UTIL.getConfiguration();
+ conf.setClass(HConstants.REGION_IMPL, InterruptInterceptingHRegion.class,
Region.class);
+ conf.setBoolean(HRegion.CLOSE_WAIT_ABORT, true);
Review comment:
Don't we want to reduce the 5 min wait time here by setting
`CLOSE_WAIT_TIME`?
##########
File path:
hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java
##########
@@ -8793,7 +8921,7 @@ public void closeRegionOperation(Operation operation)
throws IOException {
* @throws InterruptedIOException if interrupted while waiting for a lock
*/
private void startBulkRegionOperation(boolean writeLockNeeded)
- throws NotServingRegionException, RegionTooBusyException,
InterruptedIOException {
+ throws NotServingRegionException, IOException {
Review comment:
nit: we can remove NSRE here
##########
File path:
hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java
##########
@@ -9000,6 +9129,49 @@ public long getReadPoint() {
return getReadPoint(IsolationLevel.READ_COMMITTED);
}
+ /**
+ * Interrupt any region options that have acquired the region lock via
+ * {@link
#startRegionOperation(org.apache.hadoop.hbase.regionserver.Region.Operation)},
+ * or {@link #startBulkRegionOperation(boolean)}.
+ */
+ private void interruptRegionOperations() {
+ for (Thread t: regionLockHolders) {
+ t.interrupt();
+ }
+ }
+
+ /**
+ * Check thread interrupt status and throw an exception if interrupted.
+ * @throws NotServingRegionException if region is closing
+ * @throws InterruptedIOException if interrupted but region is not closing
+ */
+ // Package scope for tests
+ void checkInterrupt() throws NotServingRegionException,
InterruptedIOException {
+ if (Thread.interrupted()) {
+ if (this.closing.get()) {
+ throw new NotServingRegionException(
+ getRegionInfo().getRegionNameAsString() + " is closing");
+ }
+ throw new InterruptedIOException();
+ }
+ }
+
+ /**
+ * Throw the correct exception upon interrupt
+ * @param t cause
+ * @throws NotServingRegionException if region is closing
+ * @throws InterruptedIOException in all cases except if region is closing
Review comment:
nit: were these lines added by mistake? :)
##########
File path:
hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestRegionInterrupt.java
##########
@@ -0,0 +1,356 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hbase.regionserver;
+
+import static org.junit.Assert.assertTrue;
+
+import java.io.IOException;
+import java.io.InterruptedIOException;
+import java.util.Optional;
+import java.util.concurrent.atomic.AtomicBoolean;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hbase.Cell;
+import org.apache.hadoop.hbase.HBaseClassTestRule;
+import org.apache.hadoop.hbase.HBaseTestingUtility;
+import org.apache.hadoop.hbase.HConstants;
+import org.apache.hadoop.hbase.NotServingRegionException;
+import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.TableNameTestRule;
+import org.apache.hadoop.hbase.Waiter;
+import org.apache.hadoop.hbase.client.Admin;
+import org.apache.hadoop.hbase.client.Append;
+import org.apache.hadoop.hbase.client.BufferedMutator;
+import org.apache.hadoop.hbase.client.ColumnFamilyDescriptorBuilder;
+import org.apache.hadoop.hbase.client.Delete;
+import org.apache.hadoop.hbase.client.Durability;
+import org.apache.hadoop.hbase.client.Increment;
+import org.apache.hadoop.hbase.client.Put;
+import org.apache.hadoop.hbase.client.RegionInfo;
+import org.apache.hadoop.hbase.client.Result;
+import org.apache.hadoop.hbase.client.ResultScanner;
+import org.apache.hadoop.hbase.client.Scan;
+import org.apache.hadoop.hbase.client.Table;
+import org.apache.hadoop.hbase.client.TableDescriptor;
+import org.apache.hadoop.hbase.client.TableDescriptorBuilder;
+import org.apache.hadoop.hbase.coprocessor.ObserverContext;
+import org.apache.hadoop.hbase.coprocessor.RegionCoprocessor;
+import org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment;
+import org.apache.hadoop.hbase.coprocessor.RegionObserver;
+import org.apache.hadoop.hbase.exceptions.DeserializationException;
+import org.apache.hadoop.hbase.filter.FilterBase;
+import org.apache.hadoop.hbase.testclassification.LargeTests;
+import org.apache.hadoop.hbase.testclassification.RegionServerTests;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.hbase.wal.WAL;
+import org.apache.hadoop.hbase.wal.WALEdit;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.BeforeClass;
+import org.junit.ClassRule;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+@Category({RegionServerTests.class, LargeTests.class})
+public class TestRegionInterrupt {
+
+ @ClassRule
+ public static final HBaseClassTestRule CLASS_RULE =
+ HBaseClassTestRule.forClass(TestRegionInterrupt.class);
+
+ private static HBaseTestingUtility TEST_UTIL = new HBaseTestingUtility();
+ private static final Logger LOG =
LoggerFactory.getLogger(TestRegionInterrupt.class);
+
+ static final int SLEEP_TIME = 10 * 1000;
+ static final byte[] FAMILY = Bytes.toBytes("info");
+
+ @Rule
+ public TableNameTestRule name = new TableNameTestRule();
+
+ @BeforeClass
+ public static void setUpBeforeClass() throws Exception {
+ Configuration conf = TEST_UTIL.getConfiguration();
+ conf.setClass(HConstants.REGION_IMPL, InterruptInterceptingHRegion.class,
Region.class);
+ conf.setBoolean(HRegion.CLOSE_WAIT_ABORT, true);
+ conf.setInt(HConstants.HBASE_CLIENT_RETRIES_NUMBER, 1);
+ }
+
+ @Before
+ public void setUp() throws Exception {
+ TEST_UTIL.startMiniCluster();
+ }
+
+ @After
+ public void tearDown() throws Exception {
+ TEST_UTIL.shutdownMiniCluster();
+ }
+
+ @Test(timeout=120000)
Review comment:
Test category `LargeTests` will take care of timeouts so we can remove
all `timeout` from individual tests.
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
[email protected]