Repository: hbase Updated Branches: refs/heads/hbase-11339 b72eb7f92 -> aa523164e
HBASE-12201 Close the writers in the MOB sweep tool (Jingcheng Du) Project: http://git-wip-us.apache.org/repos/asf/hbase/repo Commit: http://git-wip-us.apache.org/repos/asf/hbase/commit/aa523164 Tree: http://git-wip-us.apache.org/repos/asf/hbase/tree/aa523164 Diff: http://git-wip-us.apache.org/repos/asf/hbase/diff/aa523164 Branch: refs/heads/hbase-11339 Commit: aa523164e825567d93eb0b2a191955ca195ea242 Parents: b72eb7f Author: Ramkrishna <[email protected]> Authored: Thu Oct 9 09:50:39 2014 +0530 Committer: Ramkrishna <[email protected]> Committed: Thu Oct 9 09:50:39 2014 +0530 ---------------------------------------------------------------------- .../org/apache/hadoop/hbase/mob/MobUtils.java | 4 +-- .../hadoop/hbase/mob/mapreduce/SweepJob.java | 29 ++++++++++++++------ .../hbase/mob/mapreduce/SweepReducer.java | 14 ++++++---- 3 files changed, 30 insertions(+), 17 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/hbase/blob/aa523164/hbase-server/src/main/java/org/apache/hadoop/hbase/mob/MobUtils.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/mob/MobUtils.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/mob/MobUtils.java index 4001520..0f6aa6f 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/mob/MobUtils.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/mob/MobUtils.java @@ -63,7 +63,6 @@ import org.apache.hadoop.hbase.util.FSUtils; public class MobUtils { private static final Log LOG = LogFactory.getLog(MobUtils.class); - private static final String COMPACTION_WORKING_DIR_NAME = "working"; private static final ThreadLocal<SimpleDateFormat> LOCAL_FORMAT = new ThreadLocal<SimpleDateFormat>() { @@ -360,8 +359,7 @@ public class MobUtils { * @return The directory of the mob compaction for the current job. */ public static Path getCompactionWorkingPath(Path root, String jobName) { - Path parent = new Path(root, jobName); - return new Path(parent, COMPACTION_WORKING_DIR_NAME); + return new Path(root, jobName); } /** http://git-wip-us.apache.org/repos/asf/hbase/blob/aa523164/hbase-server/src/main/java/org/apache/hadoop/hbase/mob/mapreduce/SweepJob.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/mob/mapreduce/SweepJob.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/mob/mapreduce/SweepJob.java index 1d048bb..2ab12d9 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/mob/mapreduce/SweepJob.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/mob/mapreduce/SweepJob.java @@ -33,6 +33,7 @@ import org.apache.commons.logging.LogFactory; import org.apache.hadoop.classification.InterfaceAudience; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.CommonConfigurationKeys; +import org.apache.hadoop.fs.FSDataOutputStream; import org.apache.hadoop.fs.FileStatus; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; @@ -57,6 +58,7 @@ import org.apache.hadoop.io.IOUtils; import org.apache.hadoop.io.SequenceFile; import org.apache.hadoop.io.Text; import org.apache.hadoop.io.Writable; +import org.apache.hadoop.io.SequenceFile.CompressionType; import org.apache.hadoop.io.serializer.JavaSerialization; import org.apache.hadoop.io.serializer.WritableSerialization; import org.apache.hadoop.mapreduce.Job; @@ -288,9 +290,6 @@ public class SweepJob { job.getConfiguration().set(WORKING_ALLNAMES_FILE_KEY, allFileNamesPath.toString()); Path vistiedFileNamesPath = new Path(workingPathOfNames, WORKING_VISITED_DIR); job.getConfiguration().set(WORKING_VISITED_DIR_KEY, vistiedFileNamesPath.toString()); - // create a file includes all the existing mob files whose creation time is older than - // (now - oneDay) - fs.create(allFileNamesPath, true); // create a directory where the files contain names of visited mob files are saved. fs.mkdirs(vistiedFileNamesPath); Path mobStorePath = MobUtils.getMobFamilyPath(job.getConfiguration(), tn, familyName); @@ -311,15 +310,25 @@ public class SweepJob { } } } - // write the names to a sequence file - SequenceFile.Writer writer = SequenceFile.createWriter(fs, job.getConfiguration(), - allFileNamesPath, String.class, String.class); + FSDataOutputStream fout = null; + SequenceFile.Writer writer = null; try { + // create a file includes all the existing mob files whose creation time is older than + // (now - oneDay) + fout = fs.create(allFileNamesPath, true); + // write the names to a sequence file + writer = SequenceFile.createWriter(job.getConfiguration(), fout, String.class, String.class, + CompressionType.NONE, null); for (String fileName : fileNames) { writer.append(fileName, MobConstants.EMPTY_STRING); } } finally { - IOUtils.closeStream(writer); + if (writer != null) { + IOUtils.closeStream(writer); + } + if (fout != null) { + IOUtils.closeStream(fout); + } } } @@ -366,7 +375,7 @@ public class SweepJob { } while (nextAll != null || nextVisited != null); } finally { if (allNamesReader != null) { - allNamesReader.close(); + IOUtils.closeStream(allNamesReader); } if (visitedNamesReader != null) { visitedNamesReader.close(); @@ -517,7 +526,9 @@ public class SweepJob { public void close() { for (SequenceFile.Reader reader : readers) { - IOUtils.closeStream(reader); + if (reader != null) { + IOUtils.closeStream(reader); + } } } } http://git-wip-us.apache.org/repos/asf/hbase/blob/aa523164/hbase-server/src/main/java/org/apache/hadoop/hbase/mob/mapreduce/SweepReducer.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/mob/mapreduce/SweepReducer.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/mob/mapreduce/SweepReducer.java index 04fe359..ab8379e 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/mob/mapreduce/SweepReducer.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/mob/mapreduce/SweepReducer.java @@ -31,6 +31,7 @@ import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.apache.hadoop.classification.InterfaceAudience; import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.FSDataOutputStream; import org.apache.hadoop.fs.FileStatus; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; @@ -62,6 +63,7 @@ import org.apache.hadoop.hbase.util.FSUtils; import org.apache.hadoop.hbase.zookeeper.ZooKeeperWatcher; import org.apache.hadoop.io.IOUtils; import org.apache.hadoop.io.SequenceFile; +import org.apache.hadoop.io.SequenceFile.CompressionType; import org.apache.hadoop.io.Text; import org.apache.hadoop.io.Writable; import org.apache.hadoop.mapreduce.Reducer; @@ -149,6 +151,7 @@ public class SweepReducer extends Reducer<Text, KeyValue, Writable, Writable> { String sweeperNode = context.getConfiguration().get(SweepJob.SWEEPER_NODE); ZooKeeperWatcher zkw = new ZooKeeperWatcher(context.getConfiguration(), jobId, new DummyMobAbortable()); + FSDataOutputStream fout = null; try { SweepJobNodeTracker tracker = new SweepJobNodeTracker(zkw, sweeperNode, jobId); tracker.start(); @@ -157,11 +160,9 @@ public class SweepReducer extends Reducer<Text, KeyValue, Writable, Writable> { String dir = this.conf.get(SweepJob.WORKING_VISITED_DIR_KEY); Path nameFilePath = new Path(dir, UUID.randomUUID().toString() .replace("-", MobConstants.EMPTY_STRING)); - if (!fs.exists(nameFilePath)) { - fs.create(nameFilePath, true); - } - writer = SequenceFile.createWriter(fs, context.getConfiguration(), nameFilePath, - String.class, String.class); + fout = fs.create(nameFilePath, true); + writer = SequenceFile.createWriter(context.getConfiguration(), fout, String.class, + String.class, CompressionType.NONE, null); SweepPartitionId id; SweepPartition partition = null; // the mob files which have the same start key and date are in the same partition. @@ -195,6 +196,9 @@ public class SweepReducer extends Reducer<Text, KeyValue, Writable, Writable> { if (writer != null) { IOUtils.closeStream(writer); } + if (fout != null) { + IOUtils.closeStream(fout); + } if (table != null) { try { table.close();
