Repository: hbase Updated Branches: refs/heads/master 8192a6b6e -> e5a288e5c
HBASE-17053 Remove LogRollerExitedChecker Project: http://git-wip-us.apache.org/repos/asf/hbase/repo Commit: http://git-wip-us.apache.org/repos/asf/hbase/commit/e5a288e5 Tree: http://git-wip-us.apache.org/repos/asf/hbase/tree/e5a288e5 Diff: http://git-wip-us.apache.org/repos/asf/hbase/diff/e5a288e5 Branch: refs/heads/master Commit: e5a288e5c04d6f9a1b31549e4e3d979aeee4fd94 Parents: 8192a6b Author: zhangduo <zhang...@apache.org> Authored: Wed Nov 9 21:07:07 2016 +0800 Committer: zhangduo <zhang...@apache.org> Committed: Thu Nov 10 10:52:49 2016 +0800 ---------------------------------------------------------------------- .../hbase/regionserver/HRegionServer.java | 51 +-------- .../hadoop/hbase/regionserver/LogRoller.java | 16 ++- .../hbase/regionserver/wal/AsyncFSWAL.java | 106 ------------------- .../hadoop/hbase/regionserver/wal/FSHLog.java | 4 - .../hadoop/hbase/wal/DisabledWALProvider.java | 4 - .../java/org/apache/hadoop/hbase/wal/WAL.java | 7 -- .../regionserver/TestFailedAppendAndSync.java | 2 +- .../hbase/regionserver/TestWALLockup.java | 4 +- .../hbase/wal/WALPerformanceEvaluation.java | 2 +- 9 files changed, 20 insertions(+), 176 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/hbase/blob/e5a288e5/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java index 312e8c1..24d8170 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java @@ -375,8 +375,6 @@ public class HRegionServer extends HasThread implements // WAL roller. log is protected rather than private to avoid // eclipse warning when accessed by inner classes final LogRoller walRoller; - // Lazily initialized if this RegionServer hosts a meta table. - final AtomicReference<LogRoller> metawalRoller = new AtomicReference<LogRoller>(); // flag set after we're done setting up server threads final AtomicBoolean online = new AtomicBoolean(false); @@ -1722,34 +1720,6 @@ public class HRegionServer extends HasThread implements return new WALFactory(conf, listeners, serverName.toString()); } - /** - * We initialize the roller for the wal that handles meta lazily - * since we don't know if this regionserver will handle it. All calls to - * this method return a reference to the that same roller. As newly referenced - * meta regions are brought online, they will be offered to the roller for maintenance. - * As a part of that registration process, the roller will add itself as a - * listener on the wal. - */ - protected LogRoller ensureMetaWALRoller() { - // Using a tmp log roller to ensure metaLogRoller is alive once it is not - // null - LogRoller roller = metawalRoller.get(); - if (null == roller) { - LogRoller tmpLogRoller = new LogRoller(this, this); - String n = Thread.currentThread().getName(); - Threads.setDaemonThreadRunning(tmpLogRoller.getThread(), - n + "-MetaLogRoller", uncaughtExceptionHandler); - if (metawalRoller.compareAndSet(null, tmpLogRoller)) { - roller = tmpLogRoller; - } else { - // Another thread won starting the roller - Threads.shutdown(tmpLogRoller.getThread()); - roller = metawalRoller.get(); - } - } - return roller; - } - public MetricsRegionServer getRegionServerMetrics() { return this.metricsRegionServer; } @@ -1914,11 +1884,6 @@ public class HRegionServer extends HasThread implements stop("One or more threads are no longer alive -- stop"); return false; } - final LogRoller metawalRoller = this.metawalRoller.get(); - if (metawalRoller != null && !metawalRoller.isAlive()) { - stop("Meta WAL roller thread is no longer alive -- stop"); - return false; - } return true; } @@ -1932,11 +1897,9 @@ public class HRegionServer extends HasThread implements @Override public WAL getWAL(HRegionInfo regionInfo) throws IOException { WAL wal; - LogRoller roller = walRoller; - //_ROOT_ and hbase:meta regions have separate WAL. - if (regionInfo != null && regionInfo.isMetaTable() && - regionInfo.getReplicaId() == HRegionInfo.DEFAULT_REPLICA_ID) { - roller = ensureMetaWALRoller(); + // _ROOT_ and hbase:meta regions have separate WAL. + if (regionInfo != null && regionInfo.isMetaTable() + && regionInfo.getReplicaId() == HRegionInfo.DEFAULT_REPLICA_ID) { wal = walFactory.getMetaWAL(regionInfo.getEncodedNameAsBytes()); } else if (regionInfo == null) { wal = walFactory.getWAL(UNSPECIFIED_REGION, null); @@ -1944,7 +1907,7 @@ public class HRegionServer extends HasThread implements byte[] namespace = regionInfo.getTable().getNamespace(); wal = walFactory.getWAL(regionInfo.getEncodedNameAsBytes(), namespace); } - roller.addWAL(wal); + walRoller.addWAL(wal); return wal; } @@ -2330,11 +2293,7 @@ public class HRegionServer extends HasThread implements this.spanReceiverHost.closeReceivers(); } if (this.walRoller != null) { - Threads.shutdown(this.walRoller.getThread()); - } - final LogRoller metawalRoller = this.metawalRoller.get(); - if (metawalRoller != null) { - Threads.shutdown(metawalRoller.getThread()); + this.walRoller.close(); } if (this.compactSplitThread != null) { this.compactSplitThread.join(); http://git-wip-us.apache.org/repos/asf/hbase/blob/e5a288e5/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/LogRoller.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/LogRoller.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/LogRoller.java index 9a2bb34..24f0d1a 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/LogRoller.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/LogRoller.java @@ -18,6 +18,7 @@ */ package org.apache.hadoop.hbase.regionserver; +import java.io.Closeable; import java.io.IOException; import java.util.Map.Entry; import java.util.concurrent.ConcurrentHashMap; @@ -49,7 +50,7 @@ import com.google.common.annotations.VisibleForTesting; */ @InterfaceAudience.Private @VisibleForTesting -public class LogRoller extends HasThread { +public class LogRoller extends HasThread implements Closeable { private static final Log LOG = LogFactory.getLog(LogRoller.class); private final ReentrantLock rollLock = new ReentrantLock(); private final AtomicBoolean rollLog = new AtomicBoolean(false); @@ -62,6 +63,8 @@ public class LogRoller extends HasThread { private final long rollperiod; private final int threadWakeFrequency; + private volatile boolean running = true; + public void addWAL(final WAL wal) { if (null == walNeedsRoll.putIfAbsent(wal, Boolean.FALSE)) { wal.registerWALActionsListener(new WALActionsListener.Base() { @@ -110,7 +113,7 @@ public class LogRoller extends HasThread { @Override public void run() { - while (!server.isStopped()) { + while (running) { long now = System.currentTimeMillis(); boolean periodic = false; if (!rollLog.get()) { @@ -167,9 +170,6 @@ public class LogRoller extends HasThread { } } } - for (WAL wal : walNeedsRoll.keySet()) { - wal.logRollerExited(); - } LOG.info("LogRoller exiting."); } @@ -208,4 +208,10 @@ public class LogRoller extends HasThread { } return true; } + + @Override + public void close() { + running = false; + interrupt(); + } } \ No newline at end of file http://git-wip-us.apache.org/repos/asf/hbase/blob/e5a288e5/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/AsyncFSWAL.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/AsyncFSWAL.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/AsyncFSWAL.java index d842f1b..78a3e8a 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/AsyncFSWAL.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/AsyncFSWAL.java @@ -25,7 +25,6 @@ import com.lmax.disruptor.Sequence; import com.lmax.disruptor.Sequencer; import io.netty.channel.EventLoop; -import io.netty.util.concurrent.ScheduledFuture; import io.netty.util.concurrent.SingleThreadEventExecutor; import java.io.IOException; @@ -40,7 +39,6 @@ import java.util.PriorityQueue; import java.util.Queue; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; -import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.locks.Condition; import java.util.concurrent.locks.Lock; @@ -143,10 +141,6 @@ public class AsyncFSWAL extends AbstractFSWAL<AsyncWriter> { public static final String ASYNC_WAL_CREATE_MAX_RETRIES = "hbase.wal.async.create.retries"; public static final int DEFAULT_ASYNC_WAL_CREATE_MAX_RETRIES = 10; - public static final String ASYNC_WAL_LOG_ROLLER_EXITED_CHECK_INTERVAL_MS = - "hbase.wal.async.logroller.exited.check.interval.ms"; - public static final long DEFAULT_ASYNC_WAL_LOG_ROLLER_EXITED_CHECK_INTERVAL_MS = 1000; - private final EventLoop eventLoop; private final Lock consumeLock = new ReentrantLock(); @@ -176,8 +170,6 @@ public class AsyncFSWAL extends AbstractFSWAL<AsyncWriter> { private final int createMaxRetries; - private final long logRollerExitedCheckIntervalMs; - private final ExecutorService closeExecutor = Executors.newCachedThreadPool( new ThreadFactoryBuilder().setDaemon(true).setNameFormat("Close-WAL-Writer-%d").build()); @@ -196,85 +188,6 @@ public class AsyncFSWAL extends AbstractFSWAL<AsyncWriter> { // file length when we issue last sync request on the writer private long fileLengthAtLastSync; - private volatile boolean logRollerExited; - - private final class LogRollerExitedChecker implements Runnable { - - private boolean cancelled; - - private ScheduledFuture<?> future; - - public synchronized void setFuture(ScheduledFuture<?> future) { - this.future = future; - } - - // See the comments in syncFailed why we need to do this. - private void cleanup() { - unackedAppends.clear(); - toWriteAppends.forEach(entry -> { - try { - entry.stampRegionSequenceId(); - } catch (IOException e) { - throw new AssertionError("should not happen", e); - } - }); - toWriteAppends.clear(); - IOException error = new IOException("sync failed but log roller exited"); - for (SyncFuture sync; (sync = syncFutures.peek()) != null;) { - sync.done(sync.getTxid(), error); - syncFutures.remove(); - } - long nextCursor = waitingConsumePayloadsGatingSequence.get() + 1; - for (long cursorBound = - waitingConsumePayloads.getCursor(); nextCursor <= cursorBound; nextCursor++) { - if (!waitingConsumePayloads.isPublished(nextCursor)) { - break; - } - RingBufferTruck truck = waitingConsumePayloads.get(nextCursor); - switch (truck.type()) { - case APPEND: - try { - truck.unloadAppend().stampRegionSequenceId(); - } catch (IOException e) { - throw new AssertionError("should not happen", e); - } - break; - case SYNC: - SyncFuture sync = truck.unloadSync(); - sync.done(sync.getTxid(), error); - break; - default: - LOG.warn("RingBufferTruck with unexpected type: " + truck.type()); - break; - } - waitingConsumePayloadsGatingSequence.set(nextCursor); - } - } - - @Override - public void run() { - if (!logRollerExited) { - return; - } - // rollWriter is called in the log roller thread, and logRollerExited will be set just before - // the log rolled exit. So here we can confirm that no one could cancel us if the 'canceled' - // check passed. So it is safe to release the lock after checking 'canceled' flag. - synchronized (this) { - if (cancelled) { - return; - } - } - cleanup(); - } - - public synchronized void cancel() { - future.cancel(false); - cancelled = true; - } - } - - private LogRollerExitedChecker logRollerExitedChecker; - public AsyncFSWAL(FileSystem fs, Path rootDir, String logDir, String archiveDir, Configuration conf, List<WALActionsListener> listeners, boolean failIfWALExists, String prefix, String suffix, EventLoop eventLoop) @@ -312,8 +225,6 @@ public class AsyncFSWAL extends AbstractFSWAL<AsyncWriter> { batchSize = conf.getLong(WAL_BATCH_SIZE, DEFAULT_WAL_BATCH_SIZE); createMaxRetries = conf.getInt(ASYNC_WAL_CREATE_MAX_RETRIES, DEFAULT_ASYNC_WAL_CREATE_MAX_RETRIES); - logRollerExitedCheckIntervalMs = conf.getLong(ASYNC_WAL_LOG_ROLLER_EXITED_CHECK_INTERVAL_MS, - DEFAULT_ASYNC_WAL_LOG_ROLLER_EXITED_CHECK_INTERVAL_MS); rollWriter(); } @@ -357,14 +268,6 @@ public class AsyncFSWAL extends AbstractFSWAL<AsyncWriter> { if (writerBroken) { return; } - // schedule a periodical task to check if log roller is exited. Otherwise the the sync - // request maybe blocked forever since we are still waiting for a new writer to write the - // pending data and sync it... - logRollerExitedChecker = new LogRollerExitedChecker(); - // we are currently in the EventLoop thread, so it is safe to set the future after - // schedule it since the task can not be executed before we release the thread. - logRollerExitedChecker.setFuture(eventLoop.scheduleAtFixedRate(logRollerExitedChecker, - logRollerExitedCheckIntervalMs, logRollerExitedCheckIntervalMs, TimeUnit.MILLISECONDS)); writerBroken = true; if (waitingRoll) { readyForRolling = true; @@ -708,11 +611,6 @@ public class AsyncFSWAL extends AbstractFSWAL<AsyncWriter> { } @Override - public void logRollerExited() { - logRollerExited = true; - } - - @Override protected AsyncWriter createWriterInstance(Path path) throws IOException { boolean overwrite = false; for (int retry = 0;; retry++) { @@ -779,10 +677,6 @@ public class AsyncFSWAL extends AbstractFSWAL<AsyncWriter> { try { consumerScheduled.set(true); writerBroken = waitingRoll = false; - if (logRollerExitedChecker != null) { - logRollerExitedChecker.cancel(); - logRollerExitedChecker = null; - } eventLoop.execute(consumer); } finally { consumeLock.unlock(); http://git-wip-us.apache.org/repos/asf/hbase/blob/e5a288e5/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/FSHLog.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/FSHLog.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/FSHLog.java index 426e3b1..b4f0a29 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/FSHLog.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/FSHLog.java @@ -795,10 +795,6 @@ public class FSHLog extends AbstractFSWAL<Writer> { } } - @Override - public void logRollerExited() { - } - @VisibleForTesting boolean isLowReplicationRollEnabled() { return lowReplicationRollEnabled; http://git-wip-us.apache.org/repos/asf/hbase/blob/e5a288e5/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/DisabledWALProvider.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/DisabledWALProvider.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/DisabledWALProvider.java index 0725c4e..7f10d7d 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/DisabledWALProvider.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/DisabledWALProvider.java @@ -228,10 +228,6 @@ class DisabledWALProvider implements WALProvider { public String toString() { return "WAL disabled."; } - - @Override - public void logRollerExited() { - } } @Override http://git-wip-us.apache.org/repos/asf/hbase/blob/e5a288e5/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/WAL.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/WAL.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/WAL.java index 13ab85e..a9c9fe7 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/WAL.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/WAL.java @@ -212,13 +212,6 @@ public interface WAL extends Closeable { String toString(); /** - * In some WAL implementation, we will write WAL entries to new file if sync failed, which means, - * the fail recovery is depended on log roller. So here we tell the WAL that log roller has - * already been exited so the WAL cloud give up recovery. - */ - void logRollerExited(); - - /** * When outside clients need to consume persisted WALs, they rely on a provided * Reader. */ http://git-wip-us.apache.org/repos/asf/hbase/blob/e5a288e5/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestFailedAppendAndSync.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestFailedAppendAndSync.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestFailedAppendAndSync.java index e9ff8ec..ad88cfe 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestFailedAppendAndSync.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestFailedAppendAndSync.java @@ -245,7 +245,7 @@ public class TestFailedAppendAndSync { } finally { // To stop logRoller, its server has to say it is stopped. Mockito.when(server.isStopped()).thenReturn(true); - if (logRoller != null) logRoller.interrupt(); + if (logRoller != null) logRoller.close(); if (region != null) { try { region.close(true); http://git-wip-us.apache.org/repos/asf/hbase/blob/e5a288e5/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestWALLockup.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestWALLockup.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestWALLockup.java index 63fbb69..31f9a42 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestWALLockup.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestWALLockup.java @@ -285,7 +285,7 @@ public class TestWALLockup { } finally { // To stop logRoller, its server has to say it is stopped. Mockito.when(server.isStopped()).thenReturn(true); - if (logRoller != null) logRoller.interrupt(); + if (logRoller != null) logRoller.close(); try { if (region != null) region.close(); if (dodgyWAL != null) dodgyWAL.close(); @@ -469,7 +469,7 @@ public class TestWALLockup { assertTrue(server.isAborted()); } finally { if (logRoller != null) { - logRoller.interrupt(); + logRoller.close(); } try { if (region != null) { http://git-wip-us.apache.org/repos/asf/hbase/blob/e5a288e5/hbase-server/src/test/java/org/apache/hadoop/hbase/wal/WALPerformanceEvaluation.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/wal/WALPerformanceEvaluation.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/wal/WALPerformanceEvaluation.java index c7ea5f2..1b513b2 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/wal/WALPerformanceEvaluation.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/wal/WALPerformanceEvaluation.java @@ -382,7 +382,7 @@ public final class WALPerformanceEvaluation extends Configured implements Tool { } if (null != roller) { LOG.info("shutting down log roller."); - Threads.shutdown(roller.getThread()); + roller.close(); } wals.shutdown(); // Remove the root dir for this test region