http://git-wip-us.apache.org/repos/asf/hbase/blob/8959828f/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/WALSplitter.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/WALSplitter.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/WALSplitter.java new file mode 100644 index 0000000..2d6b589 --- /dev/null +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/WALSplitter.java @@ -0,0 +1,2171 @@ +/** + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hbase.wal; + +import java.io.EOFException; +import java.io.FileNotFoundException; +import java.io.IOException; +import java.io.InterruptedIOException; +import java.text.ParseException; +import java.util.ArrayList; +import java.util.Collections; +import java.util.HashSet; +import java.util.LinkedList; +import java.util.List; +import java.util.Map; +import java.util.NavigableSet; +import java.util.Set; +import java.util.TreeMap; +import java.util.TreeSet; +import java.util.UUID; +import java.util.concurrent.Callable; +import java.util.concurrent.CompletionService; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.ExecutorCompletionService; +import java.util.concurrent.Future; +import java.util.concurrent.ThreadFactory; +import java.util.concurrent.ThreadPoolExecutor; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.atomic.AtomicLong; +import java.util.concurrent.atomic.AtomicReference; +import java.util.regex.Matcher; +import java.util.regex.Pattern; + +import com.google.common.annotations.VisibleForTesting; +import com.google.common.base.Preconditions; +import com.google.common.collect.Lists; +import com.google.protobuf.ServiceException; +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.hbase.classification.InterfaceAudience; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.FileStatus; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.fs.PathFilter; +import org.apache.hadoop.hbase.Cell; +import org.apache.hadoop.hbase.CellScanner; +import org.apache.hadoop.hbase.CellUtil; +import org.apache.hadoop.hbase.CoordinatedStateException; +import org.apache.hadoop.hbase.CoordinatedStateManager; +import org.apache.hadoop.hbase.HBaseConfiguration; +import org.apache.hadoop.hbase.HConstants; +import org.apache.hadoop.hbase.HRegionInfo; +import org.apache.hadoop.hbase.HRegionLocation; +import org.apache.hadoop.hbase.RemoteExceptionHandler; +import org.apache.hadoop.hbase.ServerName; 
+import org.apache.hadoop.hbase.TableName; +import org.apache.hadoop.hbase.TableNotFoundException; +import org.apache.hadoop.hbase.TableStateManager; +import org.apache.hadoop.hbase.Tag; +import org.apache.hadoop.hbase.TagRewriteCell; +import org.apache.hadoop.hbase.TagType; +import org.apache.hadoop.hbase.client.ConnectionUtils; +import org.apache.hadoop.hbase.client.Delete; +import org.apache.hadoop.hbase.client.Durability; +import org.apache.hadoop.hbase.client.HConnection; +import org.apache.hadoop.hbase.client.HConnectionManager; +import org.apache.hadoop.hbase.client.Mutation; +import org.apache.hadoop.hbase.client.Put; +import org.apache.hadoop.hbase.coordination.BaseCoordinatedStateManager; +import org.apache.hadoop.hbase.coordination.ZKSplitLogManagerCoordination; +import org.apache.hadoop.hbase.exceptions.RegionOpeningException; +import org.apache.hadoop.hbase.io.HeapSize; +import org.apache.hadoop.hbase.master.SplitLogManager; +import org.apache.hadoop.hbase.monitoring.MonitoredTask; +import org.apache.hadoop.hbase.monitoring.TaskMonitor; +import org.apache.hadoop.hbase.protobuf.ProtobufUtil; +import org.apache.hadoop.hbase.protobuf.RequestConverter; +import org.apache.hadoop.hbase.protobuf.generated.AdminProtos.AdminService.BlockingInterface; +import org.apache.hadoop.hbase.protobuf.generated.AdminProtos.GetRegionInfoRequest; +import org.apache.hadoop.hbase.protobuf.generated.AdminProtos.GetRegionInfoResponse; +import org.apache.hadoop.hbase.protobuf.generated.AdminProtos.WALEntry; +import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.MutationProto.MutationType; +import org.apache.hadoop.hbase.protobuf.generated.HBaseProtos; +import org.apache.hadoop.hbase.protobuf.generated.WALProtos.CompactionDescriptor; +import org.apache.hadoop.hbase.protobuf.generated.ZooKeeperProtos; +import org.apache.hadoop.hbase.protobuf.generated.ZooKeeperProtos.RegionStoreSequenceIds; +import org.apache.hadoop.hbase.protobuf.generated.ZooKeeperProtos.SplitLogTask.RecoveryMode; +import org.apache.hadoop.hbase.protobuf.generated.ZooKeeperProtos.StoreSequenceId; +import org.apache.hadoop.hbase.regionserver.HRegion; +import org.apache.hadoop.hbase.regionserver.LastSequenceId; +import org.apache.hadoop.hbase.wal.WAL.Entry; +import org.apache.hadoop.hbase.wal.WAL.Reader; +import org.apache.hadoop.hbase.wal.WALProvider.Writer; +import org.apache.hadoop.hbase.util.Bytes; +import org.apache.hadoop.hbase.util.CancelableProgressable; +import org.apache.hadoop.hbase.util.ClassSize; +import org.apache.hadoop.hbase.util.EnvironmentEdgeManager; +import org.apache.hadoop.hbase.util.FSUtils; +import org.apache.hadoop.hbase.util.Pair; +import org.apache.hadoop.hbase.util.Threads; +import org.apache.hadoop.hbase.zookeeper.ZKSplitLog; +import org.apache.hadoop.io.MultipleIOException; + +import com.google.common.base.Preconditions; +import com.google.common.collect.Lists; +import com.google.protobuf.ServiceException; + +// imports for things that haven't moved from regionserver.wal yet. +import org.apache.hadoop.hbase.regionserver.wal.FSHLog; +import org.apache.hadoop.hbase.regionserver.wal.HLogKey; +import org.apache.hadoop.hbase.regionserver.wal.WALEdit; +import org.apache.hadoop.hbase.regionserver.wal.WALEditsReplaySink; +import org.apache.hadoop.hbase.regionserver.wal.WALCellCodec; + +/** + * This class is responsible for splitting up a bunch of regionserver commit log + * files that are no longer being written to, into new files, one per region for + * region to replay on startup. 
Delete the old log files when finished. + */ +@InterfaceAudience.Private +public class WALSplitter { + static final Log LOG = LogFactory.getLog(WALSplitter.class); + + /** By default we retry errors in splitting, rather than skipping. */ + public static final boolean SPLIT_SKIP_ERRORS_DEFAULT = false; + + // Parameters for split process + protected final Path rootDir; + protected final FileSystem fs; + protected final Configuration conf; + + // Major subcomponents of the split process. + // These are separated into inner classes to make testing easier. + OutputSink outputSink; + EntryBuffers entryBuffers; + + private Set<TableName> disablingOrDisabledTables = + new HashSet<TableName>(); + private BaseCoordinatedStateManager csm; + private final WALFactory walFactory; + + // If an exception is thrown by one of the other threads, it will be + // stored here. + protected AtomicReference<Throwable> thrown = new AtomicReference<Throwable>(); + + // Wait/notify for when data has been produced by the reader thread, + // consumed by the reader thread, or an exception occurred + final Object dataAvailable = new Object(); + + private MonitoredTask status; + + // For checking the latest flushed sequence id + protected final LastSequenceId sequenceIdChecker; + + protected boolean distributedLogReplay; + + // Map encodedRegionName -> lastFlushedSequenceId + protected Map<String, Long> lastFlushedSequenceIds = new ConcurrentHashMap<String, Long>(); + + // Map encodedRegionName -> maxSeqIdInStores + protected Map<String, Map<byte[], Long>> regionMaxSeqIdInStores = + new ConcurrentHashMap<String, Map<byte[], Long>>(); + + // Failed region server that the wal file being split belongs to + protected String failedServerName = ""; + + // Number of writer threads + private final int numWriterThreads; + + // Min batch size when replay WAL edits + private final int minBatchSize; + + WALSplitter(final WALFactory factory, Configuration conf, Path rootDir, + FileSystem fs, LastSequenceId idChecker, + CoordinatedStateManager csm, RecoveryMode mode) { + this.conf = HBaseConfiguration.create(conf); + String codecClassName = conf + .get(WALCellCodec.WAL_CELL_CODEC_CLASS_KEY, WALCellCodec.class.getName()); + this.conf.set(HConstants.RPC_CODEC_CONF_KEY, codecClassName); + this.rootDir = rootDir; + this.fs = fs; + this.sequenceIdChecker = idChecker; + this.csm = (BaseCoordinatedStateManager)csm; + this.walFactory = factory; + + entryBuffers = new EntryBuffers( + this.conf.getInt("hbase.regionserver.hlog.splitlog.buffersize", + 128*1024*1024)); + + // a larger minBatchSize may slow down recovery because replay writer has to wait for + // enough edits before replaying them + this.minBatchSize = this.conf.getInt("hbase.regionserver.wal.logreplay.batch.size", 64); + this.distributedLogReplay = (RecoveryMode.LOG_REPLAY == mode); + + this.numWriterThreads = this.conf.getInt("hbase.regionserver.hlog.splitlog.writer.threads", 3); + if (csm != null && this.distributedLogReplay) { + outputSink = new LogReplayOutputSink(numWriterThreads); + } else { + if (this.distributedLogReplay) { + LOG.info("ZooKeeperWatcher is passed in as NULL so disable distrubitedLogRepaly."); + } + this.distributedLogReplay = false; + outputSink = new LogRecoveredEditsOutputSink(numWriterThreads); + } + + } + + /** + * Splits a WAL file into region's recovered-edits directory. + * This is the main entry point for distributed log splitting from SplitLogWorker. + * <p> + * If the log file has N regions then N recovered.edits files will be produced. 
+ * <p> + * @param rootDir + * @param logfile + * @param fs + * @param conf + * @param reporter + * @param idChecker + * @param cp coordination state manager + * @return false if it is interrupted by the progress-able. + * @throws IOException + */ + public static boolean splitLogFile(Path rootDir, FileStatus logfile, FileSystem fs, + Configuration conf, CancelableProgressable reporter, LastSequenceId idChecker, + CoordinatedStateManager cp, RecoveryMode mode, final WALFactory factory) throws IOException { + WALSplitter s = new WALSplitter(factory, conf, rootDir, fs, idChecker, cp, mode); + return s.splitLogFile(logfile, reporter); + } + + // A wrapper to split one log folder using the method used by distributed + // log splitting. Used by tools and unit tests. It should be package private. + // It is public only because UpgradeTo96 and TestWALObserver are in different packages, + // which uses this method to do log splitting. + public static List<Path> split(Path rootDir, Path logDir, Path oldLogDir, + FileSystem fs, Configuration conf, final WALFactory factory) throws IOException { + final FileStatus[] logfiles = SplitLogManager.getFileList(conf, + Collections.singletonList(logDir), null); + List<Path> splits = new ArrayList<Path>(); + if (logfiles != null && logfiles.length > 0) { + for (FileStatus logfile: logfiles) { + WALSplitter s = new WALSplitter(factory, conf, rootDir, fs, null, null, + RecoveryMode.LOG_SPLITTING); + if (s.splitLogFile(logfile, null)) { + finishSplitLogFile(rootDir, oldLogDir, logfile.getPath(), conf); + if (s.outputSink.splits != null) { + splits.addAll(s.outputSink.splits); + } + } + } + } + if (!fs.delete(logDir, true)) { + throw new IOException("Unable to delete src dir: " + logDir); + } + return splits; + } + + /** + * log splitting implementation, splits one log file. + * @param logfile should be an actual log file. 
+ */ + boolean splitLogFile(FileStatus logfile, + CancelableProgressable reporter) throws IOException { + Preconditions.checkState(status == null); + Preconditions.checkArgument(logfile.isFile(), + "passed in file status is for something other than a regular file."); + boolean isCorrupted = false; + boolean skipErrors = conf.getBoolean("hbase.hlog.split.skip.errors", + SPLIT_SKIP_ERRORS_DEFAULT); + int interval = conf.getInt("hbase.splitlog.report.interval.loglines", 1024); + Path logPath = logfile.getPath(); + boolean outputSinkStarted = false; + boolean progress_failed = false; + int editsCount = 0; + int editsSkipped = 0; + + status = + TaskMonitor.get().createStatus( + "Splitting log file " + logfile.getPath() + "into a temporary staging area."); + Reader in = null; + try { + long logLength = logfile.getLen(); + LOG.info("Splitting wal: " + logPath + ", length=" + logLength); + LOG.info("DistributedLogReplay = " + this.distributedLogReplay); + status.setStatus("Opening log file"); + if (reporter != null && !reporter.progress()) { + progress_failed = true; + return false; + } + try { + in = getReader(logfile, skipErrors, reporter); + } catch (CorruptedLogFileException e) { + LOG.warn("Could not get reader, corrupted log file " + logPath, e); + ZKSplitLog.markCorrupted(rootDir, logfile.getPath().getName(), fs); + isCorrupted = true; + } + if (in == null) { + LOG.warn("Nothing to split in log file " + logPath); + return true; + } + if (csm != null) { + try { + TableStateManager tsm = csm.getTableStateManager(); + disablingOrDisabledTables = tsm.getTablesInStates( + ZooKeeperProtos.Table.State.DISABLED, ZooKeeperProtos.Table.State.DISABLING); + } catch (CoordinatedStateException e) { + throw new IOException("Can't get disabling/disabled tables", e); + } + } + int numOpenedFilesBeforeReporting = conf.getInt("hbase.splitlog.report.openedfiles", 3); + int numOpenedFilesLastCheck = 0; + outputSink.setReporter(reporter); + outputSink.startWriterThreads(); + outputSinkStarted = true; + Entry entry; + Long lastFlushedSequenceId = -1L; + ServerName serverName = DefaultWALProvider.getServerNameFromWALDirectoryName(logPath); + failedServerName = (serverName == null) ? "" : serverName.getServerName(); + while ((entry = getNextLogLine(in, logPath, skipErrors)) != null) { + byte[] region = entry.getKey().getEncodedRegionName(); + String key = Bytes.toString(region); + lastFlushedSequenceId = lastFlushedSequenceIds.get(key); + if (lastFlushedSequenceId == null) { + if (this.distributedLogReplay) { + RegionStoreSequenceIds ids = + csm.getSplitLogWorkerCoordination().getRegionFlushedSequenceId(failedServerName, + key); + if (ids != null) { + lastFlushedSequenceId = ids.getLastFlushedSequenceId(); + } + } else if (sequenceIdChecker != null) { + lastFlushedSequenceId = sequenceIdChecker.getLastSequenceId(region); + } + if (lastFlushedSequenceId == null) { + lastFlushedSequenceId = -1L; + } + lastFlushedSequenceIds.put(key, lastFlushedSequenceId); + } + if (lastFlushedSequenceId >= entry.getKey().getLogSeqNum()) { + editsSkipped++; + continue; + } + entryBuffers.appendEntry(entry); + editsCount++; + int moreWritersFromLastCheck = this.getNumOpenWriters() - numOpenedFilesLastCheck; + // If sufficient edits have passed, check if we should report progress. 
+ if (editsCount % interval == 0 + || moreWritersFromLastCheck > numOpenedFilesBeforeReporting) { + numOpenedFilesLastCheck = this.getNumOpenWriters(); + String countsStr = (editsCount - (editsSkipped + outputSink.getSkippedEdits())) + + " edits, skipped " + editsSkipped + " edits."; + status.setStatus("Split " + countsStr); + if (reporter != null && !reporter.progress()) { + progress_failed = true; + return false; + } + } + } + } catch (InterruptedException ie) { + IOException iie = new InterruptedIOException(); + iie.initCause(ie); + throw iie; + } catch (CorruptedLogFileException e) { + LOG.warn("Could not parse, corrupted log file " + logPath, e); + csm.getSplitLogWorkerCoordination().markCorrupted(rootDir, + logfile.getPath().getName(), fs); + isCorrupted = true; + } catch (IOException e) { + e = RemoteExceptionHandler.checkIOException(e); + throw e; + } finally { + LOG.debug("Finishing writing output logs and closing down."); + try { + if (null != in) { + in.close(); + } + } catch (IOException exception) { + LOG.warn("Could not close wal reader: " + exception.getMessage()); + LOG.debug("exception details", exception); + } + try { + if (outputSinkStarted) { + // Set progress_failed to true as the immediate following statement will reset its value + // when finishWritingAndClose() throws exception, progress_failed has the right value + progress_failed = true; + progress_failed = outputSink.finishWritingAndClose() == null; + } + } finally { + String msg = + "Processed " + editsCount + " edits across " + outputSink.getNumberOfRecoveredRegions() + + " regions; log file=" + logPath + " is corrupted = " + isCorrupted + + " progress failed = " + progress_failed; + LOG.info(msg); + status.markComplete(msg); + } + } + return !progress_failed; + } + + /** + * Completes the work done by splitLogFile by archiving logs + * <p> + * It is invoked by SplitLogManager once it knows that one of the + * SplitLogWorkers have completed the splitLogFile() part. If the master + * crashes then this function might get called multiple times. 
+ * <p> + * @param logfile + * @param conf + * @throws IOException + */ + public static void finishSplitLogFile(String logfile, + Configuration conf) throws IOException { + Path rootdir = FSUtils.getRootDir(conf); + Path oldLogDir = new Path(rootdir, HConstants.HREGION_OLDLOGDIR_NAME); + Path logPath; + if (FSUtils.isStartingWithPath(rootdir, logfile)) { + logPath = new Path(logfile); + } else { + logPath = new Path(rootdir, logfile); + } + finishSplitLogFile(rootdir, oldLogDir, logPath, conf); + } + + static void finishSplitLogFile(Path rootdir, Path oldLogDir, + Path logPath, Configuration conf) throws IOException { + List<Path> processedLogs = new ArrayList<Path>(); + List<Path> corruptedLogs = new ArrayList<Path>(); + FileSystem fs; + fs = rootdir.getFileSystem(conf); + if (ZKSplitLog.isCorrupted(rootdir, logPath.getName(), fs)) { + corruptedLogs.add(logPath); + } else { + processedLogs.add(logPath); + } + archiveLogs(corruptedLogs, processedLogs, oldLogDir, fs, conf); + Path stagingDir = ZKSplitLog.getSplitLogDir(rootdir, logPath.getName()); + fs.delete(stagingDir, true); + } + + /** + * Moves processed logs to a oldLogDir after successful processing Moves + * corrupted logs (any log that couldn't be successfully parsed to corruptDir + * (.corrupt) for later investigation + * + * @param corruptedLogs + * @param processedLogs + * @param oldLogDir + * @param fs + * @param conf + * @throws IOException + */ + private static void archiveLogs( + final List<Path> corruptedLogs, + final List<Path> processedLogs, final Path oldLogDir, + final FileSystem fs, final Configuration conf) throws IOException { + final Path corruptDir = new Path(FSUtils.getRootDir(conf), conf.get( + "hbase.regionserver.hlog.splitlog.corrupt.dir", HConstants.CORRUPT_DIR_NAME)); + + if (!fs.mkdirs(corruptDir)) { + LOG.info("Unable to mkdir " + corruptDir); + } + fs.mkdirs(oldLogDir); + + // this method can get restarted or called multiple times for archiving + // the same log files. + for (Path corrupted : corruptedLogs) { + Path p = new Path(corruptDir, corrupted.getName()); + if (fs.exists(corrupted)) { + if (!fs.rename(corrupted, p)) { + LOG.warn("Unable to move corrupted log " + corrupted + " to " + p); + } else { + LOG.warn("Moved corrupted log " + corrupted + " to " + p); + } + } + } + + for (Path p : processedLogs) { + Path newPath = FSHLog.getWALArchivePath(oldLogDir, p); + if (fs.exists(p)) { + if (!FSUtils.renameAndSetModifyTime(fs, p, newPath)) { + LOG.warn("Unable to move " + p + " to " + newPath); + } else { + LOG.info("Archived processed log " + p + " to " + newPath); + } + } + } + } + + /** + * Path to a file under RECOVERED_EDITS_DIR directory of the region found in + * <code>logEntry</code> named for the sequenceid in the passed + * <code>logEntry</code>: e.g. /hbase/some_table/2323432434/recovered.edits/2332. + * This method also ensures existence of RECOVERED_EDITS_DIR under the region + * creating it if necessary. + * @param fs + * @param logEntry + * @param rootDir HBase root dir. + * @return Path to file into which to dump split log edits. 
+ * @throws IOException + */ + @SuppressWarnings("deprecation") + static Path getRegionSplitEditsPath(final FileSystem fs, + final Entry logEntry, final Path rootDir, boolean isCreate) + throws IOException { + Path tableDir = FSUtils.getTableDir(rootDir, logEntry.getKey().getTablename()); + String encodedRegionName = Bytes.toString(logEntry.getKey().getEncodedRegionName()); + Path regiondir = HRegion.getRegionDir(tableDir, encodedRegionName); + Path dir = getRegionDirRecoveredEditsDir(regiondir); + + if (!fs.exists(regiondir)) { + LOG.info("This region's directory doesn't exist: " + + regiondir.toString() + ". It is very likely that it was" + + " already split so it's safe to discard those edits."); + return null; + } + if (fs.exists(dir) && fs.isFile(dir)) { + Path tmp = new Path("/tmp"); + if (!fs.exists(tmp)) { + fs.mkdirs(tmp); + } + tmp = new Path(tmp, + HConstants.RECOVERED_EDITS_DIR + "_" + encodedRegionName); + LOG.warn("Found existing old file: " + dir + ". It could be some " + + "leftover of an old installation. It should be a folder instead. " + + "So moving it to " + tmp); + if (!fs.rename(dir, tmp)) { + LOG.warn("Failed to sideline old file " + dir); + } + } + + if (isCreate && !fs.exists(dir)) { + if (!fs.mkdirs(dir)) LOG.warn("mkdir failed on " + dir); + } + // Append file name ends with RECOVERED_LOG_TMPFILE_SUFFIX to ensure + // region's replayRecoveredEdits will not delete it + String fileName = formatRecoveredEditsFileName(logEntry.getKey().getLogSeqNum()); + fileName = getTmpRecoveredEditsFileName(fileName); + return new Path(dir, fileName); + } + + static String getTmpRecoveredEditsFileName(String fileName) { + return fileName + RECOVERED_LOG_TMPFILE_SUFFIX; + } + + /** + * Get the completed recovered edits file path, renaming it to be by last edit + * in the file from its first edit. Then we could use the name to skip + * recovered edits when doing {@link HRegion#replayRecoveredEditsIfAny}. + * @param srcPath + * @param maximumEditLogSeqNum + * @return dstPath take file's last edit log seq num as the name + */ + static Path getCompletedRecoveredEditsFilePath(Path srcPath, + Long maximumEditLogSeqNum) { + String fileName = formatRecoveredEditsFileName(maximumEditLogSeqNum); + return new Path(srcPath.getParent(), fileName); + } + + static String formatRecoveredEditsFileName(final long seqid) { + return String.format("%019d", seqid); + } + + private static final Pattern EDITFILES_NAME_PATTERN = Pattern.compile("-?[0-9]+"); + private static final String RECOVERED_LOG_TMPFILE_SUFFIX = ".temp"; + + /** + * @param regiondir + * This regions directory in the filesystem. + * @return The directory that holds recovered edits files for the region + * <code>regiondir</code> + */ + public static Path getRegionDirRecoveredEditsDir(final Path regiondir) { + return new Path(regiondir, HConstants.RECOVERED_EDITS_DIR); + } + + /** + * Returns sorted set of edit files made by splitter, excluding files + * with '.temp' suffix. + * + * @param fs + * @param regiondir + * @return Files in passed <code>regiondir</code> as a sorted set. 
+ * @throws IOException + */ + public static NavigableSet<Path> getSplitEditFilesSorted(final FileSystem fs, + final Path regiondir) throws IOException { + NavigableSet<Path> filesSorted = new TreeSet<Path>(); + Path editsdir = getRegionDirRecoveredEditsDir(regiondir); + if (!fs.exists(editsdir)) + return filesSorted; + FileStatus[] files = FSUtils.listStatus(fs, editsdir, new PathFilter() { + @Override + public boolean accept(Path p) { + boolean result = false; + try { + // Return files and only files that match the editfile names pattern. + // There can be other files in this directory other than edit files. + // In particular, on error, we'll move aside the bad edit file giving + // it a timestamp suffix. See moveAsideBadEditsFile. + Matcher m = EDITFILES_NAME_PATTERN.matcher(p.getName()); + result = fs.isFile(p) && m.matches(); + // Skip the file whose name ends with RECOVERED_LOG_TMPFILE_SUFFIX, + // because it means splitwal thread is writting this file. + if (p.getName().endsWith(RECOVERED_LOG_TMPFILE_SUFFIX)) { + result = false; + } + } catch (IOException e) { + LOG.warn("Failed isFile check on " + p); + } + return result; + } + }); + if (files == null) { + return filesSorted; + } + for (FileStatus status : files) { + filesSorted.add(status.getPath()); + } + return filesSorted; + } + + /** + * Move aside a bad edits file. + * + * @param fs + * @param edits + * Edits file to move aside. + * @return The name of the moved aside file. + * @throws IOException + */ + public static Path moveAsideBadEditsFile(final FileSystem fs, final Path edits) + throws IOException { + Path moveAsideName = new Path(edits.getParent(), edits.getName() + "." + + System.currentTimeMillis()); + if (!fs.rename(edits, moveAsideName)) { + LOG.warn("Rename failed from " + edits + " to " + moveAsideName); + } + return moveAsideName; + } + + private static final String SEQUENCE_ID_FILE_SUFFIX = "_seqid"; + + /** + * Is the given file a region open sequence id file. 
+ */ + @VisibleForTesting + public static boolean isSequenceIdFile(final Path file) { + return file.getName().endsWith(SEQUENCE_ID_FILE_SUFFIX); + } + + /** + * Create a file with name as region open sequence id + * + * @param fs + * @param regiondir + * @param newSeqId + * @param saftyBumper + * @return long new sequence Id value + * @throws IOException + */ + public static long writeRegionOpenSequenceIdFile(final FileSystem fs, final Path regiondir, + long newSeqId, long saftyBumper) throws IOException { + + Path editsdir = getRegionDirRecoveredEditsDir(regiondir); + long maxSeqId = 0; + FileStatus[] files = null; + if (fs.exists(editsdir)) { + files = FSUtils.listStatus(fs, editsdir, new PathFilter() { + @Override + public boolean accept(Path p) { + return isSequenceIdFile(p); + } + }); + if (files != null) { + for (FileStatus status : files) { + String fileName = status.getPath().getName(); + try { + Long tmpSeqId = Long.parseLong(fileName.substring(0, fileName.length() + - SEQUENCE_ID_FILE_SUFFIX.length())); + maxSeqId = Math.max(tmpSeqId, maxSeqId); + } catch (NumberFormatException ex) { + LOG.warn("Invalid SeqId File Name=" + fileName); + } + } + } + } + if (maxSeqId > newSeqId) { + newSeqId = maxSeqId; + } + newSeqId += saftyBumper; // bump up SeqId + + // write a new seqId file + Path newSeqIdFile = new Path(editsdir, newSeqId + SEQUENCE_ID_FILE_SUFFIX); + if (!fs.createNewFile(newSeqIdFile)) { + throw new IOException("Failed to create SeqId file:" + newSeqIdFile); + } + // remove old ones + if(files != null) { + for (FileStatus status : files) { + fs.delete(status.getPath(), false); + } + } + return newSeqId; + } + + /** + * Create a new {@link Reader} for reading logs to split. + * + * @param file + * @return A new Reader instance, caller should close + * @throws IOException + * @throws CorruptedLogFileException + */ + protected Reader getReader(FileStatus file, boolean skipErrors, CancelableProgressable reporter) + throws IOException, CorruptedLogFileException { + Path path = file.getPath(); + long length = file.getLen(); + Reader in; + + // Check for possibly empty file. With appends, currently Hadoop reports a + // zero length even if the file has been sync'd. Revisit if HDFS-376 or + // HDFS-878 is committed. + if (length <= 0) { + LOG.warn("File " + path + " might be still open, length is 0"); + } + + try { + FSUtils.getInstance(fs, conf).recoverFileLease(fs, path, conf, reporter); + try { + in = getReader(path, reporter); + } catch (EOFException e) { + if (length <= 0) { + // TODO should we ignore an empty, not-last log file if skip.errors + // is false? Either way, the caller should decide what to do. E.g. + // ignore if this is the last log in sequence. + // TODO is this scenario still possible if the log has been + // recovered (i.e. closed) + LOG.warn("Could not open " + path + " for reading. File is empty", e); + return null; + } else { + // EOFException being ignored + return null; + } + } + } catch (IOException e) { + if (e instanceof FileNotFoundException) { + // A wal file may not exist anymore. 
Nothing can be recovered so move on + LOG.warn("File " + path + " doesn't exist anymore.", e); + return null; + } + if (!skipErrors || e instanceof InterruptedIOException) { + throw e; // Don't mark the file corrupted if interrupted, or not skipErrors + } + CorruptedLogFileException t = + new CorruptedLogFileException("skipErrors=true Could not open wal " + + path + " ignoring"); + t.initCause(e); + throw t; + } + return in; + } + + static private Entry getNextLogLine(Reader in, Path path, boolean skipErrors) + throws CorruptedLogFileException, IOException { + try { + return in.next(); + } catch (EOFException eof) { + // truncated files are expected if a RS crashes (see HBASE-2643) + LOG.info("EOF from wal " + path + ". continuing"); + return null; + } catch (IOException e) { + // If the IOE resulted from bad file format, + // then this problem is idempotent and retrying won't help + if (e.getCause() != null && + (e.getCause() instanceof ParseException || + e.getCause() instanceof org.apache.hadoop.fs.ChecksumException)) { + LOG.warn("Parse exception " + e.getCause().toString() + " from wal " + + path + ". continuing"); + return null; + } + if (!skipErrors) { + throw e; + } + CorruptedLogFileException t = + new CorruptedLogFileException("skipErrors=true Ignoring exception" + + " while parsing wal " + path + ". Marking as corrupted"); + t.initCause(e); + throw t; + } + } + + private void writerThreadError(Throwable t) { + thrown.compareAndSet(null, t); + } + + /** + * Check for errors in the writer threads. If any is found, rethrow it. + */ + private void checkForErrors() throws IOException { + Throwable thrown = this.thrown.get(); + if (thrown == null) return; + if (thrown instanceof IOException) { + throw new IOException(thrown); + } else { + throw new RuntimeException(thrown); + } + } + /** + * Create a new {@link Writer} for writing log splits. + * @return a new Writer instance, caller should close + */ + protected Writer createWriter(Path logfile) + throws IOException { + return walFactory.createRecoveredEditsWriter(fs, logfile); + } + + /** + * Create a new {@link Reader} for reading logs to split. + * @return new Reader instance, caller should close + */ + protected Reader getReader(Path curLogFile, CancelableProgressable reporter) throws IOException { + return walFactory.createReader(fs, curLogFile, reporter); + } + + /** + * Get current open writers + */ + private int getNumOpenWriters() { + int result = 0; + if (this.outputSink != null) { + result += this.outputSink.getNumOpenWriters(); + } + return result; + } + + /** + * Class which accumulates edits and separates them into a buffer per region + * while simultaneously accounting RAM usage. Blocks if the RAM usage crosses + * a predefined threshold. + * + * Writer threads then pull region-specific buffers from this class. + */ + class EntryBuffers { + Map<byte[], RegionEntryBuffer> buffers = + new TreeMap<byte[], RegionEntryBuffer>(Bytes.BYTES_COMPARATOR); + + /* Track which regions are currently in the middle of writing. We don't allow + an IO thread to pick up bytes from a region if we're already writing + data for that region in a different IO thread. */ + Set<byte[]> currentlyWriting = new TreeSet<byte[]>(Bytes.BYTES_COMPARATOR); + + long totalBuffered = 0; + long maxHeapUsage; + + EntryBuffers(long maxHeapUsage) { + this.maxHeapUsage = maxHeapUsage; + } + + /** + * Append a log entry into the corresponding region buffer. + * Blocks if the total heap usage has crossed the specified threshold. 
+ * + * @throws InterruptedException + * @throws IOException + */ + void appendEntry(Entry entry) throws InterruptedException, IOException { + WALKey key = entry.getKey(); + + RegionEntryBuffer buffer; + long incrHeap; + synchronized (this) { + buffer = buffers.get(key.getEncodedRegionName()); + if (buffer == null) { + buffer = new RegionEntryBuffer(key.getTablename(), key.getEncodedRegionName()); + buffers.put(key.getEncodedRegionName(), buffer); + } + incrHeap= buffer.appendEntry(entry); + } + + // If we crossed the chunk threshold, wait for more space to be available + synchronized (dataAvailable) { + totalBuffered += incrHeap; + while (totalBuffered > maxHeapUsage && thrown.get() == null) { + LOG.debug("Used " + totalBuffered + " bytes of buffered edits, waiting for IO threads..."); + dataAvailable.wait(2000); + } + dataAvailable.notifyAll(); + } + checkForErrors(); + } + + /** + * @return RegionEntryBuffer a buffer of edits to be written or replayed. + */ + synchronized RegionEntryBuffer getChunkToWrite() { + long biggestSize = 0; + byte[] biggestBufferKey = null; + + for (Map.Entry<byte[], RegionEntryBuffer> entry : buffers.entrySet()) { + long size = entry.getValue().heapSize(); + if (size > biggestSize && (!currentlyWriting.contains(entry.getKey()))) { + biggestSize = size; + biggestBufferKey = entry.getKey(); + } + } + if (biggestBufferKey == null) { + return null; + } + + RegionEntryBuffer buffer = buffers.remove(biggestBufferKey); + currentlyWriting.add(biggestBufferKey); + return buffer; + } + + void doneWriting(RegionEntryBuffer buffer) { + synchronized (this) { + boolean removed = currentlyWriting.remove(buffer.encodedRegionName); + assert removed; + } + long size = buffer.heapSize(); + + synchronized (dataAvailable) { + totalBuffered -= size; + // We may unblock writers + dataAvailable.notifyAll(); + } + } + + synchronized boolean isRegionCurrentlyWriting(byte[] region) { + return currentlyWriting.contains(region); + } + } + + /** + * A buffer of some number of edits for a given region. + * This accumulates edits and also provides a memory optimization in order to + * share a single byte array instance for the table and region name. + * Also tracks memory usage of the accumulated edits. 
+ */ + static class RegionEntryBuffer implements HeapSize { + long heapInBuffer = 0; + List<Entry> entryBuffer; + TableName tableName; + byte[] encodedRegionName; + + RegionEntryBuffer(TableName tableName, byte[] region) { + this.tableName = tableName; + this.encodedRegionName = region; + this.entryBuffer = new LinkedList<Entry>(); + } + + long appendEntry(Entry entry) { + internify(entry); + entryBuffer.add(entry); + long incrHeap = entry.getEdit().heapSize() + + ClassSize.align(2 * ClassSize.REFERENCE) + // WALKey pointers + 0; // TODO linkedlist entry + heapInBuffer += incrHeap; + return incrHeap; + } + + private void internify(Entry entry) { + WALKey k = entry.getKey(); + k.internTableName(this.tableName); + k.internEncodedRegionName(this.encodedRegionName); + } + + @Override + public long heapSize() { + return heapInBuffer; + } + } + + class WriterThread extends Thread { + private volatile boolean shouldStop = false; + private OutputSink outputSink = null; + + WriterThread(OutputSink sink, int i) { + super(Thread.currentThread().getName() + "-Writer-" + i); + outputSink = sink; + } + + @Override + public void run() { + try { + doRun(); + } catch (Throwable t) { + LOG.error("Exiting thread", t); + writerThreadError(t); + } + } + + private void doRun() throws IOException { + LOG.debug("Writer thread " + this + ": starting"); + while (true) { + RegionEntryBuffer buffer = entryBuffers.getChunkToWrite(); + if (buffer == null) { + // No data currently available, wait on some more to show up + synchronized (dataAvailable) { + if (shouldStop && !this.outputSink.flush()) { + return; + } + try { + dataAvailable.wait(500); + } catch (InterruptedException ie) { + if (!shouldStop) { + throw new RuntimeException(ie); + } + } + } + continue; + } + + assert buffer != null; + try { + writeBuffer(buffer); + } finally { + entryBuffers.doneWriting(buffer); + } + } + } + + private void writeBuffer(RegionEntryBuffer buffer) throws IOException { + outputSink.append(buffer); + } + + void finish() { + synchronized (dataAvailable) { + shouldStop = true; + dataAvailable.notifyAll(); + } + } + } + + /** + * The following class is an abstraction class to provide a common interface to support both + * existing recovered edits file sink and region server WAL edits replay sink + */ + abstract class OutputSink { + + protected Map<byte[], SinkWriter> writers = Collections + .synchronizedMap(new TreeMap<byte[], SinkWriter>(Bytes.BYTES_COMPARATOR));; + + protected final Map<byte[], Long> regionMaximumEditLogSeqNum = Collections + .synchronizedMap(new TreeMap<byte[], Long>(Bytes.BYTES_COMPARATOR)); + + protected final List<WriterThread> writerThreads = Lists.newArrayList(); + + /* Set of regions which we've decided should not output edits */ + protected final Set<byte[]> blacklistedRegions = Collections + .synchronizedSet(new TreeSet<byte[]>(Bytes.BYTES_COMPARATOR)); + + protected boolean closeAndCleanCompleted = false; + + protected boolean writersClosed = false; + + protected final int numThreads; + + protected CancelableProgressable reporter = null; + + protected AtomicLong skippedEdits = new AtomicLong(); + + protected List<Path> splits = null; + + public OutputSink(int numWriters) { + numThreads = numWriters; + } + + void setReporter(CancelableProgressable reporter) { + this.reporter = reporter; + } + + /** + * Start the threads that will pump data from the entryBuffers to the output files. 
+ */ + synchronized void startWriterThreads() { + for (int i = 0; i < numThreads; i++) { + WriterThread t = new WriterThread(this, i); + t.start(); + writerThreads.add(t); + } + } + + /** + * + * Update region's maximum edit log SeqNum. + */ + void updateRegionMaximumEditLogSeqNum(Entry entry) { + synchronized (regionMaximumEditLogSeqNum) { + Long currentMaxSeqNum = regionMaximumEditLogSeqNum.get(entry.getKey() + .getEncodedRegionName()); + if (currentMaxSeqNum == null || entry.getKey().getLogSeqNum() > currentMaxSeqNum) { + regionMaximumEditLogSeqNum.put(entry.getKey().getEncodedRegionName(), entry.getKey() + .getLogSeqNum()); + } + } + } + + Long getRegionMaximumEditLogSeqNum(byte[] region) { + return regionMaximumEditLogSeqNum.get(region); + } + + /** + * @return the number of currently opened writers + */ + int getNumOpenWriters() { + return this.writers.size(); + } + + long getSkippedEdits() { + return this.skippedEdits.get(); + } + + /** + * Wait for writer threads to dump all info to the sink + * @return true when there is no error + * @throws IOException + */ + protected boolean finishWriting() throws IOException { + LOG.debug("Waiting for split writer threads to finish"); + boolean progress_failed = false; + for (WriterThread t : writerThreads) { + t.finish(); + } + for (WriterThread t : writerThreads) { + if (!progress_failed && reporter != null && !reporter.progress()) { + progress_failed = true; + } + try { + t.join(); + } catch (InterruptedException ie) { + IOException iie = new InterruptedIOException(); + iie.initCause(ie); + throw iie; + } + } + checkForErrors(); + LOG.info("Split writers finished"); + return (!progress_failed); + } + + abstract List<Path> finishWritingAndClose() throws IOException; + + /** + * @return a map from encoded region ID to the number of edits written out for that region. + */ + abstract Map<byte[], Long> getOutputCounts(); + + /** + * @return number of regions we've recovered + */ + abstract int getNumberOfRecoveredRegions(); + + /** + * @param buffer A WAL Edit Entry + * @throws IOException + */ + abstract void append(RegionEntryBuffer buffer) throws IOException; + + /** + * WriterThread call this function to help flush internal remaining edits in buffer before close + * @return true when underlying sink has something to flush + */ + protected boolean flush() throws IOException { + return false; + } + } + + /** + * Class that manages the output streams from the log splitting process. + */ + class LogRecoveredEditsOutputSink extends OutputSink { + + public LogRecoveredEditsOutputSink(int numWriters) { + // More threads could potentially write faster at the expense + // of causing more disk seeks as the logs are split. + // 3. After a certain setting (probably around 3) the + // process will be bound on the reader in the current + // implementation anyway. + super(numWriters); + } + + /** + * @return null if failed to report progress + * @throws IOException + */ + @Override + List<Path> finishWritingAndClose() throws IOException { + boolean isSuccessful = false; + List<Path> result = null; + try { + isSuccessful = finishWriting(); + } finally { + result = close(); + List<IOException> thrown = closeLogWriters(null); + if (thrown != null && !thrown.isEmpty()) { + throw MultipleIOException.createIOException(thrown); + } + } + if (isSuccessful) { + splits = result; + } + return splits; + } + + /** + * Close all of the output streams. + * @return the list of paths written. 
+ */ + private List<Path> close() throws IOException { + Preconditions.checkState(!closeAndCleanCompleted); + + final List<Path> paths = new ArrayList<Path>(); + final List<IOException> thrown = Lists.newArrayList(); + ThreadPoolExecutor closeThreadPool = Threads.getBoundedCachedThreadPool(numThreads, 30L, + TimeUnit.SECONDS, new ThreadFactory() { + private int count = 1; + + @Override + public Thread newThread(Runnable r) { + Thread t = new Thread(r, "split-log-closeStream-" + count++); + return t; + } + }); + CompletionService<Void> completionService = + new ExecutorCompletionService<Void>(closeThreadPool); + for (final Map.Entry<byte[], SinkWriter> writersEntry : writers.entrySet()) { + LOG.debug("Submitting close of " + ((WriterAndPath)writersEntry.getValue()).p); + completionService.submit(new Callable<Void>() { + @Override + public Void call() throws Exception { + WriterAndPath wap = (WriterAndPath) writersEntry.getValue(); + LOG.debug("Closing " + wap.p); + try { + wap.w.close(); + } catch (IOException ioe) { + LOG.error("Couldn't close log at " + wap.p, ioe); + thrown.add(ioe); + return null; + } + LOG.info("Closed wap " + wap.p + " (wrote " + wap.editsWritten + " edits in " + + (wap.nanosSpent / 1000 / 1000) + "ms)"); + + if (wap.editsWritten == 0) { + // just remove the empty recovered.edits file + if (fs.exists(wap.p) && !fs.delete(wap.p, false)) { + LOG.warn("Failed deleting empty " + wap.p); + throw new IOException("Failed deleting empty " + wap.p); + } + return null; + } + + Path dst = getCompletedRecoveredEditsFilePath(wap.p, + regionMaximumEditLogSeqNum.get(writersEntry.getKey())); + try { + if (!dst.equals(wap.p) && fs.exists(dst)) { + LOG.warn("Found existing old edits file. It could be the " + + "result of a previous failed split attempt. Deleting " + dst + ", length=" + + fs.getFileStatus(dst).getLen()); + if (!fs.delete(dst, false)) { + LOG.warn("Failed deleting of old " + dst); + throw new IOException("Failed deleting of old " + dst); + } + } + // Skip the unit tests which create a splitter that reads and + // writes the data without touching disk. + // TestHLogSplit#testThreading is an example. 
+ if (fs.exists(wap.p)) { + if (!fs.rename(wap.p, dst)) { + throw new IOException("Failed renaming " + wap.p + " to " + dst); + } + LOG.info("Rename " + wap.p + " to " + dst); + } + } catch (IOException ioe) { + LOG.error("Couldn't rename " + wap.p + " to " + dst, ioe); + thrown.add(ioe); + return null; + } + paths.add(dst); + return null; + } + }); + } + + boolean progress_failed = false; + try { + for (int i = 0, n = this.writers.size(); i < n; i++) { + Future<Void> future = completionService.take(); + future.get(); + if (!progress_failed && reporter != null && !reporter.progress()) { + progress_failed = true; + } + } + } catch (InterruptedException e) { + IOException iie = new InterruptedIOException(); + iie.initCause(e); + throw iie; + } catch (ExecutionException e) { + throw new IOException(e.getCause()); + } finally { + closeThreadPool.shutdownNow(); + } + + if (!thrown.isEmpty()) { + throw MultipleIOException.createIOException(thrown); + } + writersClosed = true; + closeAndCleanCompleted = true; + if (progress_failed) { + return null; + } + return paths; + } + + private List<IOException> closeLogWriters(List<IOException> thrown) throws IOException { + if (writersClosed) { + return thrown; + } + + if (thrown == null) { + thrown = Lists.newArrayList(); + } + try { + for (WriterThread t : writerThreads) { + while (t.isAlive()) { + t.shouldStop = true; + t.interrupt(); + try { + t.join(10); + } catch (InterruptedException e) { + IOException iie = new InterruptedIOException(); + iie.initCause(e); + throw iie; + } + } + } + } finally { + synchronized (writers) { + WriterAndPath wap = null; + for (SinkWriter tmpWAP : writers.values()) { + try { + wap = (WriterAndPath) tmpWAP; + wap.w.close(); + } catch (IOException ioe) { + LOG.error("Couldn't close log at " + wap.p, ioe); + thrown.add(ioe); + continue; + } + LOG.info("Closed log " + wap.p + " (wrote " + wap.editsWritten + " edits in " + + (wap.nanosSpent / 1000 / 1000) + "ms)"); + } + } + writersClosed = true; + } + + return thrown; + } + + /** + * Get a writer and path for a log starting at the given entry. This function is threadsafe so + * long as multiple threads are always acting on different regions. + * @return null if this region shouldn't output any logs + */ + private WriterAndPath getWriterAndPath(Entry entry) throws IOException { + byte region[] = entry.getKey().getEncodedRegionName(); + WriterAndPath ret = (WriterAndPath) writers.get(region); + if (ret != null) { + return ret; + } + // If we already decided that this region doesn't get any output + // we don't need to check again. + if (blacklistedRegions.contains(region)) { + return null; + } + ret = createWAP(region, entry, rootDir); + if (ret == null) { + blacklistedRegions.add(region); + return null; + } + writers.put(region, ret); + return ret; + } + + /** + * @return a path with a write for that path. caller should close. + */ + private WriterAndPath createWAP(byte[] region, Entry entry, Path rootdir) throws IOException { + Path regionedits = getRegionSplitEditsPath(fs, entry, rootdir, true); + if (regionedits == null) { + return null; + } + if (fs.exists(regionedits)) { + LOG.warn("Found old edits file. It could be the " + + "result of a previous failed split attempt. 
Deleting " + regionedits + ", length=" + + fs.getFileStatus(regionedits).getLen()); + if (!fs.delete(regionedits, false)) { + LOG.warn("Failed delete of old " + regionedits); + } + } + Writer w = createWriter(regionedits); + LOG.info("Creating writer path=" + regionedits + " region=" + Bytes.toStringBinary(region)); + return (new WriterAndPath(regionedits, w)); + } + + @Override + void append(RegionEntryBuffer buffer) throws IOException { + List<Entry> entries = buffer.entryBuffer; + if (entries.isEmpty()) { + LOG.warn("got an empty buffer, skipping"); + return; + } + + WriterAndPath wap = null; + + long startTime = System.nanoTime(); + try { + int editsCount = 0; + + for (Entry logEntry : entries) { + if (wap == null) { + wap = getWriterAndPath(logEntry); + if (wap == null) { + if (LOG.isDebugEnabled()) { + LOG.debug("getWriterAndPath decided we don't need to write edits for " + logEntry); + } + return; + } + } + wap.w.append(logEntry); + this.updateRegionMaximumEditLogSeqNum(logEntry); + editsCount++; + } + // Pass along summary statistics + wap.incrementEdits(editsCount); + wap.incrementNanoTime(System.nanoTime() - startTime); + } catch (IOException e) { + e = RemoteExceptionHandler.checkIOException(e); + LOG.fatal(" Got while writing log entry to log", e); + throw e; + } + } + + /** + * @return a map from encoded region ID to the number of edits written out for that region. + */ + @Override + Map<byte[], Long> getOutputCounts() { + TreeMap<byte[], Long> ret = new TreeMap<byte[], Long>(Bytes.BYTES_COMPARATOR); + synchronized (writers) { + for (Map.Entry<byte[], SinkWriter> entry : writers.entrySet()) { + ret.put(entry.getKey(), entry.getValue().editsWritten); + } + } + return ret; + } + + @Override + int getNumberOfRecoveredRegions() { + return writers.size(); + } + } + + /** + * Class wraps the actual writer which writes data out and related statistics + */ + private abstract static class SinkWriter { + /* Count of edits written to this path */ + long editsWritten = 0; + /* Number of nanos spent writing to this log */ + long nanosSpent = 0; + + void incrementEdits(int edits) { + editsWritten += edits; + } + + void incrementNanoTime(long nanos) { + nanosSpent += nanos; + } + } + + /** + * Private data structure that wraps a Writer and its Path, also collecting statistics about the + * data written to this output. 
+ */ + private final static class WriterAndPath extends SinkWriter { + final Path p; + final Writer w; + + WriterAndPath(final Path p, final Writer w) { + this.p = p; + this.w = w; + } + } + + /** + * Class that manages to replay edits from WAL files directly to assigned fail over region servers + */ + class LogReplayOutputSink extends OutputSink { + private static final double BUFFER_THRESHOLD = 0.35; + private static final String KEY_DELIMITER = "#"; + + private long waitRegionOnlineTimeOut; + private final Set<String> recoveredRegions = Collections.synchronizedSet(new HashSet<String>()); + private final Map<String, RegionServerWriter> writers = + new ConcurrentHashMap<String, RegionServerWriter>(); + // online encoded region name -> region location map + private final Map<String, HRegionLocation> onlineRegions = + new ConcurrentHashMap<String, HRegionLocation>(); + + private Map<TableName, HConnection> tableNameToHConnectionMap = Collections + .synchronizedMap(new TreeMap<TableName, HConnection>()); + /** + * Map key -> value layout + * <servername>:<table name> -> Queue<Row> + */ + private Map<String, List<Pair<HRegionLocation, Entry>>> serverToBufferQueueMap = + new ConcurrentHashMap<String, List<Pair<HRegionLocation, Entry>>>(); + private List<Throwable> thrown = new ArrayList<Throwable>(); + + // The following sink is used in distrubitedLogReplay mode for entries of regions in a disabling + // table. It's a limitation of distributedLogReplay. Because log replay needs a region is + // assigned and online before it can replay wal edits while regions of disabling/disabled table + // won't be assigned by AM. We can retire this code after HBASE-8234. + private LogRecoveredEditsOutputSink logRecoveredEditsOutputSink; + private boolean hasEditsInDisablingOrDisabledTables = false; + + public LogReplayOutputSink(int numWriters) { + super(numWriters); + this.waitRegionOnlineTimeOut = + conf.getInt(HConstants.HBASE_SPLITLOG_MANAGER_TIMEOUT, + ZKSplitLogManagerCoordination.DEFAULT_TIMEOUT); + this.logRecoveredEditsOutputSink = new LogRecoveredEditsOutputSink(numWriters); + this.logRecoveredEditsOutputSink.setReporter(reporter); + } + + @Override + void append(RegionEntryBuffer buffer) throws IOException { + List<Entry> entries = buffer.entryBuffer; + if (entries.isEmpty()) { + LOG.warn("got an empty buffer, skipping"); + return; + } + + // check if current region in a disabling or disabled table + if (disablingOrDisabledTables.contains(buffer.tableName)) { + // need fall back to old way + logRecoveredEditsOutputSink.append(buffer); + hasEditsInDisablingOrDisabledTables = true; + // store regions we have recovered so far + addToRecoveredRegions(Bytes.toString(buffer.encodedRegionName)); + return; + } + + // group entries by region servers + groupEditsByServer(entries); + + // process workitems + String maxLocKey = null; + int maxSize = 0; + List<Pair<HRegionLocation, Entry>> maxQueue = null; + synchronized (this.serverToBufferQueueMap) { + for (String key : this.serverToBufferQueueMap.keySet()) { + List<Pair<HRegionLocation, Entry>> curQueue = this.serverToBufferQueueMap.get(key); + if (curQueue.size() > maxSize) { + maxSize = curQueue.size(); + maxQueue = curQueue; + maxLocKey = key; + } + } + if (maxSize < minBatchSize + && entryBuffers.totalBuffered < BUFFER_THRESHOLD * entryBuffers.maxHeapUsage) { + // buffer more to process + return; + } else if (maxSize > 0) { + this.serverToBufferQueueMap.remove(maxLocKey); + } + } + + if (maxSize > 0) { + processWorkItems(maxLocKey, maxQueue); + } + } + 
+ private void addToRecoveredRegions(String encodedRegionName) { + if (!recoveredRegions.contains(encodedRegionName)) { + recoveredRegions.add(encodedRegionName); + } + } + + /** + * Helper function to group WALEntries to individual region servers + * @throws IOException + */ + private void groupEditsByServer(List<Entry> entries) throws IOException { + Set<TableName> nonExistentTables = null; + Long cachedLastFlushedSequenceId = -1l; + for (Entry entry : entries) { + WALEdit edit = entry.getEdit(); + TableName table = entry.getKey().getTablename(); + // clear scopes which isn't needed for recovery + entry.getKey().setScopes(null); + String encodeRegionNameStr = Bytes.toString(entry.getKey().getEncodedRegionName()); + // skip edits of non-existent tables + if (nonExistentTables != null && nonExistentTables.contains(table)) { + this.skippedEdits.incrementAndGet(); + continue; + } + + Map<byte[], Long> maxStoreSequenceIds = null; + boolean needSkip = false; + HRegionLocation loc = null; + String locKey = null; + List<Cell> cells = edit.getCells(); + List<Cell> skippedCells = new ArrayList<Cell>(); + HConnection hconn = this.getConnectionByTableName(table); + + for (Cell cell : cells) { + byte[] row = cell.getRow(); + byte[] family = cell.getFamily(); + boolean isCompactionEntry = false; + if (CellUtil.matchingFamily(cell, WALEdit.METAFAMILY)) { + CompactionDescriptor compaction = WALEdit.getCompaction(cell); + if (compaction != null && compaction.hasRegionName()) { + try { + byte[][] regionName = HRegionInfo.parseRegionName(compaction.getRegionName() + .toByteArray()); + row = regionName[1]; // startKey of the region + family = compaction.getFamilyName().toByteArray(); + isCompactionEntry = true; + } catch (Exception ex) { + LOG.warn("Unexpected exception received, ignoring " + ex); + skippedCells.add(cell); + continue; + } + } else { + skippedCells.add(cell); + continue; + } + } + + try { + loc = + locateRegionAndRefreshLastFlushedSequenceId(hconn, table, row, + encodeRegionNameStr); + // skip replaying the compaction if the region is gone + if (isCompactionEntry && !encodeRegionNameStr.equalsIgnoreCase( + loc.getRegionInfo().getEncodedName())) { + LOG.info("Not replaying a compaction marker for an older region: " + + encodeRegionNameStr); + needSkip = true; + } + } catch (TableNotFoundException ex) { + // table has been deleted so skip edits of the table + LOG.info("Table " + table + " doesn't exist. 
Skip log replay for region " + + encodeRegionNameStr); + lastFlushedSequenceIds.put(encodeRegionNameStr, Long.MAX_VALUE); + if (nonExistentTables == null) { + nonExistentTables = new TreeSet<TableName>(); + } + nonExistentTables.add(table); + this.skippedEdits.incrementAndGet(); + needSkip = true; + break; + } + + cachedLastFlushedSequenceId = + lastFlushedSequenceIds.get(loc.getRegionInfo().getEncodedName()); + if (cachedLastFlushedSequenceId != null + && cachedLastFlushedSequenceId >= entry.getKey().getLogSeqNum()) { + // skip the whole WAL entry + this.skippedEdits.incrementAndGet(); + needSkip = true; + break; + } else { + if (maxStoreSequenceIds == null) { + maxStoreSequenceIds = + regionMaxSeqIdInStores.get(loc.getRegionInfo().getEncodedName()); + } + if (maxStoreSequenceIds != null) { + Long maxStoreSeqId = maxStoreSequenceIds.get(family); + if (maxStoreSeqId == null || maxStoreSeqId >= entry.getKey().getLogSeqNum()) { + // skip current kv if column family doesn't exist anymore or already flushed + skippedCells.add(cell); + continue; + } + } + } + } + + // skip the edit + if (loc == null || needSkip) continue; + + if (!skippedCells.isEmpty()) { + cells.removeAll(skippedCells); + } + + synchronized (serverToBufferQueueMap) { + locKey = loc.getHostnamePort() + KEY_DELIMITER + table; + List<Pair<HRegionLocation, Entry>> queue = serverToBufferQueueMap.get(locKey); + if (queue == null) { + queue = + Collections.synchronizedList(new ArrayList<Pair<HRegionLocation, Entry>>()); + serverToBufferQueueMap.put(locKey, queue); + } + queue.add(new Pair<HRegionLocation, Entry>(loc, entry)); + } + // store regions we have recovered so far + addToRecoveredRegions(loc.getRegionInfo().getEncodedName()); + } + } + + /** + * Locate destination region based on table name & row. This function also makes sure the + * destination region is online for replay. 
+ * @throws IOException + */ + private HRegionLocation locateRegionAndRefreshLastFlushedSequenceId(HConnection hconn, + TableName table, byte[] row, String originalEncodedRegionName) throws IOException { + // fetch location from cache + HRegionLocation loc = onlineRegions.get(originalEncodedRegionName); + if(loc != null) return loc; + // fetch location from hbase:meta directly without using cache to avoid hit old dead server + loc = hconn.getRegionLocation(table, row, true); + if (loc == null) { + throw new IOException("Can't locate location for row:" + Bytes.toString(row) + + " of table:" + table); + } + // check if current row moves to a different region due to region merge/split + if (!originalEncodedRegionName.equalsIgnoreCase(loc.getRegionInfo().getEncodedName())) { + // originalEncodedRegionName should have already flushed + lastFlushedSequenceIds.put(originalEncodedRegionName, Long.MAX_VALUE); + HRegionLocation tmpLoc = onlineRegions.get(loc.getRegionInfo().getEncodedName()); + if (tmpLoc != null) return tmpLoc; + } + + Long lastFlushedSequenceId = -1l; + AtomicBoolean isRecovering = new AtomicBoolean(true); + loc = waitUntilRegionOnline(loc, row, this.waitRegionOnlineTimeOut, isRecovering); + if (!isRecovering.get()) { + // region isn't in recovering at all because WAL file may contain a region that has + // been moved to somewhere before hosting RS fails + lastFlushedSequenceIds.put(loc.getRegionInfo().getEncodedName(), Long.MAX_VALUE); + LOG.info("logReplay skip region: " + loc.getRegionInfo().getEncodedName() + + " because it's not in recovering."); + } else { + Long cachedLastFlushedSequenceId = + lastFlushedSequenceIds.get(loc.getRegionInfo().getEncodedName()); + + // retrieve last flushed sequence Id from ZK. Because region postOpenDeployTasks will + // update the value for the region + RegionStoreSequenceIds ids = + csm.getSplitLogWorkerCoordination().getRegionFlushedSequenceId(failedServerName, + loc.getRegionInfo().getEncodedName()); + if (ids != null) { + lastFlushedSequenceId = ids.getLastFlushedSequenceId(); + Map<byte[], Long> storeIds = new TreeMap<byte[], Long>(Bytes.BYTES_COMPARATOR); + List<StoreSequenceId> maxSeqIdInStores = ids.getStoreSequenceIdList(); + for (StoreSequenceId id : maxSeqIdInStores) { + storeIds.put(id.getFamilyName().toByteArray(), id.getSequenceId()); + } + regionMaxSeqIdInStores.put(loc.getRegionInfo().getEncodedName(), storeIds); + } + + if (cachedLastFlushedSequenceId == null + || lastFlushedSequenceId > cachedLastFlushedSequenceId) { + lastFlushedSequenceIds.put(loc.getRegionInfo().getEncodedName(), lastFlushedSequenceId); + } + } + + onlineRegions.put(loc.getRegionInfo().getEncodedName(), loc); + return loc; + } + + private void processWorkItems(String key, List<Pair<HRegionLocation, Entry>> actions) + throws IOException { + RegionServerWriter rsw = null; + + long startTime = System.nanoTime(); + try { + rsw = getRegionServerWriter(key); + rsw.sink.replayEntries(actions); + + // Pass along summary statistics + rsw.incrementEdits(actions.size()); + rsw.incrementNanoTime(System.nanoTime() - startTime); + } catch (IOException e) { + e = RemoteExceptionHandler.checkIOException(e); + LOG.fatal(" Got while writing log entry to log", e); + throw e; + } + } + + /** + * Wait until region is online on the destination region server + * @param loc + * @param row + * @param timeout How long to wait + * @param isRecovering Recovering state of the region interested on destination region server. 
+ * @return True when region is online on the destination region server + * @throws InterruptedException + */ + private HRegionLocation waitUntilRegionOnline(HRegionLocation loc, byte[] row, + final long timeout, AtomicBoolean isRecovering) + throws IOException { + final long endTime = EnvironmentEdgeManager.currentTime() + timeout; + final long pause = conf.getLong(HConstants.HBASE_CLIENT_PAUSE, + HConstants.DEFAULT_HBASE_CLIENT_PAUSE); + boolean reloadLocation = false; + TableName tableName = loc.getRegionInfo().getTable(); + int tries = 0; + Throwable cause = null; + while (endTime > EnvironmentEdgeManager.currentTime()) { + try { + // Try and get regioninfo from the hosting server. + HConnection hconn = getConnectionByTableName(tableName); + if(reloadLocation) { + loc = hconn.getRegionLocation(tableName, row, true); + } + BlockingInterface remoteSvr = hconn.getAdmin(loc.getServerName()); + HRegionInfo region = loc.getRegionInfo(); + try { + GetRegionInfoRequest request = + RequestConverter.buildGetRegionInfoRequest(region.getRegionName()); + GetRegionInfoResponse response = remoteSvr.getRegionInfo(null, request); + if (HRegionInfo.convert(response.getRegionInfo()) != null) { + isRecovering.set((response.hasIsRecovering()) ? response.getIsRecovering() : true); + return loc; + } + } catch (ServiceException se) { + throw ProtobufUtil.getRemoteException(se); + } + } catch (IOException e) { + cause = e.getCause(); + if(!(cause instanceof RegionOpeningException)) { + reloadLocation = true; + } + } + long expectedSleep = ConnectionUtils.getPauseTime(pause, tries); + try { + Thread.sleep(expectedSleep); + } catch (InterruptedException e) { + throw new IOException("Interrupted when waiting region " + + loc.getRegionInfo().getEncodedName() + " online.", e); + } + tries++; + } + + throw new IOException("Timeout when waiting region " + loc.getRegionInfo().getEncodedName() + + " online for " + timeout + " milliseconds.", cause); + } + + @Override + protected boolean flush() throws IOException { + String curLoc = null; + int curSize = 0; + List<Pair<HRegionLocation, Entry>> curQueue = null; + synchronized (this.serverToBufferQueueMap) { + for (String locationKey : this.serverToBufferQueueMap.keySet()) { + curQueue = this.serverToBufferQueueMap.get(locationKey); + if (!curQueue.isEmpty()) { + curSize = curQueue.size(); + curLoc = locationKey; + break; + } + } + if (curSize > 0) { + this.serverToBufferQueueMap.remove(curLoc); + } + } + + if (curSize > 0) { + this.processWorkItems(curLoc, curQueue); + // We should already have control of the monitor; ensure this is the case. 
+ synchronized(dataAvailable) { + dataAvailable.notifyAll(); + } + return true; + } + return false; + } + + void addWriterError(Throwable t) { + thrown.add(t); + } + + @Override + List<Path> finishWritingAndClose() throws IOException { + try { + if (!finishWriting()) { + return null; + } + if (hasEditsInDisablingOrDisabledTables) { + splits = logRecoveredEditsOutputSink.finishWritingAndClose(); + } else { + splits = new ArrayList<Path>(); + } + // returns an empty array in order to keep interface same as old way + return splits; + } finally { + List<IOException> thrown = closeRegionServerWriters(); + if (thrown != null && !thrown.isEmpty()) { + throw MultipleIOException.createIOException(thrown); + } + } + } + + @Override + int getNumOpenWriters() { + return this.writers.size() + this.logRecoveredEditsOutputSink.getNumOpenWriters(); + } + + private List<IOException> closeRegionServerWriters() throws IOException { + List<IOException> result = null; + if (!writersClosed) { + result = Lists.newArrayList(); + try { + for (WriterThread t : writerThreads) { + while (t.isAlive()) { + t.shouldStop = true; + t.interrupt(); + try { + t.join(10); + } catch (InterruptedException e) { + IOException iie = new InterruptedIOException(); + iie.initCause(e); + throw iie; + } + } + } + } finally { + synchronized (writers) { + for (String locationKey : writers.keySet()) { + RegionServerWriter tmpW = writers.get(locationKey); + try { + tmpW.close(); + } catch (IOException ioe) { + LOG.error("Couldn't close writer for region server:" + locationKey, ioe); + result.add(ioe); + } + } + } + + // close connections + synchronized (this.tableNameToHConnectionMap) { + for (TableName tableName : this.tableNameToHConnectionMap.keySet()) { + HConnection hconn = this.tableNameToHConnectionMap.get(tableName); + try { + hconn.clearRegionCache(); + hconn.close(); + } catch (IOException ioe) { + result.add(ioe); + } + } + } + writersClosed = true; + } + } + return result; + } + + @Override + Map<byte[], Long> getOutputCounts() { + TreeMap<byte[], Long> ret = new TreeMap<byte[], Long>(Bytes.BYTES_COMPARATOR); + synchronized (writers) { + for (Map.Entry<String, RegionServerWriter> entry : writers.entrySet()) { + ret.put(Bytes.toBytes(entry.getKey()), entry.getValue().editsWritten); + } + } + return ret; + } + + @Override + int getNumberOfRecoveredRegions() { + return this.recoveredRegions.size(); + } + + /** + * Get a writer and path for a log starting at the given entry. This function is threadsafe so + * long as multiple threads are always acting on different regions. + * @return null if this region shouldn't output any logs + */ + private RegionServerWriter getRegionServerWriter(String loc) throws IOException { + RegionServerWriter ret = writers.get(loc); + if (ret != null) { + return ret; + } + + TableName tableName = getTableFromLocationStr(loc); + if(tableName == null){ + throw new IOException("Invalid location string:" + loc + " found. 
Replay aborted."); + } + + HConnection hconn = getConnectionByTableName(tableName); + synchronized (writers) { + ret = writers.get(loc); + if (ret == null) { + ret = new RegionServerWriter(conf, tableName, hconn); + writers.put(loc, ret); + } + } + return ret; + } + + private HConnection getConnectionByTableName(final TableName tableName) throws IOException { + HConnection hconn = this.tableNameToHConnectionMap.get(tableName); + if (hconn == null) { + synchronized (this.tableNameToHConnectionMap) { + hconn = this.tableNameToHConnectionMap.get(tableName); + if (hconn == null) { + hconn = HConnectionManager.getConnection(conf); + this.tableNameToHConnectionMap.put(tableName, hconn); + } + } + } + return hconn; + } + private TableName getTableFromLocationStr(String loc) { + /** + * location key is in format <server name:port>#<table name> + */ + String[] splits = loc.split(KEY_DELIMITER); + if (splits.length != 2) { + return null; + } + return TableName.valueOf(splits[1]); + } + } + + /** + * Private data structure that wraps a receiving RS and collecting statistics about the data + * written to this newly assigned RS. + */ + private final static class RegionServerWriter extends SinkWriter { + final WALEditsReplaySink sink; + + RegionServerWriter(final Configuration conf, final TableName tableName, final HConnection conn) + throws IOException { + this.sink = new WALEditsReplaySink(conf, tableName, conn); + } + + void close() throws IOException { + } + } + + static class CorruptedLogFileException extends Exception { + private static final long serialVersionUID = 1L; + + CorruptedLogFileException(String s) { + super(s); + } + } + + /** A struct used by getMutationsFromWALEntry */ + public static class MutationReplay { + public MutationReplay(MutationType type, Mutation mutation, long nonceGroup, long nonce) { + this.type = type; + this.mutation = mutation; + if(this.mutation.getDurability() != Durability.SKIP_WAL) { + // using ASYNC_WAL for relay + this.mutation.setDurability(Durability.ASYNC_WAL); + } + this.nonceGroup = nonceGroup; + this.nonce = nonce; + } + + public final MutationType type; + public final Mutation mutation; + public final long nonceGroup; + public final long nonce; + } + + /** + * This function is used to construct mutations from a WALEntry. It also reconstructs WALKey & + * WALEdit from the passed in WALEntry + * @param entry + * @param cells + * @param logEntry pair of WALKey and WALEdit instance stores WALKey and WALEdit instances + * extracted from the passed in WALEntry. + * @return list of Pair<MutationType, Mutation> to be replayed + * @throws IOException + */ + public static List<MutationReplay> getMutationsFromWALEntry(WALEntry entry, CellScanner cells, + Pair<WALKey, WALEdit> logEntry) throws IOException { + + if (entry == null) { + // return an empty array + return new ArrayList<MutationReplay>(); + } + + long replaySeqId = (entry.getKey().hasOrigSequenceNumber()) ? 
+ entry.getKey().getOrigSequenceNumber() : entry.getKey().getLogSequenceNumber(); + int count = entry.getAssociatedCellCount(); + List<MutationReplay> mutations = new ArrayList<MutationReplay>(); + Cell previousCell = null; + Mutation m = null; + WALKey key = null; + WALEdit val = null; + if (logEntry != null) val = new WALEdit(); + + for (int i = 0; i < count; i++) { + // Throw index out of bounds if our cell count is off + if (!cells.advance()) { + throw new ArrayIndexOutOfBoundsException("Expected=" + count + ", index=" + i); + } + Cell cell = cells.current(); + if (val != null) val.add(cell); + + boolean isNewRowOrType = + previousCell == null || previousCell.getTypeByte() != cell.getTypeByte() + || !CellUtil.matchingRow(previousCell, cell); + if (isNewRowOrType) { + // Create new mutation + if (CellUtil.isDelete(cell)) { + m = new Delete(cell.getRowArray(), cell.getRowOffset(), cell.getRowLength()); + // Deletes don't have nonces. + mutations.add(new MutationReplay( + MutationType.DELETE, m, HConstants.NO_NONCE, HConstants.NO_NONCE)); + } else { + m = new Put(cell.getRowArray(), cell.getRowOffset(), cell.getRowLength()); + // Puts might come from increment or append, thus we need nonces. + long nonceGroup = entry.getKey().hasNonceGroup() + ? entry.getKey().getNonceGroup() : HConstants.NO_NONCE; + long nonce = entry.getKey().hasNonce() ? entry.getKey().getNonce() : HConstants.NO_NONCE; + mutations.add(new MutationReplay(MutationType.PUT, m, nonceGroup, nonce)); + } + } + if (CellUtil.isDelete(cell)) { + ((Delete) m).addDeleteMarker(cell); + } else { + ((Put) m).add(cell); + } + previousCell = cell; + } + + // reconstruct WALKey + if (logEntry != null) { + org.apache.hadoop.hbase.protobuf.generated.WALProtos.WALKey walKeyProto = entry.getKey(); + List<UUID> clusterIds = new ArrayList<UUID>(walKeyProto.getClusterIdsCount()); + for (HBaseProtos.UUID uuid : entry.getKey().getClusterIdsList()) { + clusterIds.add(new UUID(uuid.getMostSigBits(), uuid.getLeastSigBits())); + } + // we use HLogKey here instead of WALKey directly to support legacy coprocessors. + key = new HLogKey(walKeyProto.getEncodedRegionName().toByteArray(), TableName.valueOf( + walKeyProto.getTableName().toByteArray()), replaySeqId, walKeyProto.getWriteTime(), + clusterIds, walKeyProto.getNonceGroup(), walKeyProto.getNonce()); + logEntry.setFirst(key); + logEntry.setSecond(val); + } + + return mutations; + } +}
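[Editorial note, not part of the commit] The new public helper WALSplitter.getMutationsFromWALEntry() shown above converts a serialized WALEntry back into client Mutations and, optionally, a reconstructed WALKey/WALEdit pair. The following is a minimal sketch of how a replay path might call it, based only on the signatures visible in this diff; the surrounding class and the replayToRegion() helper are hypothetical names, not HBase APIs.

import java.io.IOException;
import java.util.List;

import org.apache.hadoop.hbase.CellScanner;
import org.apache.hadoop.hbase.client.Mutation;
import org.apache.hadoop.hbase.protobuf.generated.AdminProtos.WALEntry;
import org.apache.hadoop.hbase.regionserver.wal.WALEdit;
import org.apache.hadoop.hbase.util.Pair;
import org.apache.hadoop.hbase.wal.WALKey;
import org.apache.hadoop.hbase.wal.WALSplitter;
import org.apache.hadoop.hbase.wal.WALSplitter.MutationReplay;

public class WALEntryReplaySketch {
  /** Converts one serialized WALEntry into client Mutations and hands them off. */
  void replay(WALEntry entry, CellScanner cells) throws IOException {
    // Holder that getMutationsFromWALEntry fills with the reconstructed
    // WALKey/WALEdit (internally an HLogKey is used for legacy coprocessors).
    Pair<WALKey, WALEdit> walEntry = new Pair<WALKey, WALEdit>();
    List<MutationReplay> mutations =
        WALSplitter.getMutationsFromWALEntry(entry, cells, walEntry);
    for (MutationReplay m : mutations) {
      // m.type is PUT or DELETE; nonceGroup/nonce only matter for Puts that
      // originated from increments or appends.
      Mutation mutation = m.mutation;
      replayToRegion(walEntry.getFirst(), mutation, m.nonceGroup, m.nonce);
    }
  }

  // Hypothetical sink: a real caller would apply the mutation to the target
  // region, e.g. through a replay sink on the destination region server.
  void replayToRegion(WALKey key, Mutation mutation, long nonceGroup, long nonce) {
    // ...
  }
}

Note, per the MutationReplay constructor in the diff, that each reconstructed mutation is downgraded to Durability.ASYNC_WAL (unless it was SKIP_WAL), so replayed edits are not synchronously re-logged on the receiving server.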
http://git-wip-us.apache.org/repos/asf/hbase/blob/8959828f/hbase-server/src/main/java/org/apache/hadoop/hbase/zookeeper/ZKSplitLog.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/zookeeper/ZKSplitLog.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/zookeeper/ZKSplitLog.java index 54d47e5..325fe0d 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/zookeeper/ZKSplitLog.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/zookeeper/ZKSplitLog.java @@ -157,7 +157,7 @@ public class ZKSplitLog { public static long parseLastFlushedSequenceIdFrom(final byte[] bytes) { long lastRecordedFlushedSequenceId = -1l; try { - lastRecordedFlushedSequenceId = ZKUtil.parseHLogPositionFrom(bytes); + lastRecordedFlushedSequenceId = ZKUtil.parseWALPositionFrom(bytes); } catch (DeserializationException e) { lastRecordedFlushedSequenceId = -1l; LOG.warn("Can't parse last flushed sequence Id", e); http://git-wip-us.apache.org/repos/asf/hbase/blob/8959828f/hbase-server/src/test/java/org/apache/hadoop/hbase/HBaseTestCase.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/HBaseTestCase.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/HBaseTestCase.java index de2bf64..8dba04c 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/HBaseTestCase.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/HBaseTestCase.java @@ -140,7 +140,7 @@ public abstract class HBaseTestCase extends TestCase { /** * You must call close on the returned region and then close on the log file - * it created. Do {@link HRegion#close()} followed by {@link HRegion#getLog()} + * it created. Do {@link HRegion#close()} followed by {@link HRegion#getWAL()} * and on it call close. 
* @param desc * @param startKey http://git-wip-us.apache.org/repos/asf/hbase/blob/8959828f/hbase-server/src/test/java/org/apache/hadoop/hbase/HBaseTestingUtility.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/HBaseTestingUtility.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/HBaseTestingUtility.java index 271401b..393c314 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/HBaseTestingUtility.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/HBaseTestingUtility.java @@ -89,7 +89,7 @@ import org.apache.hadoop.hbase.regionserver.HStore; import org.apache.hadoop.hbase.regionserver.InternalScanner; import org.apache.hadoop.hbase.regionserver.RegionServerServices; import org.apache.hadoop.hbase.regionserver.RegionServerStoppedException; -import org.apache.hadoop.hbase.regionserver.wal.HLog; +import org.apache.hadoop.hbase.wal.WAL; import org.apache.hadoop.hbase.security.User; import org.apache.hadoop.hbase.tool.Canary; import org.apache.hadoop.hbase.util.Bytes; @@ -568,7 +568,7 @@ public class HBaseTestingUtility extends HBaseCommonTestingUtility { return this.dfsCluster; } - public MiniDFSCluster startMiniDFSClusterForTestHLog(int namenodePort) throws IOException { + public MiniDFSCluster startMiniDFSClusterForTestWAL(int namenodePort) throws IOException { createDirsAndSetProperties(); dfsCluster = new MiniDFSCluster(namenodePort, conf, 5, false, true, true, null, null, null, null); @@ -1638,18 +1638,18 @@ public class HBaseTestingUtility extends HBaseCommonTestingUtility { } /** - * Create an HRegion that writes to the local tmp dirs with specified hlog + * Create an HRegion that writes to the local tmp dirs with specified wal * @param info regioninfo * @param desc table descriptor - * @param hlog hlog for this region. + * @param wal wal for this region. * @return created hregion * @throws IOException */ - public HRegion createLocalHRegion(HRegionInfo info, HTableDescriptor desc, HLog hlog) throws IOException { - return HRegion.createHRegion(info, getDataTestDir(), getConfiguration(), desc, hlog); + public HRegion createLocalHRegion(HRegionInfo info, HTableDescriptor desc, WAL wal) + throws IOException { + return HRegion.createHRegion(info, getDataTestDir(), getConfiguration(), desc, wal); } - /** * @param tableName * @param startKey @@ -1664,7 +1664,7 @@ public class HBaseTestingUtility extends HBaseCommonTestingUtility { */ public HRegion createLocalHRegion(byte[] tableName, byte[] startKey, byte[] stopKey, String callingMethod, Configuration conf, boolean isReadOnly, Durability durability, - HLog hlog, byte[]... families) throws IOException { + WAL wal, byte[]... 
families) throws IOException { HTableDescriptor htd = new HTableDescriptor(TableName.valueOf(tableName)); htd.setReadOnly(isReadOnly); for (byte[] family : families) { @@ -1675,7 +1675,7 @@ public class HBaseTestingUtility extends HBaseCommonTestingUtility { } htd.setDurability(durability); HRegionInfo info = new HRegionInfo(htd.getTableName(), startKey, stopKey, false); - return createLocalHRegion(info, htd, hlog); + return createLocalHRegion(info, htd, wal); } // // ========================================================================== @@ -1854,7 +1854,7 @@ public class HBaseTestingUtility extends HBaseCommonTestingUtility { Put put = new Put(k); put.setDurability(Durability.SKIP_WAL); put.add(f, null, k); - if (r.getLog() == null) put.setDurability(Durability.SKIP_WAL); + if (r.getWAL() == null) put.setDurability(Durability.SKIP_WAL); int preRowCount = rowCount; int pause = 10; @@ -2872,7 +2872,7 @@ public class HBaseTestingUtility extends HBaseCommonTestingUtility { * Set maxRecoveryErrorCount in DFSClient. In 0.20 pre-append its hard-coded to 5 and * makes tests linger. Here is the exception you'll see: * <pre> - * 2010-06-15 11:52:28,511 WARN [DataStreamer for file /hbase/.logs/hlog.1276627923013 block blk_928005470262850423_1021] hdfs.DFSClient$DFSOutputStream(2657): Error Recovery for block blk_928005470262850423_1021 failed because recovery from primary datanode 127.0.0.1:53683 failed 4 times. Pipeline was 127.0.0.1:53687, 127.0.0.1:53683. Will retry... + * 2010-06-15 11:52:28,511 WARN [DataStreamer for file /hbase/.logs/wal.1276627923013 block blk_928005470262850423_1021] hdfs.DFSClient$DFSOutputStream(2657): Error Recovery for block blk_928005470262850423_1021 failed because recovery from primary datanode 127.0.0.1:53683 failed 4 times. Pipeline was 127.0.0.1:53687, 127.0.0.1:53683. Will retry... * </pre> * @param stream A DFSClient.DFSOutputStream. * @param max