anoopsjohn commented on a change in pull request #2021:
URL: https://github.com/apache/hbase/pull/2021#discussion_r457991255
##########
File path:
hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestLogRoller.java
##########
@@ -43,53 +49,104 @@
public static final HBaseClassTestRule CLASS_RULE =
HBaseClassTestRule.forClass(TestLogRoller.class);
- private static HBaseTestingUtility TEST_UTIL = new HBaseTestingUtility();
+ private static final HBaseTestingUtility TEST_UTIL = new
HBaseTestingUtility();
- private static final int logRollPeriod = 20 * 1000;
+ private static final int LOG_ROLL_PERIOD = 20 * 1000;
+ private static final String LOG_DIR = "WALs";
+ private static final String ARCHIVE_DIR = "archiveWALs";
+ private static final String WAL_PREFIX = "test-log-roller";
+ private static Configuration CONF;
+ private static LogRoller ROLLER;
+ private static Path ROOT_DIR;
+ private static FileSystem FS;
@Before
public void setup() throws Exception {
- TEST_UTIL.getConfiguration().setInt("hbase.regionserver.logroll.period",
logRollPeriod);
- TEST_UTIL.startMiniCluster(1);
- TableName name = TableName.valueOf("Test");
- TEST_UTIL.createTable(name, Bytes.toBytes("cf"));
- TEST_UTIL.waitTableAvailable(name);
+ CONF = TEST_UTIL.getConfiguration();
+ CONF.setInt("hbase.regionserver.logroll.period", LOG_ROLL_PERIOD);
+ CONF.setInt(HConstants.THREAD_WAKE_FREQUENCY, 300);
+ ROOT_DIR = TEST_UTIL.getRandomDir();
+ FS = FileSystem.get(CONF);
+ RegionServerServices services = Mockito.mock(RegionServerServices.class);
+ Mockito.when(services.getConfiguration()).thenReturn(CONF);
+ ROLLER = new LogRoller(services);
+ ROLLER.start();
}
@After
public void tearDown() throws Exception {
+ ROLLER.close();
+ FS.close();
TEST_UTIL.shutdownMiniCluster();
}
@Test
public void testRemoveClosedWAL() throws Exception {
- HRegionServer rs = TEST_UTIL.getMiniHBaseCluster().getRegionServer(0);
- Configuration conf = rs.getConfiguration();
- LogRoller logRoller =
TEST_UTIL.getMiniHBaseCluster().getRegionServer(0).getWalRoller();
- int originalSize = logRoller.getWalNeedsRoll().size();
- FSHLog wal1 = new FSHLog(rs.getWALFileSystem(), rs.getWALRootDir(),
-
AbstractFSWALProvider.getWALDirectoryName(rs.getServerName().getServerName()),
conf);
- logRoller.addWAL(wal1);
- FSHLog wal2 = new FSHLog(rs.getWALFileSystem(), rs.getWALRootDir(),
-
AbstractFSWALProvider.getWALDirectoryName(rs.getServerName().getServerName()),
conf);
- logRoller.addWAL(wal2);
- FSHLog wal3 = new FSHLog(rs.getWALFileSystem(), rs.getWALRootDir(),
-
AbstractFSWALProvider.getWALDirectoryName(rs.getServerName().getServerName()),
conf);
- logRoller.addWAL(wal3);
-
- assertEquals(originalSize + 3, logRoller.getWalNeedsRoll().size());
- assertTrue(logRoller.getWalNeedsRoll().containsKey(wal1));
-
- wal1.close();
- Thread.sleep(2 * logRollPeriod);
-
- assertEquals(originalSize + 2, logRoller.getWalNeedsRoll().size());
- assertFalse(logRoller.getWalNeedsRoll().containsKey(wal1));
-
- wal2.close();
- wal3.close();
- Thread.sleep(2 * logRollPeriod);
-
- assertEquals(originalSize, logRoller.getWalNeedsRoll().size());
+ assertEquals(0, ROLLER.getWalNeedsRoll().size());
+ for (int i = 1; i <= 3; i++) {
+ FSHLog wal = new FSHLog(FS, ROOT_DIR, LOG_DIR, ARCHIVE_DIR, CONF, null,
+ true, WAL_PREFIX, getWALSuffix(i));
+ ROLLER.addWAL(wal);
+ }
+
+ assertEquals(3, ROLLER.getWalNeedsRoll().size());
+ Iterator<WAL> it = ROLLER.getWalNeedsRoll().keySet().iterator();
+ WAL wal = it.next();
+ assertTrue(ROLLER.getWalNeedsRoll().containsKey(wal));
+
+ wal.close();
+ Thread.sleep(LOG_ROLL_PERIOD + 5000);
+
+ assertEquals(2, ROLLER.getWalNeedsRoll().size());
+ assertFalse(ROLLER.getWalNeedsRoll().containsKey(wal));
+
+ wal = it.next();
+ wal.close();
+ wal = it.next();
+ wal.close();
+ Thread.sleep(LOG_ROLL_PERIOD + 5000);
+
+ assertEquals(0, ROLLER.getWalNeedsRoll().size());
+ }
+
+ /**
+ * verify that each wal roll separately
+ */
+ @Test
+ public void testRequestRollWithMultiWal() throws Exception {
+ // add multiple wal
+ Map<FSHLog, Path> wals = new HashMap<>();
+ for (int i = 1; i <= 3; i++) {
+ FSHLog wal = new FSHLog(FS, ROOT_DIR, LOG_DIR, ARCHIVE_DIR, CONF, null,
+ true, WAL_PREFIX, getWALSuffix(i));
+ wal.init();
+ wals.put(wal, wal.getCurrentFileName());
+ ROLLER.addWAL(wal);
+ Thread.sleep(3000);
Review comment:
Do we really need a 3 sec sleep here?
##########
File path:
hbase-server/src/main/java/org/apache/hadoop/hbase/wal/AbstractWALRoller.java
##########
@@ -232,7 +230,7 @@ private boolean isWaiting() {
* @return true if all WAL roll finished
*/
public boolean walRollFinished() {
- return walNeedsRoll.values().stream().allMatch(needRoll -> !needRoll) &&
isWaiting();
Review comment:
Checking its usage, I think this API implementation is already buggy. It just
checks the status of the boolean. Once we start a roll on a WAL, we reset the
boolean (even before this patch). So it does not clearly tell us anything about
the roll status. It can return true even while an active WAL roll is going
on. We can track this in a separate JIRA and fix it there (if required). Just
add a TODO comment here.
We might need another boolean in the Controller which clearly tracks whether a
roll is ongoing. This method really needs to check that status as well as the
requested-roll status.
##########
File path:
hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestLogRoller.java
##########
@@ -43,53 +49,104 @@
public static final HBaseClassTestRule CLASS_RULE =
HBaseClassTestRule.forClass(TestLogRoller.class);
- private static HBaseTestingUtility TEST_UTIL = new HBaseTestingUtility();
+ private static final HBaseTestingUtility TEST_UTIL = new
HBaseTestingUtility();
- private static final int logRollPeriod = 20 * 1000;
+ private static final int LOG_ROLL_PERIOD = 20 * 1000;
+ private static final String LOG_DIR = "WALs";
+ private static final String ARCHIVE_DIR = "archiveWALs";
+ private static final String WAL_PREFIX = "test-log-roller";
+ private static Configuration CONF;
+ private static LogRoller ROLLER;
+ private static Path ROOT_DIR;
+ private static FileSystem FS;
@Before
public void setup() throws Exception {
- TEST_UTIL.getConfiguration().setInt("hbase.regionserver.logroll.period",
logRollPeriod);
- TEST_UTIL.startMiniCluster(1);
- TableName name = TableName.valueOf("Test");
- TEST_UTIL.createTable(name, Bytes.toBytes("cf"));
- TEST_UTIL.waitTableAvailable(name);
+ CONF = TEST_UTIL.getConfiguration();
+ CONF.setInt("hbase.regionserver.logroll.period", LOG_ROLL_PERIOD);
+ CONF.setInt(HConstants.THREAD_WAKE_FREQUENCY, 300);
+ ROOT_DIR = TEST_UTIL.getRandomDir();
+ FS = FileSystem.get(CONF);
+ RegionServerServices services = Mockito.mock(RegionServerServices.class);
+ Mockito.when(services.getConfiguration()).thenReturn(CONF);
+ ROLLER = new LogRoller(services);
+ ROLLER.start();
}
@After
public void tearDown() throws Exception {
+ ROLLER.close();
+ FS.close();
TEST_UTIL.shutdownMiniCluster();
}
@Test
public void testRemoveClosedWAL() throws Exception {
- HRegionServer rs = TEST_UTIL.getMiniHBaseCluster().getRegionServer(0);
- Configuration conf = rs.getConfiguration();
- LogRoller logRoller =
TEST_UTIL.getMiniHBaseCluster().getRegionServer(0).getWalRoller();
- int originalSize = logRoller.getWalNeedsRoll().size();
- FSHLog wal1 = new FSHLog(rs.getWALFileSystem(), rs.getWALRootDir(),
-
AbstractFSWALProvider.getWALDirectoryName(rs.getServerName().getServerName()),
conf);
- logRoller.addWAL(wal1);
- FSHLog wal2 = new FSHLog(rs.getWALFileSystem(), rs.getWALRootDir(),
-
AbstractFSWALProvider.getWALDirectoryName(rs.getServerName().getServerName()),
conf);
- logRoller.addWAL(wal2);
- FSHLog wal3 = new FSHLog(rs.getWALFileSystem(), rs.getWALRootDir(),
-
AbstractFSWALProvider.getWALDirectoryName(rs.getServerName().getServerName()),
conf);
- logRoller.addWAL(wal3);
-
- assertEquals(originalSize + 3, logRoller.getWalNeedsRoll().size());
- assertTrue(logRoller.getWalNeedsRoll().containsKey(wal1));
-
- wal1.close();
- Thread.sleep(2 * logRollPeriod);
-
- assertEquals(originalSize + 2, logRoller.getWalNeedsRoll().size());
- assertFalse(logRoller.getWalNeedsRoll().containsKey(wal1));
-
- wal2.close();
- wal3.close();
- Thread.sleep(2 * logRollPeriod);
-
- assertEquals(originalSize, logRoller.getWalNeedsRoll().size());
+ assertEquals(0, ROLLER.getWalNeedsRoll().size());
+ for (int i = 1; i <= 3; i++) {
+ FSHLog wal = new FSHLog(FS, ROOT_DIR, LOG_DIR, ARCHIVE_DIR, CONF, null,
+ true, WAL_PREFIX, getWALSuffix(i));
+ ROLLER.addWAL(wal);
+ }
+
+ assertEquals(3, ROLLER.getWalNeedsRoll().size());
+ Iterator<WAL> it = ROLLER.getWalNeedsRoll().keySet().iterator();
+ WAL wal = it.next();
+ assertTrue(ROLLER.getWalNeedsRoll().containsKey(wal));
+
+ wal.close();
+ Thread.sleep(LOG_ROLL_PERIOD + 5000);
+
+ assertEquals(2, ROLLER.getWalNeedsRoll().size());
+ assertFalse(ROLLER.getWalNeedsRoll().containsKey(wal));
+
+ wal = it.next();
+ wal.close();
+ wal = it.next();
+ wal.close();
+ Thread.sleep(LOG_ROLL_PERIOD + 5000);
+
+ assertEquals(0, ROLLER.getWalNeedsRoll().size());
+ }
+
+ /**
+ * verify that each wal roll separately
+ */
+ @Test
+ public void testRequestRollWithMultiWal() throws Exception {
+ // add multiple wal
+ Map<FSHLog, Path> wals = new HashMap<>();
+ for (int i = 1; i <= 3; i++) {
+ FSHLog wal = new FSHLog(FS, ROOT_DIR, LOG_DIR, ARCHIVE_DIR, CONF, null,
+ true, WAL_PREFIX, getWALSuffix(i));
+ wal.init();
+ wals.put(wal, wal.getCurrentFileName());
+ ROLLER.addWAL(wal);
+ Thread.sleep(3000);
Review comment:
Do we want the 3 WALs to have different periodic roll times? Why do we need that?
If so, can we limit the sleep time to 1 sec or less?
##########
File path:
hbase-server/src/main/java/org/apache/hadoop/hbase/wal/AbstractWALRoller.java
##########
@@ -232,7 +230,7 @@ private boolean isWaiting() {
* @return true if all WAL roll finished
*/
public boolean walRollFinished() {
- return walNeedsRoll.values().stream().allMatch(needRoll -> !needRoll) &&
isWaiting();
+ return wals.values().stream().noneMatch(RollController::isRollRequested)
&& isWaiting();
Review comment:
Here, rather than isRollRequested, don't we need needsRoll?
##########
File path:
hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestLogRoller.java
##########
@@ -43,53 +49,104 @@
public static final HBaseClassTestRule CLASS_RULE =
HBaseClassTestRule.forClass(TestLogRoller.class);
- private static HBaseTestingUtility TEST_UTIL = new HBaseTestingUtility();
+ private static final HBaseTestingUtility TEST_UTIL = new
HBaseTestingUtility();
- private static final int logRollPeriod = 20 * 1000;
+ private static final int LOG_ROLL_PERIOD = 20 * 1000;
+ private static final String LOG_DIR = "WALs";
+ private static final String ARCHIVE_DIR = "archiveWALs";
+ private static final String WAL_PREFIX = "test-log-roller";
+ private static Configuration CONF;
+ private static LogRoller ROLLER;
+ private static Path ROOT_DIR;
+ private static FileSystem FS;
@Before
public void setup() throws Exception {
- TEST_UTIL.getConfiguration().setInt("hbase.regionserver.logroll.period",
logRollPeriod);
- TEST_UTIL.startMiniCluster(1);
- TableName name = TableName.valueOf("Test");
- TEST_UTIL.createTable(name, Bytes.toBytes("cf"));
- TEST_UTIL.waitTableAvailable(name);
+ CONF = TEST_UTIL.getConfiguration();
+ CONF.setInt("hbase.regionserver.logroll.period", LOG_ROLL_PERIOD);
+ CONF.setInt(HConstants.THREAD_WAKE_FREQUENCY, 300);
+ ROOT_DIR = TEST_UTIL.getRandomDir();
+ FS = FileSystem.get(CONF);
+ RegionServerServices services = Mockito.mock(RegionServerServices.class);
+ Mockito.when(services.getConfiguration()).thenReturn(CONF);
+ ROLLER = new LogRoller(services);
+ ROLLER.start();
}
@After
public void tearDown() throws Exception {
+ ROLLER.close();
+ FS.close();
TEST_UTIL.shutdownMiniCluster();
}
@Test
public void testRemoveClosedWAL() throws Exception {
- HRegionServer rs = TEST_UTIL.getMiniHBaseCluster().getRegionServer(0);
- Configuration conf = rs.getConfiguration();
- LogRoller logRoller =
TEST_UTIL.getMiniHBaseCluster().getRegionServer(0).getWalRoller();
- int originalSize = logRoller.getWalNeedsRoll().size();
- FSHLog wal1 = new FSHLog(rs.getWALFileSystem(), rs.getWALRootDir(),
-
AbstractFSWALProvider.getWALDirectoryName(rs.getServerName().getServerName()),
conf);
- logRoller.addWAL(wal1);
- FSHLog wal2 = new FSHLog(rs.getWALFileSystem(), rs.getWALRootDir(),
-
AbstractFSWALProvider.getWALDirectoryName(rs.getServerName().getServerName()),
conf);
- logRoller.addWAL(wal2);
- FSHLog wal3 = new FSHLog(rs.getWALFileSystem(), rs.getWALRootDir(),
-
AbstractFSWALProvider.getWALDirectoryName(rs.getServerName().getServerName()),
conf);
- logRoller.addWAL(wal3);
-
- assertEquals(originalSize + 3, logRoller.getWalNeedsRoll().size());
- assertTrue(logRoller.getWalNeedsRoll().containsKey(wal1));
-
- wal1.close();
- Thread.sleep(2 * logRollPeriod);
-
- assertEquals(originalSize + 2, logRoller.getWalNeedsRoll().size());
- assertFalse(logRoller.getWalNeedsRoll().containsKey(wal1));
-
- wal2.close();
- wal3.close();
- Thread.sleep(2 * logRollPeriod);
-
- assertEquals(originalSize, logRoller.getWalNeedsRoll().size());
+ assertEquals(0, ROLLER.getWalNeedsRoll().size());
+ for (int i = 1; i <= 3; i++) {
+ FSHLog wal = new FSHLog(FS, ROOT_DIR, LOG_DIR, ARCHIVE_DIR, CONF, null,
+ true, WAL_PREFIX, getWALSuffix(i));
+ ROLLER.addWAL(wal);
+ }
+
+ assertEquals(3, ROLLER.getWalNeedsRoll().size());
+ Iterator<WAL> it = ROLLER.getWalNeedsRoll().keySet().iterator();
+ WAL wal = it.next();
+ assertTrue(ROLLER.getWalNeedsRoll().containsKey(wal));
+
+ wal.close();
+ Thread.sleep(LOG_ROLL_PERIOD + 5000);
+
+ assertEquals(2, ROLLER.getWalNeedsRoll().size());
+ assertFalse(ROLLER.getWalNeedsRoll().containsKey(wal));
+
+ wal = it.next();
+ wal.close();
+ wal = it.next();
+ wal.close();
+ Thread.sleep(LOG_ROLL_PERIOD + 5000);
+
+ assertEquals(0, ROLLER.getWalNeedsRoll().size());
+ }
+
+ /**
+ * verify that each wal roll separately
+ */
+ @Test
+ public void testRequestRollWithMultiWal() throws Exception {
+ // add multiple wal
+ Map<FSHLog, Path> wals = new HashMap<>();
+ for (int i = 1; i <= 3; i++) {
+ FSHLog wal = new FSHLog(FS, ROOT_DIR, LOG_DIR, ARCHIVE_DIR, CONF, null,
+ true, WAL_PREFIX, getWALSuffix(i));
+ wal.init();
+ wals.put(wal, wal.getCurrentFileName());
+ ROLLER.addWAL(wal);
+ Thread.sleep(3000);
+ }
+
+ // request roll
+ Iterator<Map.Entry<FSHLog, Path>> it = wals.entrySet().iterator();
+ Map.Entry<FSHLog, Path> walEntry = it.next();
+ walEntry.getKey().requestLogRoll();
+ Thread.sleep(5000);
+
+ assertNotEquals(walEntry.getValue(),
walEntry.getKey().getCurrentFileName());
+ walEntry.setValue(walEntry.getKey().getCurrentFileName());
+ while (it.hasNext()) {
+ walEntry = it.next();
+ assertEquals(walEntry.getValue(),
walEntry.getKey().getCurrentFileName());
Review comment:
Good...
##########
File path:
hbase-server/src/main/java/org/apache/hadoop/hbase/wal/AbstractWALRoller.java
##########
@@ -148,53 +147,52 @@ private void abort(String reason, Throwable cause) {
@Override
public void run() {
while (running) {
- boolean periodic = false;
long now = System.currentTimeMillis();
checkLowReplication(now);
- periodic = (now - this.lastRollTime) > this.rollPeriod;
- if (periodic) {
- // Time for periodic roll, fall through
- LOG.debug("WAL roll period {} ms elapsed", this.rollPeriod);
- } else {
- synchronized (this) {
- if (walNeedsRoll.values().stream().anyMatch(Boolean::booleanValue)) {
- // WAL roll requested, fall through
- LOG.debug("WAL roll requested");
- } else {
- try {
- wait(this.threadWakeFrequency);
- } catch (InterruptedException e) {
- // restore the interrupt state
- Thread.currentThread().interrupt();
- }
- // goto the beginning to check whether again whether we should
fall through to roll
- // several WALs, and also check whether we should quit.
- continue;
+ synchronized (this) {
+ if (wals.values().stream().noneMatch(rc -> rc.needsRoll(now))) {
+ try {
+ wait(this.threadWakeFrequency);
+ } catch (InterruptedException e) {
+ // restore the interrupt state
+ Thread.currentThread().interrupt();
}
+ // goto the beginning to check whether again whether we should fall
through to roll
+ // several WALs, and also check whether we should quit.
+ continue;
}
}
try {
- this.lastRollTime = System.currentTimeMillis();
- for (Iterator<Entry<WAL, Boolean>> iter =
walNeedsRoll.entrySet().iterator(); iter
- .hasNext();) {
- Entry<WAL, Boolean> entry = iter.next();
+ for (Iterator<Entry<WAL, RollController>> iter =
wals.entrySet().iterator();
+ iter.hasNext();) {
+ Entry<WAL, RollController> entry = iter.next();
WAL wal = entry.getKey();
- // reset the flag in front to avoid missing roll request before we
return from rollWriter.
- walNeedsRoll.put(wal, Boolean.FALSE);
- Map<byte[], List<byte[]>> regionsToFlush = null;
+ RollController controller = entry.getValue();
+ boolean isRequestRoll;
Review comment:
It seems this boolean is not needed now. Can you please delete it?
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
[email protected]