HBASE-13855 Race in multi-threaded PartitionedMobCompactor causes NPE. (Jingcheng)
Project: http://git-wip-us.apache.org/repos/asf/hbase/repo Commit: http://git-wip-us.apache.org/repos/asf/hbase/commit/faefb907 Tree: http://git-wip-us.apache.org/repos/asf/hbase/tree/faefb907 Diff: http://git-wip-us.apache.org/repos/asf/hbase/diff/faefb907 Branch: refs/heads/master Commit: faefb9073f388663df91d0ef2db24b00d6512519 Parents: 26893aa Author: anoopsjohn <[email protected]> Authored: Thu Jun 11 15:32:19 2015 +0530 Committer: anoopsjohn <[email protected]> Committed: Thu Jun 11 15:32:19 2015 +0530 ---------------------------------------------------------------------- .../compactions/PartitionedMobCompactor.java | 37 ++++++++++++++++---- 1 file changed, 30 insertions(+), 7 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/hbase/blob/faefb907/hbase-server/src/main/java/org/apache/hadoop/hbase/mob/compactions/PartitionedMobCompactor.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/mob/compactions/PartitionedMobCompactor.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/mob/compactions/PartitionedMobCompactor.java index 8cda746..6c2ff01 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/mob/compactions/PartitionedMobCompactor.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/mob/compactions/PartitionedMobCompactor.java @@ -211,14 +211,22 @@ public class PartitionedMobCompactor extends MobCompactor { } List<Path> newDelPaths = compactDelFiles(request, delFilePaths); List<StoreFile> newDelFiles = new ArrayList<StoreFile>(); - for (Path newDelPath : newDelPaths) { - StoreFile sf = new StoreFile(fs, newDelPath, conf, compactionCacheConfig, BloomType.NONE); - newDelFiles.add(sf); + List<Path> paths = null; + try { + for (Path newDelPath : newDelPaths) { + StoreFile sf = new StoreFile(fs, newDelPath, conf, compactionCacheConfig, BloomType.NONE); + // pre-create reader of 
a del file to avoid race condition when opening the reader in each + // partition. + sf.createReader(); + newDelFiles.add(sf); + } + LOG.info("After merging, there are " + newDelFiles.size() + " del files"); + // compact the mob files by partitions. + paths = compactMobFiles(request, newDelFiles); + LOG.info("After compaction, there are " + paths.size() + " mob files"); + } finally { + closeStoreFileReaders(newDelFiles); } - LOG.info("After merging, there are " + newDelFiles.size() + " del files"); - // compact the mob files by partitions. - List<Path> paths = compactMobFiles(request, newDelFiles); - LOG.info("After compaction, there are " + paths.size() + " mob files"); // archive the del files if all the mob files are selected. if (request.type == CompactionType.ALL_FILES && !newDelPaths.isEmpty()) { LOG.info("After a mob compaction with all files selected, archiving the del files " @@ -337,6 +345,20 @@ public class PartitionedMobCompactor extends MobCompactor { } /** + * Closes the readers of store files. + * @param storeFiles The store files to be closed. + */ + private void closeStoreFileReaders(List<StoreFile> storeFiles) { + for (StoreFile storeFile : storeFiles) { + try { + storeFile.closeReader(true); + } catch (IOException e) { + LOG.warn("Failed to close the reader on store file " + storeFile.getPath(), e); + } + } + } + + /** * Compacts a partition of selected small mob files and all the del files in a batch. * @param request The compaction request. * @param partition A compaction partition. @@ -415,6 +437,7 @@ public class PartitionedMobCompactor extends MobCompactor { } // archive the old mob files, do not archive the del files. try { + closeStoreFileReaders(mobFilesToCompact); MobUtils .removeMobFiles(conf, fs, tableName, mobTableDir, column.getName(), mobFilesToCompact); } catch (IOException e) {
