Repository: hbase Updated Branches: refs/heads/master 6e136f26b -> 301062566
HBASE-19613 Miscellaneous changes to WALSplitter. * Use ArrayList instead of LinkedList * Use Apache Commons where appropriate * Parameterize and improve logging Project: http://git-wip-us.apache.org/repos/asf/hbase/repo Commit: http://git-wip-us.apache.org/repos/asf/hbase/commit/30106256 Tree: http://git-wip-us.apache.org/repos/asf/hbase/tree/30106256 Diff: http://git-wip-us.apache.org/repos/asf/hbase/diff/30106256 Branch: refs/heads/master Commit: 301062566ac6e32d5bc3c6dbfd819b5e62742e8c Parents: 6e136f2 Author: BELUGA BEHR <dam6...@gmail.com> Authored: Wed Jan 3 18:29:09 2018 -0800 Committer: Apekshit Sharma <a...@apache.org> Committed: Wed Jan 3 18:30:10 2018 -0800 ---------------------------------------------------------------------- .../apache/hadoop/hbase/wal/WALSplitter.java | 163 +++++++++---------- 1 file changed, 75 insertions(+), 88 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/hbase/blob/30106256/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/WALSplitter.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/WALSplitter.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/WALSplitter.java index 328390e..2aad203 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/WALSplitter.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/WALSplitter.java @@ -24,9 +24,9 @@ import java.io.IOException; import java.io.InterruptedIOException; import java.text.ParseException; import java.util.ArrayList; +import java.util.Arrays; import java.util.Collections; import java.util.HashMap; -import java.util.LinkedList; import java.util.List; import java.util.Map; import java.util.NavigableSet; @@ -48,6 +48,9 @@ import java.util.concurrent.atomic.AtomicReference; import java.util.regex.Matcher; import java.util.regex.Pattern; +import 
org.apache.commons.collections.CollectionUtils; +import org.apache.commons.collections.MapUtils; +import org.apache.commons.lang3.ArrayUtils; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.FileAlreadyExistsException; import org.apache.hadoop.fs.FileStatus; @@ -86,14 +89,14 @@ import org.apache.hadoop.hbase.wal.WALProvider.Writer; import org.apache.hadoop.hbase.zookeeper.ZKSplitLog; import org.apache.hadoop.io.MultipleIOException; import org.apache.hadoop.ipc.RemoteException; -import org.apache.yetus.audience.InterfaceAudience; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - import org.apache.hbase.thirdparty.com.google.common.annotations.VisibleForTesting; import org.apache.hbase.thirdparty.com.google.common.base.Preconditions; import org.apache.hbase.thirdparty.com.google.common.collect.Lists; import org.apache.hbase.thirdparty.com.google.protobuf.TextFormat; +import org.apache.yetus.audience.InterfaceAudience; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.WALEntry; import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.MutationProto.MutationType; import org.apache.hadoop.hbase.shaded.protobuf.generated.ClusterStatusProtos.RegionStoreSequenceIds; @@ -203,7 +206,7 @@ public class WALSplitter { final FileStatus[] logfiles = SplitLogManager.getFileList(conf, Collections.singletonList(logDir), null); List<Path> splits = new ArrayList<>(); - if (logfiles != null && logfiles.length > 0) { + if (ArrayUtils.isNotEmpty(logfiles)) { for (FileStatus logfile: logfiles) { WALSplitter s = new WALSplitter(factory, conf, rootDir, fs, null, null); if (s.splitLogFile(logfile, null)) { @@ -245,7 +248,7 @@ public class WALSplitter { this.fileBeingSplit = logfile; try { long logLength = logfile.getLen(); - LOG.info("Splitting WAL=" + logPath + ", length=" + logLength); + LOG.info("Splitting WAL={}, length={}", logPath, logLength); 
status.setStatus("Opening log file"); if (reporter != null && !reporter.progress()) { progress_failed = true; @@ -253,7 +256,7 @@ public class WALSplitter { } logFileReader = getReader(logfile, skipErrors, reporter); if (logFileReader == null) { - LOG.warn("Nothing to split in WAL=" + logPath); + LOG.warn("Nothing to split in WAL={}", logPath); return true; } int numOpenedFilesBeforeReporting = conf.getInt("hbase.splitlog.report.openedfiles", 3); @@ -317,7 +320,7 @@ public class WALSplitter { iie.initCause(ie); throw iie; } catch (CorruptedLogFileException e) { - LOG.warn("Could not parse, corrupted WAL=" + logPath, e); + LOG.warn("Could not parse, corrupted WAL={}", logPath, e); if (splitLogWorkerCoordination != null) { // Some tests pass in a csm of null. splitLogWorkerCoordination.markCorrupted(rootDir, logfile.getPath().getName(), fs); @@ -330,14 +333,13 @@ public class WALSplitter { e = e instanceof RemoteException ? ((RemoteException) e).unwrapRemoteException() : e; throw e; } finally { - LOG.debug("Finishing writing output logs and closing down."); + LOG.debug("Finishing writing output logs and closing down"); try { if (null != logFileReader) { logFileReader.close(); } } catch (IOException exception) { - LOG.warn("Could not close WAL reader: " + exception.getMessage()); - LOG.debug("exception details", exception); + LOG.warn("Could not close WAL reader", exception); } try { if (outputSinkStarted) { @@ -417,11 +419,11 @@ public class WALSplitter { final FileSystem fs, final Configuration conf) throws IOException { final Path corruptDir = new Path(FSUtils.getWALRootDir(conf), HConstants.CORRUPT_DIR_NAME); if (conf.get("hbase.regionserver.hlog.splitlog.corrupt.dir") != null) { - LOG.warn("hbase.regionserver.hlog.splitlog.corrupt.dir is deprecated. Default to " - + corruptDir); + LOG.warn("hbase.regionserver.hlog.splitlog.corrupt.dir is deprecated. 
Default to {}", + corruptDir); } if (!fs.mkdirs(corruptDir)) { - LOG.info("Unable to mkdir " + corruptDir); + LOG.info("Unable to mkdir {}", corruptDir); } fs.mkdirs(oldLogDir); @@ -431,9 +433,9 @@ public class WALSplitter { Path p = new Path(corruptDir, corrupted.getName()); if (fs.exists(corrupted)) { if (!fs.rename(corrupted, p)) { - LOG.warn("Unable to move corrupted log " + corrupted + " to " + p); + LOG.warn("Unable to move corrupted log {} to {}", corrupted, p); } else { - LOG.warn("Moved corrupted log " + corrupted + " to " + p); + LOG.warn("Moved corrupted log {} to {}", corrupted, p); } } } @@ -442,9 +444,9 @@ public class WALSplitter { Path newPath = AbstractFSWAL.getWALArchivePath(oldLogDir, p); if (fs.exists(p)) { if (!FSUtils.renameAndSetModifyTime(fs, p, newPath)) { - LOG.warn("Unable to move " + p + " to " + newPath); + LOG.warn("Unable to move {} to {}", p, newPath); } else { - LOG.info("Archived processed log " + p + " to " + newPath); + LOG.info("Archived processed log {} to {}", p, newPath); } } } @@ -474,9 +476,9 @@ public class WALSplitter { Path dir = getRegionDirRecoveredEditsDir(regiondir); if (!fs.exists(regiondir)) { - LOG.info("This region's directory doesn't exist: " - + regiondir.toString() + ". It is very likely that it was" + - " already split so it's safe to discard those edits."); + LOG.info("This region's directory does not exist: {}." + + "It is very likely that it was already split so it is " + + "safe to discard those edits.", regiondir); return null; } if (fs.exists(dir) && fs.isFile(dir)) { @@ -486,16 +488,16 @@ public class WALSplitter { } tmp = new Path(tmp, HConstants.RECOVERED_EDITS_DIR + "_" + encodedRegionName); - LOG.warn("Found existing old file: " + dir + ". It could be some " + LOG.warn("Found existing old file: {}. It could be some " + "leftover of an old installation. It should be a folder instead. 
" - + "So moving it to " + tmp); + + "So moving it to {}", dir, tmp); if (!fs.rename(dir, tmp)) { - LOG.warn("Failed to sideline old file " + dir); + LOG.warn("Failed to sideline old file {}", dir); } } if (!fs.exists(dir) && !fs.mkdirs(dir)) { - LOG.warn("mkdir failed on " + dir); + LOG.warn("mkdir failed on {}", dir); } // Append fileBeingSplit to prevent name conflict since we may have duplicate wal entries now. // Append file name ends with RECOVERED_LOG_TMPFILE_SUFFIX to ensure @@ -554,8 +556,9 @@ public class WALSplitter { final Path regiondir) throws IOException { NavigableSet<Path> filesSorted = new TreeSet<>(); Path editsdir = getRegionDirRecoveredEditsDir(regiondir); - if (!fs.exists(editsdir)) + if (!fs.exists(editsdir)) { return filesSorted; + } FileStatus[] files = FSUtils.listStatus(fs, editsdir, new PathFilter() { @Override public boolean accept(Path p) { @@ -577,16 +580,13 @@ public class WALSplitter { result = false; } } catch (IOException e) { - LOG.warn("Failed isFile check on " + p); + LOG.warn("Failed isFile check on {}", p, e); } return result; } }); - if (files == null) { - return filesSorted; - } - for (FileStatus status : files) { - filesSorted.add(status.getPath()); + if (ArrayUtils.isNotEmpty(files)) { + Arrays.asList(files).forEach(status -> filesSorted.add(status.getPath())); } return filesSorted; } @@ -605,7 +605,7 @@ public class WALSplitter { Path moveAsideName = new Path(edits.getParent(), edits.getName() + "." 
+ System.currentTimeMillis()); if (!fs.rename(edits, moveAsideName)) { - LOG.warn("Rename failed from " + edits + " to " + moveAsideName); + LOG.warn("Rename failed from {} to {}", edits, moveAsideName); } return moveAsideName; } @@ -655,7 +655,7 @@ public class WALSplitter { - SEQUENCE_ID_FILE_SUFFIX_LENGTH)); maxSeqId = Math.max(tmpSeqId, maxSeqId); } catch (NumberFormatException ex) { - LOG.warn("Invalid SeqId File Name=" + fileName); + LOG.warn("Invalid SeqId File Name={}", fileName); } } } @@ -672,10 +672,8 @@ public class WALSplitter { if (!fs.createNewFile(newSeqIdFile) && !fs.exists(newSeqIdFile)) { throw new IOException("Failed to create SeqId file:" + newSeqIdFile); } - if (LOG.isDebugEnabled()) { - LOG.debug("Wrote file=" + newSeqIdFile + ", newSeqId=" + newSeqId - + ", maxSeqId=" + maxSeqId); - } + LOG.debug("Wrote file={}, newSeqId={}, maxSeqId={}", newSeqIdFile, + newSeqId, maxSeqId); } catch (FileAlreadyExistsException ignored) { // latest hdfs throws this exception. it's all right if newSeqIdFile already exists } @@ -683,10 +681,9 @@ public class WALSplitter { // remove old ones if (files != null) { for (FileStatus status : files) { - if (newSeqIdFile.equals(status.getPath())) { - continue; + if (!newSeqIdFile.equals(status.getPath())) { + fs.delete(status.getPath(), false); } - fs.delete(status.getPath(), false); } } return newSeqId; @@ -710,7 +707,7 @@ public class WALSplitter { // zero length even if the file has been sync'd. Revisit if HDFS-376 or // HDFS-878 is committed. if (length <= 0) { - LOG.warn("File " + path + " might be still open, length is 0"); + LOG.warn("File {} might be still open, length is 0", path); } try { @@ -724,17 +721,15 @@ public class WALSplitter { // ignore if this is the last log in sequence. // TODO is this scenario still possible if the log has been // recovered (i.e. closed) - LOG.warn("Could not open " + path + " for reading. 
File is empty", e); - return null; - } else { - // EOFException being ignored - return null; + LOG.warn("Could not open {} for reading. File is empty", path, e); } + // EOFException being ignored + return null; } } catch (IOException e) { if (e instanceof FileNotFoundException) { // A wal file may not exist anymore. Nothing can be recovered so move on - LOG.warn("File " + path + " doesn't exist anymore.", e); + LOG.warn("File {} does not exist anymore", path, e); return null; } if (!skipErrors || e instanceof InterruptedIOException) { @@ -755,7 +750,7 @@ public class WALSplitter { return in.next(); } catch (EOFException eof) { // truncated files are expected if a RS crashes (see HBASE-2643) - LOG.info("EOF from wal " + path + ". continuing"); + LOG.info("EOF from wal {}. Continuing.", path); return null; } catch (IOException e) { // If the IOE resulted from bad file format, @@ -763,8 +758,7 @@ public class WALSplitter { if (e.getCause() != null && (e.getCause() instanceof ParseException || e.getCause() instanceof org.apache.hadoop.fs.ChecksumException)) { - LOG.warn("Parse exception " + e.getCause().toString() + " from wal " - + path + ". continuing"); + LOG.warn("Parse exception from wal {}. 
Continuing", path, e); return null; } if (!skipErrors) { @@ -893,8 +887,7 @@ public class WALSplitter { synchronized (controller.dataAvailable) { totalBuffered += incrHeap; while (totalBuffered > maxHeapUsage && controller.thrown.get() == null) { - LOG.debug("Used " + totalBuffered + - " bytes of buffered edits, waiting for IO threads..."); + LOG.debug("Used {} bytes of buffered edits, waiting for IO threads", totalBuffered); controller.dataAvailable.wait(2000); } controller.dataAvailable.notifyAll(); @@ -980,7 +973,7 @@ public class WALSplitter { RegionEntryBuffer(TableName tableName, byte[] region) { this.tableName = tableName; this.encodedRegionName = region; - this.entryBuffer = new LinkedList<>(); + this.entryBuffer = new ArrayList<>(); } long appendEntry(Entry entry) { @@ -1041,7 +1034,7 @@ public class WALSplitter { } private void doRun() throws IOException { - if (LOG.isTraceEnabled()) LOG.trace("Writer thread starting"); + LOG.trace("Writer thread starting"); while (true) { RegionEntryBuffer buffer = entryBuffers.getChunkToWrite(); if (buffer == null) { @@ -1190,7 +1183,7 @@ public class WALSplitter { } } controller.checkForErrors(); - LOG.info(this.writerThreads.size() + " split writers finished; closing..."); + LOG.info("{} split writers finished; closing.", this.writerThreads.size()); return (!progress_failed); } @@ -1257,7 +1250,7 @@ public class WALSplitter { } finally { result = close(); List<IOException> thrown = closeLogWriters(null); - if (thrown != null && !thrown.isEmpty()) { + if (CollectionUtils.isNotEmpty(thrown)) { throw MultipleIOException.createIOException(thrown); } } @@ -1276,25 +1269,22 @@ public class WALSplitter { dstMinLogSeqNum = entry.getKey().getSequenceId(); } } catch (EOFException e) { - if (LOG.isDebugEnabled()) { - LOG.debug( - "Got EOF when reading first WAL entry from " + dst + ", an empty or broken WAL file?", - e); - } + LOG.debug("Got EOF when reading first WAL entry from {}, an empty or broken WAL file?", + dst, e); } if 
(wap.minLogSeqNum < dstMinLogSeqNum) { LOG.warn("Found existing old edits file. It could be the result of a previous failed" + " split attempt or we have duplicated wal entries. Deleting " + dst + ", length=" + fs.getFileStatus(dst).getLen()); if (!fs.delete(dst, false)) { - LOG.warn("Failed deleting of old " + dst); + LOG.warn("Failed deleting of old {}", dst); throw new IOException("Failed deleting of old " + dst); } } else { LOG.warn("Found existing old edits file and we have less entries. Deleting " + wap.p + ", length=" + fs.getFileStatus(wap.p).getLen()); if (!fs.delete(wap.p, false)) { - LOG.warn("Failed deleting of " + wap.p); + LOG.warn("Failed deleting of {}", wap.p); throw new IOException("Failed deleting of " + wap.p); } } @@ -1377,13 +1367,11 @@ public class WALSplitter { Path closeWriter(String encodedRegionName, WriterAndPath wap, List<IOException> thrown) throws IOException{ - if (LOG.isTraceEnabled()) { - LOG.trace("Closing " + wap.p); - } + LOG.trace("Closing {}", wap.p); try { wap.w.close(); } catch (IOException ioe) { - LOG.error("Couldn't close log at " + wap.p, ioe); + LOG.error("Could not close log at {}", wap.p, ioe); thrown.add(ioe); return null; } @@ -1395,7 +1383,7 @@ public class WALSplitter { if (wap.editsWritten == 0) { // just remove the empty recovered.edits file if (fs.exists(wap.p) && !fs.delete(wap.p, false)) { - LOG.warn("Failed deleting empty " + wap.p); + LOG.warn("Failed deleting empty {}", wap.p); throw new IOException("Failed deleting empty " + wap.p); } return null; @@ -1414,10 +1402,10 @@ public class WALSplitter { if (!fs.rename(wap.p, dst)) { throw new IOException("Failed renaming " + wap.p + " to " + dst); } - LOG.info("Rename " + wap.p + " to " + dst); + LOG.info("Rename {} to {}", wap.p, dst); } } catch (IOException ioe) { - LOG.error("Couldn't rename " + wap.p + " to " + dst, ioe); + LOG.error("Could not rename {} to {}", wap.p, dst, ioe); thrown.add(ioe); return null; } @@ -1428,7 +1416,6 @@ public class WALSplitter 
{ if (writersClosed) { return thrown; } - if (thrown == null) { thrown = Lists.newArrayList(); } @@ -1453,7 +1440,7 @@ public class WALSplitter { wap = (WriterAndPath) tmpWAP; wap.w.close(); } catch (IOException ioe) { - LOG.error("Couldn't close log at " + wap.p, ioe); + LOG.error("Couldn't close log at {}", wap.p, ioe); thrown.add(ioe); continue; } @@ -1508,18 +1495,18 @@ public class WALSplitter { + "result of a previous failed split attempt. Deleting " + regionedits + ", length=" + fs.getFileStatus(regionedits).getLen()); if (!fs.delete(regionedits, false)) { - LOG.warn("Failed delete of old " + regionedits); + LOG.warn("Failed delete of old {}", regionedits); } } Writer w = createWriter(regionedits); - LOG.debug("Creating writer path=" + regionedits); + LOG.debug("Creating writer path={}", regionedits); return new WriterAndPath(regionedits, w, entry.getKey().getSequenceId()); } void filterCellByStore(Entry logEntry) { Map<byte[], Long> maxSeqIdInStores = regionMaxSeqIdInStores.get(Bytes.toString(logEntry.getKey().getEncodedRegionName())); - if (maxSeqIdInStores == null || maxSeqIdInStores.isEmpty()) { + if (MapUtils.isEmpty(maxSeqIdInStores)) { return; } // Create the array list for the cells that aren't filtered. @@ -1567,11 +1554,9 @@ public class WALSplitter { if (wap == null) { wap = getWriterAndPath(logEntry, reusable); if (wap == null) { - if (LOG.isTraceEnabled()) { - // This log spews the full edit. Can be massive in the log. Enable only debugging - // WAL lost edit issues. - LOG.trace("getWriterAndPath decided we don't need to write edits for " + logEntry); - } + // This log spews the full edit. Can be massive in the log. Enable only debugging + // WAL lost edit issues. + LOG.trace("getWriterAndPath decided we don't need to write edits for {}", logEntry); return null; } } @@ -1590,7 +1575,7 @@ public class WALSplitter { } catch (IOException e) { e = e instanceof RemoteException ? 
((RemoteException)e).unwrapRemoteException() : e; - LOG.error(HBaseMarkers.FATAL, " Got while writing log entry to log", e); + LOG.error(HBaseMarkers.FATAL, "Got while writing log entry to log", e); throw e; } return wap; @@ -1599,8 +1584,8 @@ public class WALSplitter { @Override public boolean keepRegionEvent(Entry entry) { ArrayList<Cell> cells = entry.getEdit().getCells(); - for (int i = 0; i < cells.size(); i++) { - if (WALEdit.isCompactionMarker(cells.get(i))) { + for (Cell cell : cells) { + if (WALEdit.isCompactionMarker(cell)) { return true; } } @@ -1657,7 +1642,7 @@ public class WALSplitter { List<IOException> thrown, List<Path> paths) throws InterruptedException, ExecutionException { for (final Map.Entry<byte[], RegionEntryBuffer> buffer : entryBuffers.buffers.entrySet()) { - LOG.info("Submitting writeThenClose of " + buffer.getValue().encodedRegionName); + LOG.info("Submitting writeThenClose of {}", buffer.getValue().encodedRegionName); completionService.submit(new Callable<Void>() { public Void call() throws Exception { Path dst = writeThenClose(buffer.getValue()); @@ -1835,7 +1820,7 @@ public class WALSplitter { if (entry == null) { // return an empty array - return new ArrayList<>(); + return Collections.emptyList(); } long replaySeqId = (entry.getKey().hasOrigSequenceNumber()) ? @@ -1846,7 +1831,9 @@ public class WALSplitter { Mutation m = null; WALKeyImpl key = null; WALEdit val = null; - if (logEntry != null) val = new WALEdit(); + if (logEntry != null) { + val = new WALEdit(); + } for (int i = 0; i < count; i++) { // Throw index out of bounds if our cell count is off