http://git-wip-us.apache.org/repos/asf/hbase/blob/c96b642f/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/FSHLog.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/FSHLog.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/FSHLog.java index dfbdae5..67c2b93 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/FSHLog.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/FSHLog.java @@ -17,62 +17,44 @@ */ package org.apache.hadoop.hbase.regionserver.wal; -import static org.apache.hadoop.hbase.wal.DefaultWALProvider.WAL_FILE_NAME_DELIMITER; - import java.io.FileNotFoundException; import java.io.IOException; -import java.io.InterruptedIOException; import java.io.OutputStream; -import java.lang.management.ManagementFactory; -import java.lang.management.MemoryUsage; -import java.lang.reflect.InvocationTargetException; -import java.net.URLEncoder; -import java.util.ArrayList; import java.util.Arrays; -import java.util.Comparator; import java.util.List; -import java.util.Map; -import java.util.NavigableMap; -import java.util.Set; import java.util.concurrent.BlockingQueue; -import java.util.concurrent.ConcurrentHashMap; -import java.util.concurrent.ConcurrentSkipListMap; -import java.util.concurrent.CopyOnWriteArrayList; import java.util.concurrent.CountDownLatch; -import java.util.concurrent.ExecutionException; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import java.util.concurrent.LinkedBlockingQueue; import java.util.concurrent.TimeUnit; -import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicInteger; -import java.util.concurrent.atomic.AtomicLong; -import java.util.concurrent.locks.ReentrantLock; + +import com.google.common.annotations.VisibleForTesting; +import com.lmax.disruptor.BlockingWaitStrategy; +import com.lmax.disruptor.EventHandler; +import com.lmax.disruptor.ExceptionHandler; +import com.lmax.disruptor.LifecycleAware; +import com.lmax.disruptor.TimeoutException; +import com.lmax.disruptor.dsl.Disruptor; +import com.lmax.disruptor.dsl.ProducerType; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.FSDataOutputStream; -import org.apache.hadoop.fs.FileStatus; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; -import org.apache.hadoop.fs.PathFilter; -import org.apache.hadoop.hbase.Cell; -import org.apache.hadoop.hbase.CellUtil; import org.apache.hadoop.hbase.HBaseConfiguration; import org.apache.hadoop.hbase.HConstants; import org.apache.hadoop.hbase.HRegionInfo; import org.apache.hadoop.hbase.classification.InterfaceAudience; -import org.apache.hadoop.hbase.io.util.HeapMemorySizeUtil; import org.apache.hadoop.hbase.util.Bytes; import org.apache.hadoop.hbase.util.ClassSize; -import org.apache.hadoop.hbase.util.DrainBarrier; -import org.apache.hadoop.hbase.util.EnvironmentEdgeManager; import org.apache.hadoop.hbase.util.FSUtils; import org.apache.hadoop.hbase.util.HasThread; import org.apache.hadoop.hbase.util.Threads; import org.apache.hadoop.hbase.wal.DefaultWALProvider; -import org.apache.hadoop.hbase.wal.WAL; import org.apache.hadoop.hbase.wal.WALFactory; import org.apache.hadoop.hbase.wal.WALKey; import org.apache.hadoop.hbase.wal.WALPrettyPrinter; @@ -81,95 +63,59 @@ import org.apache.hadoop.hbase.wal.WALSplitter; import org.apache.hadoop.hdfs.DFSOutputStream; import org.apache.hadoop.hdfs.client.HdfsDataOutputStream; import org.apache.hadoop.hdfs.protocol.DatanodeInfo; -import org.apache.hadoop.util.StringUtils; import org.apache.htrace.NullScope; import org.apache.htrace.Span; import org.apache.htrace.Trace; import org.apache.htrace.TraceScope; -import com.google.common.annotations.VisibleForTesting; -import com.lmax.disruptor.BlockingWaitStrategy; -import com.lmax.disruptor.EventHandler; -import com.lmax.disruptor.ExceptionHandler; -import com.lmax.disruptor.LifecycleAware; -import com.lmax.disruptor.TimeoutException; -import com.lmax.disruptor.dsl.Disruptor; -import com.lmax.disruptor.dsl.ProducerType; - /** - * Implementation of {@link WAL} to go against {@link FileSystem}; i.e. keep WALs in HDFS. - * Only one WAL is ever being written at a time. When a WAL hits a configured maximum size, - * it is rolled. This is done internal to the implementation. - * - * <p>As data is flushed from the MemStore to other on-disk structures (files sorted by - * key, hfiles), a WAL becomes obsolete. We can let go of all the log edits/entries for a given - * HRegion-sequence id. A bunch of work in the below is done keeping account of these region - * sequence ids -- what is flushed out to hfiles, and what is yet in WAL and in memory only. - * - * <p>It is only practical to delete entire files. Thus, we delete an entire on-disk file - * <code>F</code> when all of the edits in <code>F</code> have a log-sequence-id that's older - * (smaller) than the most-recent flush. - * - * <p>To read an WAL, call {@link WALFactory#createReader(org.apache.hadoop.fs.FileSystem, - * org.apache.hadoop.fs.Path)}. - * - * <h2>Failure Semantic</h2> - * If an exception on append or sync, roll the WAL because the current WAL is now a lame duck; - * any more appends or syncs will fail also with the same original exception. If we have made - * successful appends to the WAL and we then are unable to sync them, our current semantic is to - * return error to the client that the appends failed but also to abort the current context, - * usually the hosting server. We need to replay the WALs. TODO: Change this semantic. A roll of - * WAL may be sufficient as long as we have flagged client that the append failed. TODO: - * replication may pick up these last edits though they have been marked as failed append (Need to - * keep our own file lengths, not rely on HDFS). + * The default implementation of FSWAL. */ @InterfaceAudience.Private -public class FSHLog implements WAL { +public class FSHLog extends AbstractFSWAL<Writer> { // IMPLEMENTATION NOTES: // - // At the core is a ring buffer. Our ring buffer is the LMAX Disruptor. It tries to + // At the core is a ring buffer. Our ring buffer is the LMAX Disruptor. It tries to // minimize synchronizations and volatile writes when multiple contending threads as is the case - // here appending and syncing on a single WAL. The Disruptor is configured to handle multiple + // here appending and syncing on a single WAL. The Disruptor is configured to handle multiple // producers but it has one consumer only (the producers in HBase are IPC Handlers calling append - // and then sync). The single consumer/writer pulls the appends and syncs off the ring buffer. + // and then sync). The single consumer/writer pulls the appends and syncs off the ring buffer. // When a handler calls sync, it is given back a future. The producer 'blocks' on the future so - // it does not return until the sync completes. The future is passed over the ring buffer from + // it does not return until the sync completes. The future is passed over the ring buffer from // the producer/handler to the consumer thread where it does its best to batch up the producer - // syncs so one WAL sync actually spans multiple producer sync invocations. How well the + // syncs so one WAL sync actually spans multiple producer sync invocations. How well the // batching works depends on the write rate; i.e. we tend to batch more in times of // high writes/syncs. // // Calls to append now also wait until the append has been done on the consumer side of the - // disruptor. We used to not wait but it makes the implemenation easier to grok if we have + // disruptor. We used to not wait but it makes the implementation easier to grok if we have // the region edit/sequence id after the append returns. // - // TODO: Handlers need to coordinate appending AND syncing. Can we have the threads contend - // once only? Probably hard given syncs take way longer than an append. + // TODO: Handlers need to coordinate appending AND syncing. Can we have the threads contend + // once only? Probably hard given syncs take way longer than an append. // // The consumer threads pass the syncs off to multiple syncing threads in a round robin fashion // to ensure we keep up back-to-back FS sync calls (FS sync calls are the long poll writing the - // WAL). The consumer thread passes the futures to the sync threads for it to complete + // WAL). The consumer thread passes the futures to the sync threads for it to complete // the futures when done. // - // The 'sequence' in the below is the sequence of the append/sync on the ringbuffer. It - // acts as a sort-of transaction id. It is always incrementing. + // The 'sequence' in the below is the sequence of the append/sync on the ringbuffer. It + // acts as a sort-of transaction id. It is always incrementing. // - // The RingBufferEventHandler class hosts the ring buffer consuming code. The threads that - // do the actual FS sync are implementations of SyncRunner. SafePointZigZagLatch is a - // synchronization class used to halt the consumer at a safe point -- just after all outstanding + // The RingBufferEventHandler class hosts the ring buffer consuming code. The threads that + // do the actual FS sync are implementations of SyncRunner. SafePointZigZagLatch is a + // synchronization class used to halt the consumer at a safe point -- just after all outstanding // syncs and appends have completed -- so the log roller can swap the WAL out under it. - + // + // We use ring buffer sequence as txid of FSWALEntry and SyncFuture. private static final Log LOG = LogFactory.getLog(FSHLog.class); - private static final int DEFAULT_SLOW_SYNC_TIME_MS = 100; // in ms - /** - * The nexus at which all incoming handlers meet. Does appends and sync with an ordering. - * Appends and syncs are each put on the ring which means handlers need to - * smash up against the ring twice (can we make it once only? ... maybe not since time to append - * is so different from time to sync and sometimes we don't want to sync or we want to async - * the sync). The ring is where we make sure of our ordering and it is also where we do - * batching up of handler sync calls. + * The nexus at which all incoming handlers meet. Does appends and sync with an ordering. Appends + * and syncs are each put on the ring which means handlers need to smash up against the ring twice + * (can we make it once only? ... maybe not since time to append is so different from time to sync + * and sometimes we don't want to sync or we want to async the sync). The ring is where we make + * sure of our ordering and it is also where we do batching up of handler sync calls. */ private final Disruptor<RingBufferTruck> disruptor; @@ -180,95 +126,13 @@ public class FSHLog implements WAL { /** * This fellow is run by the above appendExecutor service but it is all about batching up appends - * and syncs; it may shutdown without cleaning out the last few appends or syncs. To guard - * against this, keep a reference to this handler and do explicit close on way out to make sure - * all flushed out before we exit. + * and syncs; it may shutdown without cleaning out the last few appends or syncs. To guard against + * this, keep a reference to this handler and do explicit close on way out to make sure all + * flushed out before we exit. */ private final RingBufferEventHandler ringBufferEventHandler; /** - * Map of {@link SyncFuture}s keyed by Handler objects. Used so we reuse SyncFutures. - * TODO: Reus FSWALEntry's rather than create them anew each time as we do SyncFutures here. - * TODO: Add a FSWalEntry and SyncFuture as thread locals on handlers rather than have them - * get them from this Map? - */ - private final Map<Thread, SyncFuture> syncFuturesByHandler; - - /** - * The highest known outstanding unsync'd WALEdit sequence number where sequence number is the - * ring buffer sequence. Maintained by the ring buffer consumer. - */ - private volatile long highestUnsyncedSequence = -1; - - /** - * Updated to the ring buffer sequence of the last successful sync call. This can be less than - * {@link #highestUnsyncedSequence} for case where we have an append where a sync has not yet - * come in for it. Maintained by the syncing threads. - */ - private final AtomicLong highestSyncedSequence = new AtomicLong(0); - - /** - * file system instance - */ - protected final FileSystem fs; - - /** - * WAL directory, where all WAL files would be placed. - */ - private final Path fullPathLogDir; - - /** - * dir path where old logs are kept. - */ - private final Path fullPathArchiveDir; - - /** - * Matches just those wal files that belong to this wal instance. - */ - private final PathFilter ourFiles; - - /** - * Prefix of a WAL file, usually the region server name it is hosted on. - */ - private final String logFilePrefix; - - /** - * Suffix included on generated wal file names - */ - private final String logFileSuffix; - - /** - * Prefix used when checking for wal membership. - */ - private final String prefixPathStr; - - private final WALCoprocessorHost coprocessorHost; - - /** - * conf object - */ - protected final Configuration conf; - - /** Listeners that are called on WAL events. */ - private final List<WALActionsListener> listeners = - new CopyOnWriteArrayList<WALActionsListener>(); - - @Override - public void registerWALActionsListener(final WALActionsListener listener) { - this.listeners.add(listener); - } - - @Override - public boolean unregisterWALActionsListener(final WALActionsListener listener) { - return this.listeners.remove(listener); - } - - @Override - public WALCoprocessorHost getCoprocessorHost() { - return coprocessorHost; - } - - /** * FSDataOutputStream associated with the current SequenceFile.writer */ private FSDataOutputStream hdfs_out; @@ -278,8 +142,6 @@ public class FSHLog implements WAL { // Minimum tolerable replicas, if the actual value is lower than it, rollWriter will be triggered private final int minTolerableReplication; - private final int slowSyncNs; - // If live datanode count is lower than the default replicas value, // RollWriter will be triggered in each sync(So the RollWriter will be // triggered one by one in a short time). Using it as a workaround to slow @@ -293,84 +155,14 @@ public class FSHLog implements WAL { // Enable it if the replications recover. private volatile boolean lowReplicationRollEnabled = true; - /** - * Class that does accounting of sequenceids in WAL subsystem. Holds oldest outstanding - * sequence id as yet not flushed as well as the most recent edit sequence id appended to the - * WAL. Has facility for answering questions such as "Is it safe to GC a WAL?". - */ - private SequenceIdAccounting sequenceIdAccounting = new SequenceIdAccounting(); - - /** - * Current log file. - */ - volatile Writer writer; - - /** The barrier used to ensure that close() waits for all log rolls and flushes to finish. */ - private final DrainBarrier closeBarrier = new DrainBarrier(); - - /** - * This lock makes sure only one log roll runs at a time. Should not be taken while any other - * lock is held. We don't just use synchronized because that results in bogus and tedious - * findbugs warning when it thinks synchronized controls writer thread safety. It is held when - * we are actually rolling the log. It is checked when we are looking to see if we should roll - * the log or not. - */ - private final ReentrantLock rollWriterLock = new ReentrantLock(true); - - private volatile boolean closed = false; - private final AtomicBoolean shutdown = new AtomicBoolean(false); - - // The timestamp (in ms) when the log file was created. - private final AtomicLong filenum = new AtomicLong(-1); - - // Number of transactions in the current Wal. - private final AtomicInteger numEntries = new AtomicInteger(0); - - // If > than this size, roll the log. - private final long logrollsize; - - /** - * The total size of wal - */ - private AtomicLong totalLogSize = new AtomicLong(0); - - /* - * If more than this many logs, force flush of oldest region to oldest edit - * goes to disk. If too many and we crash, then will take forever replaying. - * Keep the number of logs tidy. - */ - private final int maxLogs; - /** Number of log close errors tolerated before we abort */ private final int closeErrorsTolerated; private final AtomicInteger closeErrorCount = new AtomicInteger(); - /** - * WAL Comparator; it compares the timestamp (log filenum), present in the log file name. - * Throws an IllegalArgumentException if used to compare paths from different wals. - */ - final Comparator<Path> LOG_NAME_COMPARATOR = new Comparator<Path>() { - @Override - public int compare(Path o1, Path o2) { - long t1 = getFileNumFromFileName(o1); - long t2 = getFileNumFromFileName(o2); - if (t1 == t2) return 0; - return (t1 > t2) ? 1 : -1; - } - }; - - /** - * Map of WAL log file to the latest sequence ids of all regions it has entries of. - * The map is sorted by the log file creation timestamp (contained in the log file name). - */ - private NavigableMap<Path, Map<byte[], Long>> byWalRegionSequenceIds = - new ConcurrentSkipListMap<Path, Map<byte[], Long>>(LOG_NAME_COMPARATOR); - - /** - * Exception handler to pass the disruptor ringbuffer. Same as native implementation only it - * logs using our logger instead of java native logger. + * Exception handler to pass the disruptor ringbuffer. Same as native implementation only it logs + * using our logger instead of java native logger. */ static class RingBufferExceptionHandler implements ExceptionHandler { @Override @@ -394,12 +186,10 @@ public class FSHLog implements WAL { /** * Constructor. - * * @param fs filesystem handle * @param root path for stored and archived wals * @param logDir dir where wals are stored * @param conf configuration to use - * @throws IOException */ public FSHLog(final FileSystem fs, final Path root, final String logDir, final Configuration conf) throws IOException { @@ -407,252 +197,79 @@ public class FSHLog implements WAL { } /** - * Create an edit log at the given <code>dir</code> location. - * - * You should never have to load an existing log. If there is a log at - * startup, it should have already been processed and deleted by the time the - * WAL object is started up. - * + * Create an edit log at the given <code>dir</code> location. You should never have to load an + * existing log. If there is a log at startup, it should have already been processed and deleted + * by the time the WAL object is started up. * @param fs filesystem handle * @param rootDir path to where logs and oldlogs * @param logDir dir where wals are stored * @param archiveDir dir where wals are archived * @param conf configuration to use - * @param listeners Listeners on WAL events. Listeners passed here will - * be registered before we do anything else; e.g. the - * Constructor {@link #rollWriter()}. - * @param failIfWALExists If true IOException will be thrown if files related to this wal - * already exist. - * @param prefix should always be hostname and port in distributed env and - * it will be URL encoded before being used. - * If prefix is null, "wal" will be used + * @param listeners Listeners on WAL events. Listeners passed here will be registered before we do + * anything else; e.g. the Constructor {@link #rollWriter()}. + * @param failIfWALExists If true IOException will be thrown if files related to this wal already + * exist. + * @param prefix should always be hostname and port in distributed env and it will be URL encoded + * before being used. If prefix is null, "wal" will be used * @param suffix will be url encoded. null is treated as empty. non-empty must start with - * {@link DefaultWALProvider#WAL_FILE_NAME_DELIMITER} - * @throws IOException + * {@link DefaultWALProvider#WAL_FILE_NAME_DELIMITER} */ public FSHLog(final FileSystem fs, final Path rootDir, final String logDir, - final String archiveDir, final Configuration conf, - final List<WALActionsListener> listeners, - final boolean failIfWALExists, final String prefix, final String suffix) - throws IOException { - this.fs = fs; - this.fullPathLogDir = new Path(rootDir, logDir); - this.fullPathArchiveDir = new Path(rootDir, archiveDir); - this.conf = conf; - - if (!fs.exists(fullPathLogDir) && !fs.mkdirs(fullPathLogDir)) { - throw new IOException("Unable to mkdir " + fullPathLogDir); - } - - if (!fs.exists(this.fullPathArchiveDir)) { - if (!fs.mkdirs(this.fullPathArchiveDir)) { - throw new IOException("Unable to mkdir " + this.fullPathArchiveDir); - } - } - - // If prefix is null||empty then just name it wal - this.logFilePrefix = - prefix == null || prefix.isEmpty() ? "wal" : URLEncoder.encode(prefix, "UTF8"); - // we only correctly differentiate suffices when numeric ones start with '.' - if (suffix != null && !(suffix.isEmpty()) && !(suffix.startsWith(WAL_FILE_NAME_DELIMITER))) { - throw new IllegalArgumentException("WAL suffix must start with '" + WAL_FILE_NAME_DELIMITER + - "' but instead was '" + suffix + "'"); - } - // Now that it exists, set the storage policy for the entire directory of wal files related to - // this FSHLog instance - FSUtils.setStoragePolicy(fs, conf, this.fullPathLogDir, HConstants.WAL_STORAGE_POLICY, - HConstants.DEFAULT_WAL_STORAGE_POLICY); - this.logFileSuffix = (suffix == null) ? "" : URLEncoder.encode(suffix, "UTF8"); - this.prefixPathStr = new Path(fullPathLogDir, - logFilePrefix + WAL_FILE_NAME_DELIMITER).toString(); - - this.ourFiles = new PathFilter() { - @Override - public boolean accept(final Path fileName) { - // The path should start with dir/<prefix> and end with our suffix - final String fileNameString = fileName.toString(); - if (!fileNameString.startsWith(prefixPathStr)) { - return false; - } - if (logFileSuffix.isEmpty()) { - // in the case of the null suffix, we need to ensure the filename ends with a timestamp. - return org.apache.commons.lang.StringUtils.isNumeric( - fileNameString.substring(prefixPathStr.length())); - } else if (!fileNameString.endsWith(logFileSuffix)) { - return false; - } - return true; - } - }; - - if (failIfWALExists) { - final FileStatus[] walFiles = FSUtils.listStatus(fs, fullPathLogDir, ourFiles); - if (null != walFiles && 0 != walFiles.length) { - throw new IOException("Target WAL already exists within directory " + fullPathLogDir); - } - } - - // Register listeners. TODO: Should this exist anymore? We have CPs? - if (listeners != null) { - for (WALActionsListener i: listeners) { - registerWALActionsListener(i); - } - } - this.coprocessorHost = new WALCoprocessorHost(this, conf); - - // Get size to roll log at. Roll at 95% of HDFS block size so we avoid crossing HDFS blocks - // (it costs a little x'ing bocks) - final long blocksize = this.conf.getLong("hbase.regionserver.hlog.blocksize", - FSUtils.getDefaultBlockSize(this.fs, this.fullPathLogDir)); - this.logrollsize = - (long)(blocksize * conf.getFloat("hbase.regionserver.logroll.multiplier", 0.95f)); - - float memstoreRatio = conf.getFloat(HeapMemorySizeUtil.MEMSTORE_SIZE_KEY, - conf.getFloat(HeapMemorySizeUtil.MEMSTORE_SIZE_OLD_KEY, - HeapMemorySizeUtil.DEFAULT_MEMSTORE_SIZE)); - boolean maxLogsDefined = conf.get("hbase.regionserver.maxlogs") != null; - if(maxLogsDefined){ - LOG.warn("'hbase.regionserver.maxlogs' was deprecated."); - } - this.maxLogs = conf.getInt("hbase.regionserver.maxlogs", - Math.max(32, calculateMaxLogFiles(memstoreRatio, logrollsize))); + final String archiveDir, final Configuration conf, final List<WALActionsListener> listeners, + final boolean failIfWALExists, final String prefix, final String suffix) throws IOException { + super(fs, rootDir, logDir, archiveDir, conf, listeners, failIfWALExists, prefix, suffix); this.minTolerableReplication = conf.getInt("hbase.regionserver.hlog.tolerable.lowreplication", - FSUtils.getDefaultReplication(fs, this.fullPathLogDir)); - this.lowReplicationRollLimit = - conf.getInt("hbase.regionserver.hlog.lowreplication.rolllimit", 5); + FSUtils.getDefaultReplication(fs, this.walDir)); + this.lowReplicationRollLimit = conf.getInt("hbase.regionserver.hlog.lowreplication.rolllimit", + 5); this.closeErrorsTolerated = conf.getInt("hbase.regionserver.logroll.errors.tolerated", 0); - int maxHandlersCount = conf.getInt(HConstants.REGION_SERVER_HANDLER_COUNT, 200); - - LOG.info("WAL configuration: blocksize=" + StringUtils.byteDesc(blocksize) + - ", rollsize=" + StringUtils.byteDesc(this.logrollsize) + - ", prefix=" + this.logFilePrefix + ", suffix=" + logFileSuffix + ", logDir=" + - this.fullPathLogDir + ", archiveDir=" + this.fullPathArchiveDir); // rollWriter sets this.hdfs_out if it can. rollWriter(); - this.slowSyncNs = - 1000000 * conf.getInt("hbase.regionserver.hlog.slowsync.ms", - DEFAULT_SLOW_SYNC_TIME_MS); - - // This is the 'writer' -- a single threaded executor. This single thread 'consumes' what is + // This is the 'writer' -- a single threaded executor. This single thread 'consumes' what is // put on the ring buffer. String hostingThreadName = Thread.currentThread().getName(); - this.appendExecutor = Executors. - newSingleThreadExecutor(Threads.getNamedThreadFactory(hostingThreadName + ".append")); - // Preallocate objects to use on the ring buffer. The way that appends and syncs work, we will + this.appendExecutor = Executors + .newSingleThreadExecutor(Threads.getNamedThreadFactory(hostingThreadName + ".append")); + // Preallocate objects to use on the ring buffer. The way that appends and syncs work, we will // be stuck and make no progress if the buffer is filled with appends only and there is no // sync. If no sync, then the handlers will be outstanding just waiting on sync completion // before they return. - final int preallocatedEventCount = - this.conf.getInt("hbase.regionserver.wal.disruptor.event.count", 1024 * 16); - // Using BlockingWaitStrategy. Stuff that is going on here takes so long it makes no sense + final int preallocatedEventCount = this.conf + .getInt("hbase.regionserver.wal.disruptor.event.count", 1024 * 16); + // Using BlockingWaitStrategy. Stuff that is going on here takes so long it makes no sense // spinning as other strategies do. - this.disruptor = - new Disruptor<RingBufferTruck>(RingBufferTruck.EVENT_FACTORY, preallocatedEventCount, - this.appendExecutor, ProducerType.MULTI, new BlockingWaitStrategy()); + this.disruptor = new Disruptor<RingBufferTruck>(RingBufferTruck.EVENT_FACTORY, + preallocatedEventCount, this.appendExecutor, ProducerType.MULTI, + new BlockingWaitStrategy()); // Advance the ring buffer sequence so that it starts from 1 instead of 0, // because SyncFuture.NOT_DONE = 0. this.disruptor.getRingBuffer().next(); - this.ringBufferEventHandler = - new RingBufferEventHandler(conf.getInt("hbase.regionserver.hlog.syncer.count", 5), - maxHandlersCount); + int maxHandlersCount = conf.getInt(HConstants.REGION_SERVER_HANDLER_COUNT, 200); + this.ringBufferEventHandler = new RingBufferEventHandler( + conf.getInt("hbase.regionserver.hlog.syncer.count", 5), maxHandlersCount); this.disruptor.handleExceptionsWith(new RingBufferExceptionHandler()); - this.disruptor.handleEventsWith(new RingBufferEventHandler [] {this.ringBufferEventHandler}); - // Presize our map of SyncFutures by handler objects. - this.syncFuturesByHandler = new ConcurrentHashMap<Thread, SyncFuture>(maxHandlersCount); + this.disruptor.handleEventsWith(new RingBufferEventHandler[] { this.ringBufferEventHandler }); // Starting up threads in constructor is a no no; Interface should have an init call. this.disruptor.start(); } - private int calculateMaxLogFiles(float memstoreSizeRatio, long logRollSize) { - MemoryUsage mu = ManagementFactory.getMemoryMXBean().getHeapMemoryUsage(); - int maxLogs = Math.round(mu.getMax() * memstoreSizeRatio * 2 / logRollSize); - return maxLogs; - } - - /** - * Get the backing files associated with this WAL. - * @return may be null if there are no files. - */ - protected FileStatus[] getFiles() throws IOException { - return FSUtils.listStatus(fs, fullPathLogDir, ourFiles); - } - /** - * Currently, we need to expose the writer's OutputStream to tests so that they can manipulate - * the default behavior (such as setting the maxRecoveryErrorCount value for example (see + * Currently, we need to expose the writer's OutputStream to tests so that they can manipulate the + * default behavior (such as setting the maxRecoveryErrorCount value for example (see * {@link TestWALReplay#testReplayEditsWrittenIntoWAL()}). This is done using reflection on the - * underlying HDFS OutputStream. - * NOTE: This could be removed once Hadoop1 support is removed. + * underlying HDFS OutputStream. NOTE: This could be removed once Hadoop1 support is removed. * @return null if underlying stream is not ready. */ @VisibleForTesting OutputStream getOutputStream() { FSDataOutputStream fsdos = this.hdfs_out; - if (fsdos == null) return null; - return fsdos.getWrappedStream(); - } - - @Override - public byte [][] rollWriter() throws FailedLogCloseException, IOException { - return rollWriter(false); - } - - /** - * retrieve the next path to use for writing. - * Increments the internal filenum. - */ - private Path getNewPath() throws IOException { - this.filenum.set(System.currentTimeMillis()); - Path newPath = getCurrentFileName(); - while (fs.exists(newPath)) { - this.filenum.incrementAndGet(); - newPath = getCurrentFileName(); - } - return newPath; - } - - Path getOldPath() { - long currentFilenum = this.filenum.get(); - Path oldPath = null; - if (currentFilenum > 0) { - // ComputeFilename will take care of meta wal filename - oldPath = computeFilename(currentFilenum); - } // I presume if currentFilenum is <= 0, this is first file and null for oldPath if fine? - return oldPath; - } - - /** - * Tell listeners about pre log roll. - * @throws IOException - */ - private void tellListenersAboutPreLogRoll(final Path oldPath, final Path newPath) - throws IOException { - if (!this.listeners.isEmpty()) { - for (WALActionsListener i : this.listeners) { - i.preLogRoll(oldPath, newPath); - } - } - } - - /** - * Tell listeners about post log roll. - * @throws IOException - */ - private void tellListenersAboutPostLogRoll(final Path oldPath, final Path newPath) - throws IOException { - if (!this.listeners.isEmpty()) { - for (WALActionsListener i : this.listeners) { - i.postLogRoll(oldPath, newPath); - } - } + return fsdos != null ? fsdos.getWrappedStream() : null; } /** * Run a sync after opening to set up the pipeline. - * @param nextWriter - * @param startTimeNanos */ private void preemptiveSync(final ProtobufLogWriter nextWriter) { long startTimeNanos = System.nanoTime(); @@ -665,115 +282,17 @@ public class FSHLog implements WAL { } } - @Override - public byte [][] rollWriter(boolean force) throws FailedLogCloseException, IOException { - rollWriterLock.lock(); - try { - // Return if nothing to flush. - if (!force && (this.writer != null && this.numEntries.get() <= 0)) return null; - byte [][] regionsToFlush = null; - if (this.closed) { - LOG.debug("WAL closed. Skipping rolling of writer"); - return regionsToFlush; - } - if (!closeBarrier.beginOp()) { - LOG.debug("WAL closing. Skipping rolling of writer"); - return regionsToFlush; - } - TraceScope scope = Trace.startSpan("FSHLog.rollWriter"); - try { - Path oldPath = getOldPath(); - Path newPath = getNewPath(); - // Any exception from here on is catastrophic, non-recoverable so we currently abort. - Writer nextWriter = this.createWriterInstance(newPath); - FSDataOutputStream nextHdfsOut = null; - if (nextWriter instanceof ProtobufLogWriter) { - nextHdfsOut = ((ProtobufLogWriter)nextWriter).getStream(); - // If a ProtobufLogWriter, go ahead and try and sync to force setup of pipeline. - // If this fails, we just keep going.... it is an optimization, not the end of the world. - preemptiveSync((ProtobufLogWriter)nextWriter); - } - tellListenersAboutPreLogRoll(oldPath, newPath); - // NewPath could be equal to oldPath if replaceWriter fails. - newPath = replaceWriter(oldPath, newPath, nextWriter, nextHdfsOut); - tellListenersAboutPostLogRoll(oldPath, newPath); - // Can we delete any of the old log files? - if (getNumRolledLogFiles() > 0) { - cleanOldLogs(); - regionsToFlush = findRegionsToForceFlush(); - } - } finally { - closeBarrier.endOp(); - assert scope == NullScope.INSTANCE || !scope.isDetached(); - scope.close(); - } - return regionsToFlush; - } finally { - rollWriterLock.unlock(); - } - } - /** - * This method allows subclasses to inject different writers without having to - * extend other methods like rollWriter(). - * + * This method allows subclasses to inject different writers without having to extend other + * methods like rollWriter(). * @return Writer instance */ protected Writer createWriterInstance(final Path path) throws IOException { - return DefaultWALProvider.createWriter(conf, fs, path, false); - } - - /** - * Archive old logs. A WAL is eligible for archiving if all its WALEdits have been flushed. - * @throws IOException - */ - private void cleanOldLogs() throws IOException { - List<Path> logsToArchive = null; - // For each log file, look at its Map of regions to highest sequence id; if all sequence ids - // are older than what is currently in memory, the WAL can be GC'd. - for (Map.Entry<Path, Map<byte[], Long>> e : this.byWalRegionSequenceIds.entrySet()) { - Path log = e.getKey(); - Map<byte[], Long> sequenceNums = e.getValue(); - if (this.sequenceIdAccounting.areAllLower(sequenceNums)) { - if (logsToArchive == null) logsToArchive = new ArrayList<Path>(); - logsToArchive.add(log); - if (LOG.isTraceEnabled()) LOG.trace("WAL file ready for archiving " + log); - } - } - if (logsToArchive != null) { - for (Path p : logsToArchive) { - this.totalLogSize.addAndGet(-this.fs.getFileStatus(p).getLen()); - archiveLogFile(p); - this.byWalRegionSequenceIds.remove(p); - } + Writer writer = DefaultWALProvider.createWriter(conf, fs, path, false); + if (writer instanceof ProtobufLogWriter) { + preemptiveSync((ProtobufLogWriter) writer); } - } - - /** - * If the number of un-archived WAL files is greater than maximum allowed, check the first - * (oldest) WAL file, and returns those regions which should be flushed so that it can - * be archived. - * @return regions (encodedRegionNames) to flush in order to archive oldest WAL file. - * @throws IOException - */ - byte[][] findRegionsToForceFlush() throws IOException { - byte [][] regions = null; - int logCount = getNumRolledLogFiles(); - if (logCount > this.maxLogs && logCount > 0) { - Map.Entry<Path, Map<byte[], Long>> firstWALEntry = - this.byWalRegionSequenceIds.firstEntry(); - regions = this.sequenceIdAccounting.findLower(firstWALEntry.getValue()); - } - if (regions != null) { - StringBuilder sb = new StringBuilder(); - for (int i = 0; i < regions.length; i++) { - if (i > 0) sb.append(", "); - sb.append(Bytes.toStringBinary(regions[i])); - } - LOG.info("Too many WALs; count=" + logCount + ", max=" + this.maxLogs + - "; forcing flush of " + regions.length + " regions(s): " + sb.toString()); - } - return regions; + return writer; } /** @@ -781,51 +300,39 @@ public class FSHLog implements WAL { * @see #beforeWaitOnSafePoint() */ @VisibleForTesting - protected void afterCreatingZigZagLatch() {} + protected void afterCreatingZigZagLatch() { + } /** * @see #afterCreatingZigZagLatch() */ @VisibleForTesting - protected void beforeWaitOnSafePoint() {}; + protected void beforeWaitOnSafePoint() { + }; - /** - * Cleans up current writer closing it and then puts in place the passed in - * <code>nextWriter</code>. - * - * In the case of creating a new WAL, oldPath will be null. - * - * In the case of rolling over from one file to the next, none of the params will be null. - * - * In the case of closing out this FSHLog with no further use newPath, nextWriter, and - * nextHdfsOut will be null. - * - * @param oldPath may be null - * @param newPath may be null - * @param nextWriter may be null - * @param nextHdfsOut may be null - * @return the passed in <code>newPath</code> - * @throws IOException if there is a problem flushing or closing the underlying FS - */ - Path replaceWriter(final Path oldPath, final Path newPath, Writer nextWriter, - final FSDataOutputStream nextHdfsOut) - throws IOException { - // Ask the ring buffer writer to pause at a safe point. Once we do this, the writer + @Override + protected void doAppend(Writer writer, FSWALEntry entry) throws IOException { + writer.append(entry); + } + + @Override + protected long doReplaceWriter(Path oldPath, Path newPath, Writer nextWriter) throws IOException { + // Ask the ring buffer writer to pause at a safe point. Once we do this, the writer // thread will eventually pause. An error hereafter needs to release the writer thread - // regardless -- hence the finally block below. Note, this method is called from the FSHLog + // regardless -- hence the finally block below. Note, this method is called from the FSHLog // constructor BEFORE the ring buffer is set running so it is null on first time through // here; allow for that. SyncFuture syncFuture = null; - SafePointZigZagLatch zigzagLatch = (this.ringBufferEventHandler == null)? - null: this.ringBufferEventHandler.attainSafePoint(); + SafePointZigZagLatch zigzagLatch = (this.ringBufferEventHandler == null) ? null + : this.ringBufferEventHandler.attainSafePoint(); afterCreatingZigZagLatch(); - TraceScope scope = Trace.startSpan("FSHFile.replaceWriter"); + long oldFileLen = 0L; try { - // Wait on the safe point to be achieved. Send in a sync in case nothing has hit the + // Wait on the safe point to be achieved. Send in a sync in case nothing has hit the // ring buffer between the above notification of writer that we want it to go to - // 'safe point' and then here where we are waiting on it to attain safe point. Use + // 'safe point' and then here where we are waiting on it to attain safe point. Use // 'sendSync' instead of 'sync' because we do not want this thread to block waiting on it - // to come back. Cleanup this syncFuture down below after we are ready to run again. + // to come back. Cleanup this syncFuture down below after we are ready to run again. try { if (zigzagLatch != null) { Trace.addTimelineAnnotation("awaiting safepoint"); @@ -833,44 +340,37 @@ public class FSHLog implements WAL { } } catch (FailedSyncBeforeLogCloseException e) { // If unflushed/unsynced entries on close, it is reason to abort. - if (isUnflushedEntries()) throw e; - LOG.warn("Failed sync-before-close but no outstanding appends; closing WAL: " + - e.getMessage()); + if (isUnflushedEntries()) { + throw e; + } + LOG.warn( + "Failed sync-before-close but no outstanding appends; closing WAL" + e.getMessage()); } - - // It is at the safe point. Swap out writer from under the blocked writer thread. - // TODO: This is close is inline with critical section. Should happen in background? - try { - if (this.writer != null) { + // It is at the safe point. Swap out writer from under the blocked writer thread. + // TODO: This is close is inline with critical section. Should happen in background? + if (this.writer != null) { + oldFileLen = this.writer.getLength(); + try { Trace.addTimelineAnnotation("closing writer"); this.writer.close(); Trace.addTimelineAnnotation("writer closed"); - } - this.closeErrorCount.set(0); - } catch (IOException ioe) { - int errors = closeErrorCount.incrementAndGet(); - if (!isUnflushedEntries() && (errors <= this.closeErrorsTolerated)) { - LOG.warn("Riding over failed WAL close of " + oldPath + ", cause=\"" + - ioe.getMessage() + "\", errors=" + errors + - "; THIS FILE WAS NOT CLOSED BUT ALL EDITS SYNCED SO SHOULD BE OK"); - } else { - throw ioe; + this.closeErrorCount.set(0); + } catch (IOException ioe) { + int errors = closeErrorCount.incrementAndGet(); + if (!isUnflushedEntries() && (errors <= this.closeErrorsTolerated)) { + LOG.warn("Riding over failed WAL close of " + oldPath + ", cause=\"" + ioe.getMessage() + + "\", errors=" + errors + + "; THIS FILE WAS NOT CLOSED BUT ALL EDITS SYNCED SO SHOULD BE OK"); + } else { + throw ioe; + } } } this.writer = nextWriter; - this.hdfs_out = nextHdfsOut; - int oldNumEntries = this.numEntries.get(); - this.numEntries.set(0); - final String newPathString = (null == newPath ? null : FSUtils.getPath(newPath)); - if (oldPath != null) { - this.byWalRegionSequenceIds.put(oldPath, this.sequenceIdAccounting.resetHighest()); - long oldFileLen = this.fs.getFileStatus(oldPath).getLen(); - this.totalLogSize.addAndGet(oldFileLen); - LOG.info("Rolled WAL " + FSUtils.getPath(oldPath) + " with entries=" + oldNumEntries + - ", filesize=" + StringUtils.byteDesc(oldFileLen) + "; new WAL " + - newPathString); + if (nextWriter != null && nextWriter instanceof ProtobufLogWriter) { + this.hdfs_out = ((ProtobufLogWriter) nextWriter).getStream(); } else { - LOG.info("New WAL " + newPathString); + this.hdfs_out = null; } } catch (InterruptedException ie) { // Perpetuate the interrupt @@ -880,223 +380,84 @@ public class FSHLog implements WAL { LOG.error("Failed close of WAL writer " + oldPath + ", unflushedEntries=" + count, e); throw new FailedLogCloseException(oldPath + ", unflushedEntries=" + count, e); } finally { - try { - // Let the writer thread go regardless, whether error or not. - if (zigzagLatch != null) { - zigzagLatch.releaseSafePoint(); - // syncFuture will be null if we failed our wait on safe point above. Otherwise, if - // latch was obtained successfully, the sync we threw in either trigger the latch or it - // got stamped with an exception because the WAL was damaged and we could not sync. Now - // the write pipeline has been opened up again by releasing the safe point, process the - // syncFuture we got above. This is probably a noop but it may be stale exception from - // when old WAL was in place. Catch it if so. - if (syncFuture != null) { - try { - blockOnSync(syncFuture); - } catch (IOException ioe) { - if (LOG.isTraceEnabled()) LOG.trace("Stale sync exception", ioe); + // Let the writer thread go regardless, whether error or not. + if (zigzagLatch != null) { + zigzagLatch.releaseSafePoint(); + // syncFuture will be null if we failed our wait on safe point above. Otherwise, if + // latch was obtained successfully, the sync we threw in either trigger the latch or it + // got stamped with an exception because the WAL was damaged and we could not sync. Now + // the write pipeline has been opened up again by releasing the safe point, process the + // syncFuture we got above. This is probably a noop but it may be stale exception from + // when old WAL was in place. Catch it if so. + if (syncFuture != null) { + try { + blockOnSync(syncFuture); + } catch (IOException ioe) { + if (LOG.isTraceEnabled()) { + LOG.trace("Stale sync exception", ioe); } } } - } finally { - scope.close(); } } - return newPath; - } - - long getUnflushedEntriesCount() { - long highestSynced = this.highestSyncedSequence.get(); - return highestSynced > this.highestUnsyncedSequence? - 0: this.highestUnsyncedSequence - highestSynced; - } - - boolean isUnflushedEntries() { - return getUnflushedEntriesCount() > 0; + return oldFileLen; } - /* - * only public so WALSplitter can use. - * @return archived location of a WAL file with the given path p - */ - public static Path getWALArchivePath(Path archiveDir, Path p) { - return new Path(archiveDir, p.getName()); - } - - private void archiveLogFile(final Path p) throws IOException { - Path newPath = getWALArchivePath(this.fullPathArchiveDir, p); - // Tell our listeners that a log is going to be archived. - if (!this.listeners.isEmpty()) { - for (WALActionsListener i : this.listeners) { - i.preLogArchive(p, newPath); - } - } - LOG.info("Archiving " + p + " to " + newPath); - if (!FSUtils.renameAndSetModifyTime(this.fs, p, newPath)) { - throw new IOException("Unable to rename " + p + " to " + newPath); - } - // Tell our listeners that a log has been archived. - if (!this.listeners.isEmpty()) { - for (WALActionsListener i : this.listeners) { - i.postLogArchive(p, newPath); + @Override + protected void doShutdown() throws IOException { + // Shutdown the disruptor. Will stop after all entries have been processed. Make sure we + // have stopped incoming appends before calling this else it will not shutdown. We are + // conservative below waiting a long time and if not elapsed, then halting. + if (this.disruptor != null) { + long timeoutms = conf.getLong("hbase.wal.disruptor.shutdown.timeout.ms", 60000); + try { + this.disruptor.shutdown(timeoutms, TimeUnit.MILLISECONDS); + } catch (TimeoutException e) { + LOG.warn("Timed out bringing down disruptor after " + timeoutms + "ms; forcing halt " + + "(It is a problem if this is NOT an ABORT! -- DATALOSS!!!!)"); + this.disruptor.halt(); + this.disruptor.shutdown(); } } - } - - /** - * This is a convenience method that computes a new filename with a given - * file-number. - * @param filenum to use - * @return Path - */ - protected Path computeFilename(final long filenum) { - if (filenum < 0) { - throw new RuntimeException("WAL file number can't be < 0"); + // With disruptor down, this is safe to let go. + if (this.appendExecutor != null) { + this.appendExecutor.shutdown(); } - String child = logFilePrefix + WAL_FILE_NAME_DELIMITER + filenum + logFileSuffix; - return new Path(fullPathLogDir, child); - } - - /** - * This is a convenience method that computes a new filename with a given - * using the current WAL file-number - * @return Path - */ - public Path getCurrentFileName() { - return computeFilename(this.filenum.get()); - } - - @Override - public String toString() { - return "FSHLog " + logFilePrefix + ":" + logFileSuffix + "(num " + filenum + ")"; - } -/** - * A log file has a creation timestamp (in ms) in its file name ({@link #filenum}. - * This helper method returns the creation timestamp from a given log file. - * It extracts the timestamp assuming the filename is created with the - * {@link #computeFilename(long filenum)} method. - * @param fileName - * @return timestamp, as in the log file name. - */ - protected long getFileNumFromFileName(Path fileName) { - if (fileName == null) throw new IllegalArgumentException("file name can't be null"); - if (!ourFiles.accept(fileName)) { - throw new IllegalArgumentException("The log file " + fileName + - " doesn't belong to this WAL. (" + toString() + ")"); + if (LOG.isDebugEnabled()) { + LOG.debug("Closing WAL writer in " + FSUtils.getPath(walDir)); } - final String fileNameString = fileName.toString(); - String chompedPath = fileNameString.substring(prefixPathStr.length(), - (fileNameString.length() - logFileSuffix.length())); - return Long.parseLong(chompedPath); - } - - @Override - public void close() throws IOException { - shutdown(); - final FileStatus[] files = getFiles(); - if (null != files && 0 != files.length) { - for (FileStatus file : files) { - Path p = getWALArchivePath(this.fullPathArchiveDir, file.getPath()); - // Tell our listeners that a log is going to be archived. - if (!this.listeners.isEmpty()) { - for (WALActionsListener i : this.listeners) { - i.preLogArchive(file.getPath(), p); - } - } - - if (!FSUtils.renameAndSetModifyTime(fs, file.getPath(), p)) { - throw new IOException("Unable to rename " + file.getPath() + " to " + p); - } - // Tell our listeners that a log was archived. - if (!this.listeners.isEmpty()) { - for (WALActionsListener i : this.listeners) { - i.postLogArchive(file.getPath(), p); - } - } - } - LOG.debug("Moved " + files.length + " WAL file(s) to " + - FSUtils.getPath(this.fullPathArchiveDir)); + if (this.writer != null) { + this.writer.close(); + this.writer = null; } - LOG.info("Closed WAL: " + toString()); } @Override - public void shutdown() throws IOException { - if (shutdown.compareAndSet(false, true)) { - try { - // Prevent all further flushing and rolling. - closeBarrier.stopAndDrainOps(); - } catch (InterruptedException e) { - LOG.error("Exception while waiting for cache flushes and log rolls", e); - Thread.currentThread().interrupt(); - } - - // Shutdown the disruptor. Will stop after all entries have been processed. Make sure we - // have stopped incoming appends before calling this else it will not shutdown. We are - // conservative below waiting a long time and if not elapsed, then halting. - if (this.disruptor != null) { - long timeoutms = conf.getLong("hbase.wal.disruptor.shutdown.timeout.ms", 60000); - try { - this.disruptor.shutdown(timeoutms, TimeUnit.MILLISECONDS); - } catch (TimeoutException e) { - LOG.warn("Timed out bringing down disruptor after " + timeoutms + "ms; forcing halt " + - "(It is a problem if this is NOT an ABORT! -- DATALOSS!!!!)"); - this.disruptor.halt(); - this.disruptor.shutdown(); - } - } - // With disruptor down, this is safe to let go. - if (this.appendExecutor != null) this.appendExecutor.shutdown(); - - // Tell our listeners that the log is closing - if (!this.listeners.isEmpty()) { - for (WALActionsListener i : this.listeners) { - i.logCloseRequested(); - } - } - this.closed = true; - if (LOG.isDebugEnabled()) { - LOG.debug("Closing WAL writer in " + FSUtils.getPath(fullPathLogDir)); - } - if (this.writer != null) { - this.writer.close(); - this.writer = null; - } - } + public String toString() { + return "FSHLog " + walFilePrefix + ":" + walFileSuffix + "(num " + filenum + ")"; } - /** - * NOTE: This append, at a time that is usually after this call returns, starts an - * mvcc transaction by calling 'begin' wherein which we assign this update a sequenceid. At - * assignment time, we stamp all the passed in Cells inside WALEdit with their sequenceId. - * You must 'complete' the transaction this mvcc transaction by calling - * MultiVersionConcurrencyControl#complete(...) or a variant otherwise mvcc will get stuck. Do it - * in the finally of a try/finally - * block within which this append lives and any subsequent operations like sync or - * update of memstore, etc. Get the WriteEntry to pass mvcc out of the passed in WALKey - * <code>walKey</code> parameter. Be warned that the WriteEntry is not immediately available - * on return from this method. It WILL be available subsequent to a sync of this append; - * otherwise, you will just have to wait on the WriteEntry to get filled in. - */ - @edu.umd.cs.findbugs.annotations.SuppressWarnings(value="NP_NULL_ON_SOME_PATH_EXCEPTION", - justification="Will never be null") + @edu.umd.cs.findbugs.annotations.SuppressWarnings(value = "NP_NULL_ON_SOME_PATH_EXCEPTION", + justification = "Will never be null") @Override public long append(final HRegionInfo hri, final WALKey key, final WALEdit edits, final boolean inMemstore) throws IOException { - if (this.closed) throw new IOException("Cannot append; log is closed"); - // Make a trace scope for the append. It is closed on other side of the ring buffer by the - // single consuming thread. Don't have to worry about it. + if (this.closed) { + throw new IOException("Cannot append; log is closed"); + } + // Make a trace scope for the append. It is closed on other side of the ring buffer by the + // single consuming thread. Don't have to worry about it. TraceScope scope = Trace.startSpan("FSHLog.append"); - // This is crazy how much it takes to make an edit. Do we need all this stuff!!!!???? We need + // This is crazy how much it takes to make an edit. Do we need all this stuff!!!!???? We need // all this to make a key and then below to append the edit, we need to carry htd, info, // etc. all over the ring buffer. FSWALEntry entry = null; long sequence = this.disruptor.getRingBuffer().next(); try { RingBufferTruck truck = this.disruptor.getRingBuffer().get(sequence); - // Construction of FSWALEntry sets a latch. The latch is thrown just after we stamp the + // Construction of FSWALEntry sets a latch. The latch is thrown just after we stamp the // edit with its edit/sequence id. // TODO: reuse FSWALEntry as we do SyncFuture rather create per append. entry = new FSWALEntry(sequence, key, edits, hri, inMemstore); @@ -1108,17 +469,18 @@ public class FSHLog implements WAL { } /** - * Thread to runs the hdfs sync call. This call takes a while to complete. This is the longest - * pole adding edits to the WAL and this must complete to be sure all edits persisted. We run - * multiple threads sync'ng rather than one that just syncs in series so we have better - * latencies; otherwise, an edit that arrived just after a sync started, might have to wait - * almost the length of two sync invocations before it is marked done. - * <p>When the sync completes, it marks all the passed in futures done. On the other end of the - * sync future is a blocked thread, usually a regionserver Handler. There may be more than one - * future passed in the case where a few threads arrive at about the same time and all invoke - * 'sync'. In this case we'll batch up the invocations and run one filesystem sync only for a - * batch of Handler sync invocations. Do not confuse these Handler SyncFutures with the futures - * an ExecutorService returns when you call submit. We have no use for these in this model. These + * Thread to runs the hdfs sync call. This call takes a while to complete. This is the longest + * pole adding edits to the WAL and this must complete to be sure all edits persisted. We run + * multiple threads sync'ng rather than one that just syncs in series so we have better latencies; + * otherwise, an edit that arrived just after a sync started, might have to wait almost the length + * of two sync invocations before it is marked done. + * <p> + * When the sync completes, it marks all the passed in futures done. On the other end of the sync + * future is a blocked thread, usually a regionserver Handler. There may be more than one future + * passed in the case where a few threads arrive at about the same time and all invoke 'sync'. In + * this case we'll batch up the invocations and run one filesystem sync only for a batch of + * Handler sync invocations. Do not confuse these Handler SyncFutures with the futures an + * ExecutorService returns when you call submit. We have no use for these in this model. These * SyncFutures are 'artificial', something to hold the Handler until the filesystem sync * completes. */ @@ -1130,12 +492,13 @@ public class FSHLog implements WAL { /** * UPDATE! * @param syncs the batch of calls to sync that arrived as this thread was starting; when done, - * we will put the result of the actual hdfs sync call as the result. - * @param sequence The sequence number on the ring buffer when this thread was set running. - * If this actual writer sync completes then all appends up this point have been - * flushed/synced/pushed to datanodes. If we fail, then the passed in <code>syncs</code> - * futures will return the exception to their clients; some of the edits may have made it out - * to data nodes but we will report all that were part of this session as failed. + * we will put the result of the actual hdfs sync call as the result. + * @param sequence The sequence number on the ring buffer when this thread was set running. If + * this actual writer sync completes then all appends up this point have been + * flushed/synced/pushed to datanodes. If we fail, then the passed in + * <code>syncs</code> futures will return the exception to their clients; some of the + * edits may have made it out to data nodes but we will report all that were part of + * this session as failed. */ SyncRunner(final String name, final int maxHandlersCount) { super(name); @@ -1145,17 +508,17 @@ public class FSHLog implements WAL { // // We could let the capacity be 'open' but bound it so we get alerted in pathological case // where we cannot sync and we have a bunch of threads all backed up waiting on their syncs - // to come in. LinkedBlockingQueue actually shrinks when you remove elements so Q should - // stay neat and tidy in usual case. Let the max size be three times the maximum handlers. + // to come in. LinkedBlockingQueue actually shrinks when you remove elements so Q should + // stay neat and tidy in usual case. Let the max size be three times the maximum handlers. // The passed in maxHandlerCount is the user-level handlers which is what we put up most of // but HBase has other handlers running too -- opening region handlers which want to write - // the meta table when succesful (i.e. sync), closing handlers -- etc. These are usually + // the meta table when succesful (i.e. sync), closing handlers -- etc. These are usually // much fewer in number than the user-space handlers so Q-size should be user handlers plus - // some space for these other handlers. Lets multiply by 3 for good-measure. + // some space for these other handlers. Lets multiply by 3 for good-measure. this.syncFutures = new LinkedBlockingQueue<SyncFuture>(maxHandlersCount * 3); } - void offer(final long sequence, final SyncFuture [] syncFutures, final int syncFutureCount) { + void offer(final long sequence, final SyncFuture[] syncFutures, final int syncFutureCount) { // Set sequence first because the add to the queue will wake the thread if sleeping. this.sequence = sequence; for (int i = 0; i < syncFutureCount; ++i) { @@ -1165,28 +528,28 @@ public class FSHLog implements WAL { /** * Release the passed <code>syncFuture</code> - * @param syncFuture - * @param currentSequence - * @param t * @return Returns 1. */ private int releaseSyncFuture(final SyncFuture syncFuture, final long currentSequence, final Throwable t) { - if (!syncFuture.done(currentSequence, t)) throw new IllegalStateException(); + if (!syncFuture.done(currentSequence, t)) { + throw new IllegalStateException(); + } // This function releases one sync future only. return 1; } /** * Release all SyncFutures whose sequence is <= <code>currentSequence</code>. - * @param currentSequence * @param t May be non-null if we are processing SyncFutures because an exception was thrown. * @return Count of SyncFutures we let go. */ private int releaseSyncFutures(final long currentSequence, final Throwable t) { int syncCount = 0; for (SyncFuture syncFuture; (syncFuture = this.syncFutures.peek()) != null;) { - if (syncFuture.getRingBufferSequence() > currentSequence) break; + if (syncFuture.getTxid() > currentSequence) { + break; + } releaseSyncFuture(syncFuture, currentSequence, t); if (!this.syncFutures.remove(syncFuture)) { throw new IllegalStateException(syncFuture.toString()); @@ -1204,14 +567,14 @@ public class FSHLog implements WAL { long currentHighestSyncedSequence; // Set the highestSyncedSequence IFF our current sequence id is the 'highest'. do { - currentHighestSyncedSequence = highestSyncedSequence.get(); + currentHighestSyncedSequence = highestSyncedTxid.get(); if (currentHighestSyncedSequence >= sequence) { // Set the sync number to current highwater mark; might be able to let go more // queued sync futures sequence = currentHighestSyncedSequence; break; } - } while (!highestSyncedSequence.compareAndSet(currentHighestSyncedSequence, sequence)); + } while (!highestSyncedTxid.compareAndSet(currentHighestSyncedSequence, sequence)); return sequence; } @@ -1225,21 +588,21 @@ public class FSHLog implements WAL { // We have to process what we 'take' from the queue takeSyncFuture = this.syncFutures.take(); currentSequence = this.sequence; - long syncFutureSequence = takeSyncFuture.getRingBufferSequence(); + long syncFutureSequence = takeSyncFuture.getTxid(); if (syncFutureSequence > currentSequence) { - throw new IllegalStateException("currentSequence=" + syncFutureSequence + - ", syncFutureSequence=" + syncFutureSequence); + throw new IllegalStateException("currentSequence=" + syncFutureSequence + + ", syncFutureSequence=" + syncFutureSequence); } // See if we can process any syncfutures BEFORE we go sync. - long currentHighestSyncedSequence = highestSyncedSequence.get(); + long currentHighestSyncedSequence = highestSyncedTxid.get(); if (currentSequence < currentHighestSyncedSequence) { syncCount += releaseSyncFuture(takeSyncFuture, currentHighestSyncedSequence, null); - // Done with the 'take'. Go around again and do a new 'take'. + // Done with the 'take'. Go around again and do a new 'take'. continue; } break; } - // I got something. Lets run. Save off current sequence number in case it changes + // I got something. Lets run. Save off current sequence number in case it changes // while we run. TraceScope scope = Trace.continueSpan(takeSyncFuture.getSpan()); long start = System.nanoTime(); @@ -1262,8 +625,11 @@ public class FSHLog implements WAL { syncCount += releaseSyncFuture(takeSyncFuture, currentSequence, lastException); // Can we release other syncs? syncCount += releaseSyncFutures(currentSequence, lastException); - if (lastException != null) requestLogRoll(); - else checkLogRoll(); + if (lastException != null) { + requestLogRoll(); + } else { + checkLogRoll(); + } } postSync(System.nanoTime() - start, syncCount); } catch (InterruptedException e) { @@ -1281,7 +647,9 @@ public class FSHLog implements WAL { */ void checkLogRoll() { // Will return immediately if we are in the middle of a WAL log roll currently. - if (!rollWriterLock.tryLock()) return; + if (!rollWriterLock.tryLock()) { + return; + } boolean lowReplication; try { lowReplication = checkLowReplication(); @@ -1297,7 +665,7 @@ public class FSHLog implements WAL { } } - /* + /** * @return true if number of replicas for the WAL is lower than threshold */ private boolean checkLowReplication() { @@ -1309,11 +677,10 @@ public class FSHLog implements WAL { if (numCurrentReplicas != 0 && numCurrentReplicas < this.minTolerableReplication) { if (this.lowReplicationRollEnabled) { if (this.consecutiveLogRolls.get() < this.lowReplicationRollLimit) { - LOG.warn("HDFS pipeline error detected. " + "Found " - + numCurrentReplicas + " replicas but expecting no less than " - + this.minTolerableReplication + " replicas. " - + " Requesting close of WAL. current pipeline: " - + Arrays.toString(getPipeLine())); + LOG.warn("HDFS pipeline error detected. " + "Found " + numCurrentReplicas + + " replicas but expecting no less than " + this.minTolerableReplication + + " replicas. " + " Requesting close of WAL. current pipeline: " + + Arrays.toString(getPipeline())); logRollNeeded = true; // If rollWriter is requested, increase consecutiveLogRolls. Once it // is larger than lowReplicationRollLimit, disable the @@ -1341,8 +708,7 @@ public class FSHLog implements WAL { } } } catch (Exception e) { - LOG.warn("DFSOutputStream.getNumCurrentReplicas failed because of " + e + - ", continuing..."); + LOG.warn("DFSOutputStream.getNumCurrentReplicas failed because of " + e + ", continuing..."); } return logRollNeeded; } @@ -1353,6 +719,7 @@ public class FSHLog implements WAL { private SyncFuture publishSyncOnRingBuffer(Span span) { long sequence = this.disruptor.getRingBuffer().next(); + // here we use ring buffer sequence as transaction id SyncFuture syncFuture = getSyncFuture(sequence, span); try { RingBufferTruck truck = this.disruptor.getRingBuffer().get(sequence); @@ -1368,81 +735,17 @@ public class FSHLog implements WAL { return blockOnSync(publishSyncOnRingBuffer(span)); } - private Span blockOnSync(final SyncFuture syncFuture) throws IOException { - // Now we have published the ringbuffer, halt the current thread until we get an answer back. - try { - syncFuture.get(); - return syncFuture.getSpan(); - } catch (InterruptedException ie) { - LOG.warn("Interrupted", ie); - throw convertInterruptedExceptionToIOException(ie); - } catch (ExecutionException e) { - throw ensureIOException(e.getCause()); - } - } - - private IOException convertInterruptedExceptionToIOException(final InterruptedException ie) { - Thread.currentThread().interrupt(); - IOException ioe = new InterruptedIOException(); - ioe.initCause(ie); - return ioe; - } - - private SyncFuture getSyncFuture(final long sequence, Span span) { - SyncFuture syncFuture = this.syncFuturesByHandler.get(Thread.currentThread()); - if (syncFuture == null) { - syncFuture = new SyncFuture(); - this.syncFuturesByHandler.put(Thread.currentThread(), syncFuture); - } - return syncFuture.reset(sequence, span); - } - - private void postSync(final long timeInNanos, final int handlerSyncs) { - if (timeInNanos > this.slowSyncNs) { - String msg = - new StringBuilder().append("Slow sync cost: ") - .append(timeInNanos / 1000000).append(" ms, current pipeline: ") - .append(Arrays.toString(getPipeLine())).toString(); - Trace.addTimelineAnnotation(msg); - LOG.info(msg); - } - if (!listeners.isEmpty()) { - for (WALActionsListener listener : listeners) { - listener.postSync(timeInNanos, handlerSyncs); - } - } - } - - private long postAppend(final Entry e, final long elapsedTime) { - long len = 0; - if (!listeners.isEmpty()) { - for (Cell cell : e.getEdit().getCells()) { - len += CellUtil.estimatedSerializedSizeOf(cell); - } - for (WALActionsListener listener : listeners) { - listener.postAppend(len, elapsedTime); - } - } - return len; - } - - /** - * This method gets the datanode replication count for the current WAL. - * - * If the pipeline isn't started yet or is empty, you will get the default - * replication factor. Therefore, if this function returns 0, it means you - * are not properly running with the HDFS-826 patch. - * @throws InvocationTargetException - * @throws IllegalAccessException - * @throws IllegalArgumentException - * - * @throws Exception + * {@inheritDoc} + * <p> + * If the pipeline isn't started yet or is empty, you will get the default replication factor. + * Therefore, if this function returns 0, it means you are not properly running with the HDFS-826 + * patch. */ @VisibleForTesting int getLogReplication() { try { - //in standalone mode, it will return 0 + // in standalone mode, it will return 0 if (this.hdfs_out instanceof HdfsDataOutputStream) { return ((HdfsDataOutputStream) this.hdfs_out).getCurrentBlockReplication(); } @@ -1465,7 +768,7 @@ public class FSHLog implements WAL { @Override public void sync(long txid) throws IOException { - if (this.highestSyncedSequence.get() >= txid){ + if (this.highestSyncedTxid.get() >= txid) { // Already sync'd. return; } @@ -1478,70 +781,20 @@ public class FSHLog implements WAL { } } - // public only until class moves to o.a.h.h.wal - public void requestLogRoll() { - requestLogRoll(false); - } - - private void requestLogRoll(boolean tooFewReplicas) { - if (!this.listeners.isEmpty()) { - for (WALActionsListener i: this.listeners) { - i.logRollRequested(tooFewReplicas); - } - } - } - - // public only until class moves to o.a.h.h.wal - /** @return the number of rolled log files */ - public int getNumRolledLogFiles() { - return byWalRegionSequenceIds.size(); - } - - // public only until class moves to o.a.h.h.wal - /** @return the number of log files in use */ - public int getNumLogFiles() { - // +1 for current use log - return getNumRolledLogFiles() + 1; - } - - // public only until class moves to o.a.h.h.wal - /** @return the size of log files in use */ - public long getLogFileSize() { - return this.totalLogSize.get(); - } - - @Override - public Long startCacheFlush(final byte[] encodedRegionName, Set<byte[]> families) { - if (!closeBarrier.beginOp()) { - LOG.info("Flush not started for " + Bytes.toString(encodedRegionName) + "; server closing."); - return null; - } - return this.sequenceIdAccounting.startCacheFlush(encodedRegionName, families); - } - - @Override - public void completeCacheFlush(final byte [] encodedRegionName) { - this.sequenceIdAccounting.completeCacheFlush(encodedRegionName); - closeBarrier.endOp(); - } - @Override - public void abortCacheFlush(byte[] encodedRegionName) { - this.sequenceIdAccounting.abortCacheFlush(encodedRegionName); - closeBarrier.endOp(); + public void logRollerExited() { } @VisibleForTesting boolean isLowReplicationRollEnabled() { - return lowReplicationRollEnabled; + return lowReplicationRollEnabled; } - public static final long FIXED_OVERHEAD = ClassSize.align( - ClassSize.OBJECT + (5 * ClassSize.REFERENCE) + - ClassSize.ATOMIC_INTEGER + Bytes.SIZEOF_INT + (3 * Bytes.SIZEOF_LONG)); + public static final long FIXED_OVERHEAD = ClassSize + .align(ClassSize.OBJECT + (5 * ClassSize.REFERENCE) + ClassSize.ATOMIC_INTEGER + + Bytes.SIZEOF_INT + (3 * Bytes.SIZEOF_LONG)); - private static void split(final Configuration conf, final Path p) - throws IOException { + private static void split(final Configuration conf, final Path p) throws IOException { FileSystem fs = FileSystem.get(conf); if (!fs.exists(p)) { throw new FileNotFoundException(p.toString()); @@ -1555,51 +808,30 @@ public class FSHLog implements WAL { WALSplitter.split(baseDir, p, archiveDir, fs, conf, WALFactory.getInstance(conf)); } - - @Override - public long getEarliestMemstoreSeqNum(byte[] encodedRegionName) { - // Used by tests. Deprecated as too subtle for general usage. - return this.sequenceIdAccounting.getLowestSequenceId(encodedRegionName); - } - - @Override - public long getEarliestMemstoreSeqNum(byte[] encodedRegionName, byte[] familyName) { - // This method is used by tests and for figuring if we should flush or not because our - // sequenceids are too old. It is also used reporting the master our oldest sequenceid for use - // figuring what edits can be skipped during log recovery. getEarliestMemStoreSequenceId - // from this.sequenceIdAccounting is looking first in flushingOldestStoreSequenceIds, the - // currently flushing sequence ids, and if anything found there, it is returning these. This is - // the right thing to do for the reporting oldest sequenceids to master; we won't skip edits if - // we crash during the flush. For figuring what to flush, we might get requeued if our sequence - // id is old even though we are currently flushing. This may mean we do too much flushing. - return this.sequenceIdAccounting.getLowestSequenceId(encodedRegionName, familyName); - } - /** - * This class is used coordinating two threads holding one thread at a - * 'safe point' while the orchestrating thread does some work that requires the first thread - * paused: e.g. holding the WAL writer while its WAL is swapped out from under it by another - * thread. - * - * <p>Thread A signals Thread B to hold when it gets to a 'safe point'. Thread A wait until - * Thread B gets there. When the 'safe point' has been attained, Thread B signals Thread A. - * Thread B then holds at the 'safe point'. Thread A on notification that Thread B is paused, - * goes ahead and does the work it needs to do while Thread B is holding. When Thread A is done, - * it flags B and then Thread A and Thread B continue along on their merry way. Pause and - * signalling 'zigzags' between the two participating threads. We use two latches -- one the - * inverse of the other -- pausing and signaling when states are achieved. - * - * <p>To start up the drama, Thread A creates an instance of this class each time it would do - * this zigzag dance and passes it to Thread B (these classes use Latches so it is one shot - * only). Thread B notices the new instance (via reading a volatile reference or how ever) and it - * starts to work toward the 'safe point'. Thread A calls {@link #waitSafePoint()} when it - * cannot proceed until the Thread B 'safe point' is attained. Thread A will be held inside in - * {@link #waitSafePoint()} until Thread B reaches the 'safe point'. Once there, Thread B - * frees Thread A by calling {@link #safePointAttained()}. Thread A now knows Thread B - * is at the 'safe point' and that it is holding there (When Thread B calls - * {@link #safePointAttained()} it blocks here until Thread A calls {@link #releaseSafePoint()}). - * Thread A proceeds to do what it needs to do while Thread B is paused. When finished, - * it lets Thread B lose by calling {@link #releaseSafePoint()} and away go both Threads again. + * This class is used coordinating two threads holding one thread at a 'safe point' while the + * orchestrating thread does some work that requires the first thread paused: e.g. holding the WAL + * writer while its WAL is swapped out from under it by another thread. + * <p> + * Thread A signals Thread B to hold when it gets to a 'safe point'. Thread A wait until Thread B + * gets there. When the 'safe point' has been attained, Thread B signals Thread A. Thread B then + * holds at the 'safe point'. Thread A on notification that Thread B is paused, goes ahead and + * does the work it needs to do while Thread B is holding. When Thread A is done, it flags B and + * then Thread A and Thread B continue along on their merry way. Pause and signalling 'zigzags' + * between the two participating threads. We use two latches -- one the inverse of the other -- + * pausing and signaling when states are achieved. + * <p> + * To start up the drama, Thread A creates an instance of this class each time it would do this + * zigzag dance and passes it to Thread B (these classes use Latches so it is one shot only). + * Thread B notices the new instance (via reading a volatile reference or how ever) and it starts + * to work toward the 'safe point'. Thread A calls {@link #waitSafePoint()} when it cannot proceed + * until the Thread B 'safe point' is attained. Thread A will be held inside in + * {@link #waitSafePoint()} until Thread B reaches the 'safe point'. Once there, Thread B frees + * Thread A by calling {@link #safePointAttained()}. Thread A now knows Thread B is at the 'safe + * point' and that it is holding there (When Thread B calls {@link #safePointAttained()} it blocks + * here until Thread A calls {@link #releaseSafePoint()}). Thread A proceeds to do what it needs + * to do while Thread B is paused. When finished, it lets Thread B lose by calling + * {@link #releaseSafePoint()} and away go both Threads again. */ static class SafePointZigZagLatch { /** @@ -1607,24 +839,23 @@ public class FSHLog implements WAL { */ private volatile CountDownLatch safePointAttainedLatch = new CountDownLatch(1); /** - * Latch to wait on. Will be released when we can proceed. + * Latch to wait on. Will be released when we can proceed. */ private volatile CountDownLatch safePointReleasedLatch = new CountDownLatch(1); /** - * For Thread A to call when it is ready to wait on the 'safe point' to be attained. - * Thread A will be held in here until Thread B calls {@link #safePointAttained()} - * @param syncFuture We need this as barometer on outstanding syncs. If it comes home with - * an exception, then something is up w/ our syncing. - * @throws InterruptedException - * @throws ExecutionException + * For Thread A to call when it is ready to wait on the 'safe point' to be attained. Thread A + * will be held in here until Thread B calls {@link #safePointAttained()} + * @param syncFuture We need this as barometer on outstanding syncs. If it comes home with an + * exception, then something is up w/ our syncing. * @return The passed <code>syncFuture</code> - * @throws FailedSyncBeforeLogCloseException */ - SyncFuture waitSafePoint(final SyncFuture syncFuture) - throws InterruptedException, FailedSyncBeforeLogCloseException { + SyncFuture waitSafePoint(final SyncFuture syncFuture) throws InterruptedException, + FailedSyncBeforeLogCloseException { while (true) { - if (this.safePointAttainedLatch.await(1, TimeUnit.NANOSECONDS)) break; + if (this.safePointAttainedLatch.await(1, TimeUnit.NANOSECONDS)) { + break; + } if (syncFuture.isThrowable()) { throw new FailedSyncBeforeLogCloseException(syncFuture.getThrowable()); } @@ -1633,10 +864,9 @@ public class FSHLog implements WAL { } /** - * Called by Thread B when it attains the 'safe point'. In this method, Thread B signals - * Thread A it can proceed. Thread B will be held in here until {@link #releaseSafePoint()} - * is called by Thread A. - * @throws InterruptedException + * Called by Thread B when it attains the 'safe point'. In this method, Thread B signals Thread + * A it can proceed. Thread B will be held in here until {@link #releaseSafePoint()} is called + * by Thread A. */ void safePointAttained() throws InterruptedException { this.safePointAttainedLatch.countDown(); @@ -1644,8 +874,8 @@ public class FSHLog implements WAL { } /** - * Called by Thread A when it is done with the work it needs to do while Thread B is - * halted. This will release the Thread B held in a call to {@link #safePointAttained()} + * Called by Thread A when it is done with the work it needs to do while Thread B is halted. + * This will release the Thread B held in a call to {@link #safePointAttained()} */ void releaseSafePoint() { this.safePointReleasedLatch.countDown(); @@ -1655,44 +885,44 @@ public class FSHLog implements WAL { * @return True is this is a 'cocked', fresh instance, and not one that has already fired. */ boolean isCocked() { - return this.safePointAttainedLatch.getCount() > 0 && - this.safePointReleasedLatch.getCount() > 0; + return this.safePointAttainedLatch.getCount() > 0 + && this.safePointReleasedLatch.getCount() > 0; } } /** * Handler that is run by the disruptor ringbuffer consumer. Consumer is a SINGLE - * 'writer/appender' thread. Appends edits and starts up sync runs. Tries its best to batch up - * syncs. There is no discernible benefit batching appends so we just append as they come in - * because it simplifies the below implementation. See metrics for batching effectiveness - * (In measurement, at 100 concurrent handlers writing 1k, we are batching > 10 appends and 10 - * handler sync invocations for every actual dfsclient sync call; at 10 concurrent handlers, - * YMMV). - * <p>Herein, we have an array into which we store the sync futures as they come in. When we - * have a 'batch', we'll then pass what we have collected to a SyncRunner thread to do the - * filesystem sync. When it completes, it will then call - * {@link SyncFuture#done(long, Throwable)} on each of SyncFutures in the batch to release - * blocked Handler threads. - * <p>I've tried various effects to try and make latencies low while keeping throughput high. - * I've tried keeping a single Queue of SyncFutures in this class appending to its tail as the - * syncs coming and having sync runner threads poll off the head to 'finish' completed - * SyncFutures. I've tried linkedlist, and various from concurrent utils whether - * LinkedBlockingQueue or ArrayBlockingQueue, etc. The more points of synchronization, the - * more 'work' (according to 'perf stats') that has to be done; small increases in stall - * percentages seem to have a big impact on throughput/latencies. The below model where we have - * an array into which we stash the syncs and then hand them off to the sync thread seemed like - * a decent compromise. See HBASE-8755 for more detail. + * 'writer/appender' thread. Appends edits and starts up sync runs. Tries its best to batch up + * syncs. There is no discernible benefit batching appends so we just append as they come in + * because it simplifies the below implementation. See metrics for batching effectiveness (In + * measurement, at 100 concurrent handlers writing 1k, we are batching > 10 appends and 10 handler + * sync invocations for every actual dfsclient sync call; at 10 concurrent handlers, YMMV). + * <p> + * Herein, we have an array into which we store the sync futures as they come in. When we have a + * 'batch', we'll then pass what we have collected to a SyncRunner thread to do the filesystem + * sync. When it completes, it will then call {@link SyncFuture#done(long, Throwable)} on each of + * SyncFutures in the batch to release blocked Handler threads. + * <p> + * I've tried various effects to try and make latencies low while keeping throughput high. I've + * tried keeping a single Queue of SyncFutures in this class appending to its tail as the syncs + * coming and having sync runner threads poll off the head to 'finish' completed SyncFutures. I've + * tried linkedlist, and various from concurrent utils whether LinkedBlockingQueue or + * ArrayBlockingQueue, etc. The more points of synchronization, the more 'work' (according to + * 'perf stats') that has to be done; small increases in stall percentages seem to have a big + * impact on throughput/latencies. The below model where we have an array into which we stash the + * syncs and then hand them off to the sync thread seemed like a decent compromise. See HBASE-8755 + * for more detail. */ class RingBufferEventHandler implements EventHandler<RingBufferTruck>, LifecycleAware { - private final SyncRunner [] syncRunners; - private final SyncFuture [] syncFutures; - // Had 'interesting' issues when this was non-volatile. On occasion, we'd not pass all + private final SyncRunner[] syncRunners; + private final SyncFuture[] syncFutures; + // Had 'interesting' issues when this was non-volatile. On occasion, we'd not pass all // syncFutures to the next sync'ing thread. private volatile int syncFuturesCount = 0; private volatile SafePointZigZagLatch zigzagLatch; /** - * Set if we get an exception appending or syncing so that all subsequence appends and syncs - * on this WAL fail until WAL is replaced. + * Set if we get an exception appending or syncing so that all subsequence appends and syncs on + * this WAL fail until WAL is replaced. */ private Exception exception = null; /** @@ -1716,7 +946,9 @@ public class FSHLog implements WAL { private void cleanupOutstandingSyncsOnException(final long sequence, final Exception e) { // There could be handler-count syncFutures outstanding. - for (int i = 0; i < this.syncFuturesCount; i++) this.syncFutures[i].done(sequence, e); + for (int i = 0; i < this.syncFuturesCount; i++) { + this.syncFutures[i].done(sequence, e); + } this.syncFuturesCount = 0; } @@ -1725,7 +957,9 @@ public class FSHLog implements WAL { */ private boolean isOutstandingSyncs() { for (int i = 0; i < this.syncFuturesCount; i++) { - if (!this.syncFutures[i].isDone()) return true; + if (!this.syncFutures[i].isDone()) { + return true; + } } return false; } @@ -1733,10 +967,10 @@ public class FSHLog implements WAL { @Override // We can set endOfBatch in the below method if at end of our this.syncFutures array public void onEvent(final RingBufferTruck truck, final long sequence, boolean endOfBatch) - throws Exception { - // Appends and syncs are coming in order off the ringbuffer. We depend on this fact. We'll - // add appends to dfsclient as they come in. Batching appends doesn't give any significant
<TRUNCATED>
