HBASE-11861 Native MOB Compaction mechanisms (Jingcheng Du)
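Note: the new chore is driven entirely by the configuration knobs added to hbase-default.xml in this patch. As a minimal illustrative sketch (not part of the patch itself; only the property names and defaults below come from this commit, while the surrounding setup is the standard HBaseConfiguration client API), overriding the defaults programmatically would look like:

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;

public class MobCompactionTuning {
  public static void main(String[] args) {
    Configuration conf = HBaseConfiguration.create();
    // Mob files smaller than this many bytes are "small" and get merged; default is 192MB.
    conf.setLong("hbase.mob.file.compaction.mergeable.threshold", 192L * 1024 * 1024);
    // Del files are merged once more than this many exist; default is 3.
    conf.setInt("hbase.mob.delfile.max.count", 3);
    // At most this many mob files are compacted per batch, bounding open file handles.
    conf.setInt("hbase.mob.file.compaction.batch.size", 100);
    // How often the master-side MobFileCompactionChore runs, in milliseconds (one week).
    conf.setInt("hbase.master.mob.file.compaction.chore.period", 7 * 24 * 60 * 60 * 1000);
  }
}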
Project: http://git-wip-us.apache.org/repos/asf/hbase/repo
Commit: http://git-wip-us.apache.org/repos/asf/hbase/commit/2c4934ed
Tree: http://git-wip-us.apache.org/repos/asf/hbase/tree/2c4934ed
Diff: http://git-wip-us.apache.org/repos/asf/hbase/diff/2c4934ed

Branch: refs/heads/hbase-11339
Commit: 2c4934eda68e8ed1290c2e3fb50604c2d77bdf64
Parents: fbbb324
Author: Jonathan M Hsieh <[email protected]>
Authored: Fri Feb 6 05:37:13 2015 -0800
Committer: Jonathan M Hsieh <[email protected]>
Committed: Fri Feb 6 05:37:13 2015 -0800

----------------------------------------------------------------------
 .../src/main/resources/hbase-default.xml        |  51 ++
 .../org/apache/hadoop/hbase/master/HMaster.java |   6 +
 .../hbase/master/MobFileCompactionChore.java    | 162 +++++
 .../apache/hadoop/hbase/mob/MobConstants.java   |  43 ++
 .../org/apache/hadoop/hbase/mob/MobUtils.java   | 118 +++-
 .../MobFileCompactionRequest.java               |  64 ++
 .../mob/filecompactions/MobFileCompactor.java   |  78 +++
 .../PartitionedMobFileCompactionRequest.java    | 146 +++++
 .../PartitionedMobFileCompactor.java            | 631 ++++++++++++++++++
 .../hadoop/hbase/regionserver/HStore.java       |   2 +-
 .../hadoop/hbase/regionserver/StoreScanner.java |   4 +-
 .../filecompactions/TestMobFileCompactor.java   | 652 +++++++++++++++++++
 ...TestPartitionedMobFileCompactionRequest.java |  60 ++
 .../TestPartitionedMobFileCompactor.java        | 423 ++++++++++++
 14 files changed, 2426 insertions(+), 14 deletions(-)
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/hbase/blob/2c4934ed/hbase-common/src/main/resources/hbase-default.xml
----------------------------------------------------------------------
diff --git a/hbase-common/src/main/resources/hbase-default.xml b/hbase-common/src/main/resources/hbase-default.xml
index 647defd..d1429ad 100644
--- a/hbase-common/src/main/resources/hbase-default.xml
+++ b/hbase-common/src/main/resources/hbase-default.xml
@@ -1517,4 +1517,55 @@ possible configurations would overwhelm and obscure the important.
     The default value is one day.
     </description>
   </property>
+  <property>
+    <name>hbase.mob.file.compaction.mergeable.threshold</name>
+    <value>201326592</value>
+    <description>
+      If the size of a mob file is less than this value, it is regarded as a small
+      file and needs to be merged in mob file compaction. The default value is 192MB.
+    </description>
+  </property>
+  <property>
+    <name>hbase.mob.delfile.max.count</name>
+    <value>3</value>
+    <description>
+      The max number of del files that are allowed in the mob file compaction.
+      In the mob file compaction, when the number of existing del files is larger than
+      this value, they are merged until the number of del files is not larger than this
+      value. The default value is 3.
+    </description>
+  </property>
+  <property>
+    <name>hbase.mob.file.compaction.batch.size</name>
+    <value>100</value>
+    <description>
+      The max number of mob files that are allowed in a batch of the mob file compaction.
+      The mob file compaction merges the small mob files into bigger ones. If the number
+      of the small files is very large, the merge could run into a "too many open file
+      handles" problem, so the merge has to be split into batches. This value limits the
+      number of mob files that are selected in a batch of the mob file compaction.
+      The default value is 100.
+    </description>
+  </property>
+  <property>
+    <name>hbase.master.mob.file.compaction.chore.period</name>
+    <value>604800000</value>
+    <description>
+      The period that MobFileCompactionChore runs. The unit is millisecond.
+      The default value is one week.
+    </description>
+  </property>
+  <property>
+    <name>hbase.mob.file.compactor.class</name>
+    <value>org.apache.hadoop.hbase.mob.filecompactions.PartitionedMobFileCompactor</value>
+    <description>
+      Implementation of the mob file compactor. The default is PartitionedMobFileCompactor.
+    </description>
+  </property>
+  <property>
+    <name>hbase.master.mob.file.compaction.chore.threads.max</name>
+    <value>1</value>
+    <description>
+      The max number of threads used in MobFileCompactionChore.
+    </description>
+  </property>
 </configuration>

http://git-wip-us.apache.org/repos/asf/hbase/blob/2c4934ed/hbase-server/src/main/java/org/apache/hadoop/hbase/master/HMaster.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/HMaster.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/HMaster.java
index 4ff3592..7ad49a3 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/HMaster.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/HMaster.java
@@ -209,6 +209,7 @@ public class HMaster extends HRegionServer implements MasterServices, Server {
   private LogCleaner logCleaner;
   private HFileCleaner hfileCleaner;
   private ExpiredMobFileCleanerChore expiredMobFileCleanerChore;
+  private MobFileCompactionChore mobFileCompactChore;
 
   MasterCoprocessorHost cpHost;
 
@@ -613,6 +614,8 @@ public class HMaster extends HRegionServer implements MasterServices, Server {
 
     this.expiredMobFileCleanerChore = new ExpiredMobFileCleanerChore(this);
     Threads.setDaemonThreadRunning(expiredMobFileCleanerChore.getThread());
+    this.mobFileCompactChore = new MobFileCompactionChore(this);
+    Threads.setDaemonThreadRunning(mobFileCompactChore.getThread());
 
     if (this.cpHost != null) {
       // don't let cp initialization errors kill the master
@@ -863,6 +866,9 @@ public class HMaster extends HRegionServer implements MasterServices, Server {
     if (this.expiredMobFileCleanerChore != null) {
       this.expiredMobFileCleanerChore.interrupt();
     }
+    if (this.mobFileCompactChore != null) {
+      this.mobFileCompactChore.interrupt();
+    }
     if (this.balancerChore != null) {
       this.balancerChore.interrupt();
     }

http://git-wip-us.apache.org/repos/asf/hbase/blob/2c4934ed/hbase-server/src/main/java/org/apache/hadoop/hbase/master/MobFileCompactionChore.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/MobFileCompactionChore.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/MobFileCompactionChore.java
new file mode 100644
index 0000000..9973619
--- /dev/null
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/MobFileCompactionChore.java
@@ -0,0 +1,162 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.master;
+
+import java.io.IOException;
+import java.util.Map;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.RejectedExecutionException;
+import java.util.concurrent.RejectedExecutionHandler;
+import java.util.concurrent.SynchronousQueue;
+import java.util.concurrent.ThreadPoolExecutor;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.hbase.Chore;
+import org.apache.hadoop.hbase.HColumnDescriptor;
+import org.apache.hadoop.hbase.HTableDescriptor;
+import org.apache.hadoop.hbase.TableDescriptors;
+import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.master.TableLockManager.TableLock;
+import org.apache.hadoop.hbase.mob.MobConstants;
+import org.apache.hadoop.hbase.mob.MobUtils;
+import org.apache.hadoop.hbase.mob.filecompactions.MobFileCompactor;
+import org.apache.hadoop.hbase.mob.filecompactions.PartitionedMobFileCompactor;
+import org.apache.hadoop.hbase.util.ReflectionUtils;
+import org.apache.hadoop.hbase.util.Threads;
+
+/**
+ * The MobFileCompactionChore regularly runs compactions that merge small mob files.
+ */
+@InterfaceAudience.Private
+public class MobFileCompactionChore extends Chore {
+
+  private static final Log LOG = LogFactory.getLog(MobFileCompactionChore.class);
+  private HMaster master;
+  private TableLockManager tableLockManager;
+  private ExecutorService pool;
+
+  public MobFileCompactionChore(HMaster master) {
+    super(master.getServerName() + "-MobFileCompactChore", master.getConfiguration().getInt(
+      MobConstants.MOB_FILE_COMPACTION_CHORE_PERIOD,
+      MobConstants.DEFAULT_MOB_FILE_COMPACTION_CHORE_PERIOD), master);
+    this.master = master;
+    this.tableLockManager = master.getTableLockManager();
+    this.pool = createThreadPool();
+  }
+
+  @Override
+  protected void chore() {
+    try {
+      String className = master.getConfiguration().get(MobConstants.MOB_FILE_COMPACTOR_CLASS_KEY,
+        PartitionedMobFileCompactor.class.getName());
+      TableDescriptors htds = master.getTableDescriptors();
+      Map<String, HTableDescriptor> map = htds.getAll();
+      for (HTableDescriptor htd : map.values()) {
+        for (HColumnDescriptor hcd : htd.getColumnFamilies()) {
+          if (!hcd.isMobEnabled()) {
+            continue;
+          }
+          // instantiate the mob file compactor.
+          MobFileCompactor compactor = null;
+          try {
+            compactor = ReflectionUtils.instantiateWithCustomCtor(className, new Class[] {
+              Configuration.class, FileSystem.class, TableName.class, HColumnDescriptor.class,
+              ExecutorService.class },
+              new Object[] { master.getConfiguration(), master.getFileSystem(),
+                htd.getTableName(), hcd, pool });
+          } catch (Exception e) {
+            throw new IOException("Unable to load configured mob file compactor '" + className
+              + "'", e);
+          }
+          // compact only the mob-enabled columns.
+          // obtain a write table lock before performing the compaction to avoid race conditions
+          // with major compactions of mob-enabled columns.
+          boolean tableLocked = false;
+          TableLock lock = null;
+          try {
+            // the tableLockManager might be null in testing. In that case, it is lock-free.
+            if (tableLockManager != null) {
+              lock = tableLockManager.writeLock(MobUtils.getTableLockName(htd.getTableName()),
+                "Run MobFileCompactChore");
+              lock.acquire();
+            }
+            tableLocked = true;
+            compactor.compact();
+          } catch (Exception e) {
+            LOG.error("Failed to compact the mob files for the column " + hcd.getNameAsString()
+              + " in the table " + htd.getNameAsString(), e);
+          } finally {
+            if (lock != null && tableLocked) {
+              try {
+                lock.release();
+              } catch (IOException e) {
+                LOG.error(
+                  "Failed to release the write lock for the table " + htd.getNameAsString(), e);
+              }
+            }
+          }
+        }
+      }
+    } catch (Exception e) {
+      LOG.error("Failed to compact the mob files", e);
+    }
+  }
+
+  @Override
+  protected void cleanup() {
+    super.cleanup();
+    pool.shutdown();
+  }
+
+  /**
+   * Creates a thread pool.
+   * @return A thread pool.
+   */
+  private ExecutorService createThreadPool() {
+    Configuration conf = master.getConfiguration();
+    int maxThreads = conf.getInt(MobConstants.MOB_FILE_COMPACTION_CHORE_THREADS_MAX,
+      MobConstants.DEFAULT_MOB_FILE_COMPACTION_CHORE_THREADS_MAX);
+    if (maxThreads == 0) {
+      maxThreads = 1;
+    }
+    long keepAliveTime = conf.getLong(MobConstants.MOB_FILE_COMPACTION_CHORE_THREADS_KEEPALIVETIME,
+      MobConstants.DEFAULT_MOB_FILE_COMPACTION_CHORE_THREADS_KEEPALIVETIME);
+    final SynchronousQueue<Runnable> queue = new SynchronousQueue<Runnable>();
+    ThreadPoolExecutor pool = new ThreadPoolExecutor(1, maxThreads, keepAliveTime,
+      TimeUnit.SECONDS, queue, Threads.newDaemonThreadFactory("MobFileCompactionChore"),
+      new RejectedExecutionHandler() {
+        @Override
+        public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) {
+          try {
+            // waiting for a thread to pick up instead of throwing exceptions.
+            queue.put(r);
+          } catch (InterruptedException e) {
+            throw new RejectedExecutionException(e);
+          }
+        }
+      });
+    ((ThreadPoolExecutor) pool).allowCoreThreadTimeOut(true);
+    return pool;
+  }
+}

http://git-wip-us.apache.org/repos/asf/hbase/blob/2c4934ed/hbase-server/src/main/java/org/apache/hadoop/hbase/mob/MobConstants.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/mob/MobConstants.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/mob/MobConstants.java
index f40c952..0c9cda8 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/mob/MobConstants.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/mob/MobConstants.java
@@ -72,8 +72,51 @@ public class MobConstants {
   public static final long DEFAULT_MOB_CACHE_EVICT_PERIOD = 3600l;
 
   public final static String TEMP_DIR_NAME = ".tmp";
+  public final static String BULKLOAD_DIR_NAME = ".bulkload";
   public final static byte[] MOB_TABLE_LOCK_SUFFIX = Bytes.toBytes(".mobLock");
   public final static String EMPTY_STRING = "";
+  /**
+   * If the size of a mob file is less than this value, it is regarded as a small file and needs
+   * to be merged in mob file compaction. The default value is 192MB.
+   */
+  public static final String MOB_FILE_COMPACTION_MERGEABLE_THRESHOLD =
+    "hbase.mob.file.compaction.mergeable.threshold";
+  public static final long DEFAULT_MOB_FILE_COMPACTION_MERGEABLE_THRESHOLD = 192 * 1024 * 1024;
+  /**
+   * The max number of del files that are allowed in the mob file compaction. In the mob file
+   * compaction, when the number of existing del files is larger than this value, they are merged
+   * until the number of del files is not larger than this value. The default value is 3.
+   */
+  public static final String MOB_DELFILE_MAX_COUNT = "hbase.mob.delfile.max.count";
+  public static final int DEFAULT_MOB_DELFILE_MAX_COUNT = 3;
+  /**
+   * The max number of mob files that are allowed in a batch of the mob file compaction.
+   * The mob file compaction merges the small mob files into bigger ones. If the number of the
+   * small files is very large, it could lead to a "too many open file handles" problem in the
+   * merge, so the merge has to be split into batches. This value limits the number of mob files
+   * that are selected in a batch of the mob file compaction. The default value is 100.
+   */
+  public static final String MOB_FILE_COMPACTION_BATCH_SIZE =
+    "hbase.mob.file.compaction.batch.size";
+  public static final int DEFAULT_MOB_FILE_COMPACTION_BATCH_SIZE = 100;
+  /**
+   * The period that MobFileCompactionChore runs. The unit is millisecond.
+   * The default value is one week.
+   */
+  public static final String MOB_FILE_COMPACTION_CHORE_PERIOD =
+    "hbase.master.mob.file.compaction.chore.period";
+  public static final int DEFAULT_MOB_FILE_COMPACTION_CHORE_PERIOD =
+    24 * 60 * 60 * 1000 * 7; // a week
+  public static final String MOB_FILE_COMPACTOR_CLASS_KEY = "hbase.mob.file.compactor.class";
+  /**
+   * The max number of threads used in MobFileCompactionChore.
+   */
+  public static final String MOB_FILE_COMPACTION_CHORE_THREADS_MAX =
+    "hbase.master.mob.file.compaction.chore.threads.max";
+  public static final int DEFAULT_MOB_FILE_COMPACTION_CHORE_THREADS_MAX = 1;
+  public static final String MOB_FILE_COMPACTION_CHORE_THREADS_KEEPALIVETIME =
+    "hbase.master.mob.file.compaction.chore.threads.keepalivetime";
+  public static final long DEFAULT_MOB_FILE_COMPACTION_CHORE_THREADS_KEEPALIVETIME = 60;
 
   private MobConstants() {
   }

http://git-wip-us.apache.org/repos/asf/hbase/blob/2c4934ed/hbase-server/src/main/java/org/apache/hadoop/hbase/mob/MobUtils.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/mob/MobUtils.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/mob/MobUtils.java
index 43521d2..d8b1376 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/mob/MobUtils.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/mob/MobUtils.java
@@ -52,6 +52,7 @@ import org.apache.hadoop.hbase.io.hfile.HFile;
 import org.apache.hadoop.hbase.io.hfile.HFileContext;
 import org.apache.hadoop.hbase.io.hfile.HFileContextBuilder;
 import org.apache.hadoop.hbase.regionserver.BloomType;
+import org.apache.hadoop.hbase.regionserver.HStore;
 import org.apache.hadoop.hbase.regionserver.StoreFile;
 import org.apache.hadoop.hbase.util.Bytes;
 import org.apache.hadoop.hbase.util.FSUtils;
@@ -416,7 +417,7 @@
   }
 
   /**
-   * Creates a directory of mob files for flushing.
+   * Creates a writer for the mob file in the temp directory.
    * @param conf The current configuration.
    * @param fs The current file system.
    * @param family The descriptor of the current column family.
@@ -435,17 +436,110 @@ throws IOException {
     MobFileName mobFileName = MobFileName.create(startKey, date, UUID.randomUUID().toString()
       .replaceAll("-", ""));
+    return createWriter(conf, fs, family, mobFileName, basePath, maxKeyCount, compression,
+      cacheConfig);
+  }
+
+  /**
+   * Creates a writer for the ref file in the temp directory.
+   * @param conf The current configuration.
+   * @param fs The current file system.
+   * @param family The descriptor of the current column family.
+   * @param basePath The basic path for a temp directory.
+   * @param maxKeyCount The key count.
+   * @param cacheConfig The current cache config.
+   * @return The writer for the ref file.
+   * @throws IOException
+   */
+  public static StoreFile.Writer createRefFileWriter(Configuration conf, FileSystem fs,
+    HColumnDescriptor family, Path basePath, long maxKeyCount, CacheConfig cacheConfig)
+    throws IOException {
+    HFileContext hFileContext = new HFileContextBuilder().withIncludesMvcc(true)
+      .withIncludesTags(true).withCompression(family.getCompactionCompression())
+      .withCompressTags(family.shouldCompressTags()).withChecksumType(HStore.getChecksumType(conf))
+      .withBytesPerCheckSum(HStore.getBytesPerChecksum(conf)).withBlockSize(family.getBlocksize())
+      .withHBaseCheckSum(true).withDataBlockEncoding(family.getDataBlockEncoding()).build();
+    Path tempPath = new Path(basePath, UUID.randomUUID().toString().replaceAll("-", ""));
+    StoreFile.Writer w = new StoreFile.WriterBuilder(conf, cacheConfig, fs).withFilePath(tempPath)
+      .withComparator(KeyValue.COMPARATOR).withBloomType(family.getBloomFilterType())
+      .withMaxKeyCount(maxKeyCount).withFileContext(hFileContext).build();
+    return w;
+  }
+
+  /**
+   * Creates a writer for the mob file in the temp directory.
+   * @param conf The current configuration.
+   * @param fs The current file system.
+   * @param family The descriptor of the current column family.
+   * @param date The date string, its format is yyyymmdd.
+   * @param basePath The basic path for a temp directory.
+   * @param maxKeyCount The key count.
+   * @param compression The compression algorithm.
+   * @param startKey The start key.
+   * @param cacheConfig The current cache config.
+   * @return The writer for the mob file.
+   * @throws IOException
+   */
+  public static StoreFile.Writer createWriter(Configuration conf, FileSystem fs,
+    HColumnDescriptor family, String date, Path basePath, long maxKeyCount,
+    Compression.Algorithm compression, byte[] startKey, CacheConfig cacheConfig)
+    throws IOException {
+    MobFileName mobFileName = MobFileName.create(startKey, date, UUID.randomUUID().toString()
+      .replaceAll("-", ""));
+    return createWriter(conf, fs, family, mobFileName, basePath, maxKeyCount, compression,
+      cacheConfig);
+  }
+
+  /**
+   * Creates a writer for the del file in the temp directory.
+   * @param conf The current configuration.
+   * @param fs The current file system.
+   * @param family The descriptor of the current column family.
+   * @param date The date string, its format is yyyymmdd.
+   * @param basePath The basic path for a temp directory.
+   * @param maxKeyCount The key count.
+   * @param compression The compression algorithm.
+   * @param startKey The start key.
+   * @param cacheConfig The current cache config.
+   * @return The writer for the del file.
+   * @throws IOException
+   */
+  public static StoreFile.Writer createDelFileWriter(Configuration conf, FileSystem fs,
+    HColumnDescriptor family, String date, Path basePath, long maxKeyCount,
+    Compression.Algorithm compression, byte[] startKey, CacheConfig cacheConfig)
+    throws IOException {
+    String suffix = UUID
+      .randomUUID().toString().replaceAll("-", "") + "_del";
+    MobFileName mobFileName = MobFileName.create(startKey, date, suffix);
+    return createWriter(conf, fs, family, mobFileName, basePath, maxKeyCount, compression,
+      cacheConfig);
+  }
+
+  /**
+   * Creates a writer for the mob file in the temp directory.
+   * @param conf The current configuration.
+   * @param fs The current file system.
+   * @param family The descriptor of the current column family.
+   * @param mobFileName The mob file name.
+   * @param basePath The basic path for a temp directory.
+   * @param maxKeyCount The key count.
+   * @param compression The compression algorithm.
+   * @param cacheConfig The current cache config.
+   * @return The writer for the mob file.
+   * @throws IOException
+   */
+  private static StoreFile.Writer createWriter(Configuration conf, FileSystem fs,
+    HColumnDescriptor family, MobFileName mobFileName, Path basePath, long maxKeyCount,
+    Compression.Algorithm compression, CacheConfig cacheConfig) throws IOException {
     HFileContext hFileContext = new HFileContextBuilder().withCompression(compression)
-      .withIncludesMvcc(false).withIncludesTags(true)
-      .withChecksumType(HFile.DEFAULT_CHECKSUM_TYPE)
-      .withBytesPerCheckSum(HFile.DEFAULT_BYTES_PER_CHECKSUM)
-      .withBlockSize(family.getBlocksize()).withHBaseCheckSum(true)
-      .withDataBlockEncoding(family.getDataBlockEncoding()).build();
+      .withIncludesMvcc(false).withIncludesTags(true).withChecksumType(HFile.DEFAULT_CHECKSUM_TYPE)
+      .withBytesPerCheckSum(HFile.DEFAULT_BYTES_PER_CHECKSUM).withBlockSize(family.getBlocksize())
+      .withHBaseCheckSum(true).withDataBlockEncoding(family.getDataBlockEncoding()).build();
     StoreFile.Writer w = new StoreFile.WriterBuilder(conf, cacheConfig, fs)
-      .withFilePath(new Path(basePath, mobFileName.getFileName()))
-      .withComparator(KeyValue.COMPARATOR).withBloomType(BloomType.NONE)
-      .withMaxKeyCount(maxKeyCount).withFileContext(hFileContext).build();
+      .withFilePath(new Path(basePath, mobFileName.getFileName()))
+      .withComparator(KeyValue.COMPARATOR).withBloomType(BloomType.NONE)
+      .withMaxKeyCount(maxKeyCount).withFileContext(hFileContext).build();
     return w;
   }
 
@@ -456,12 +550,13 @@
    * @param path The path where the mob file is saved.
    * @param targetPath The directory path where the source file is renamed to.
    * @param cacheConfig The current cache config.
+   * @return The target file path the source file is renamed to.
    * @throws IOException
    */
-  public static void commitFile(Configuration conf, FileSystem fs, final Path sourceFile,
+  public static Path commitFile(Configuration conf, FileSystem fs, final Path sourceFile,
     Path targetPath, CacheConfig cacheConfig) throws IOException {
     if (sourceFile == null) {
-      return;
+      return null;
     }
     Path dstPath = new Path(targetPath, sourceFile.getName());
     validateMobFile(conf, fs, sourceFile, cacheConfig);
@@ -474,6 +569,7 @@
     if (!fs.rename(sourceFile, dstPath)) {
       throw new IOException("Failed rename of " + sourceFile + " to " + dstPath);
     }
+    return dstPath;
   }
 
   /**

http://git-wip-us.apache.org/repos/asf/hbase/blob/2c4934ed/hbase-server/src/main/java/org/apache/hadoop/hbase/mob/filecompactions/MobFileCompactionRequest.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/mob/filecompactions/MobFileCompactionRequest.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/mob/filecompactions/MobFileCompactionRequest.java
new file mode 100644
index 0000000..375ba8c
--- /dev/null
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/mob/filecompactions/MobFileCompactionRequest.java
@@ -0,0 +1,64 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.mob.filecompactions;
+
+import org.apache.hadoop.classification.InterfaceAudience;
+
+/**
+ * The compaction request for mob files.
+ */
+@InterfaceAudience.Private
+public abstract class MobFileCompactionRequest {
+
+  protected long selectionTime;
+  protected CompactionType type = CompactionType.PART_FILES;
+
+  public void setCompactionType(CompactionType type) {
+    this.type = type;
+  }
+
+  /**
+   * Gets the selection time.
+   * @return The selection time.
+   */
+  public long getSelectionTime() {
+    return this.selectionTime;
+  }
+
+  /**
+   * Gets the compaction type.
+   * @return The compaction type.
+   */
+  public CompactionType getCompactionType() {
+    return type;
+  }
+
+  protected enum CompactionType {
+
+    /**
+     * Only part of the mob files are selected.
+     */
+    PART_FILES,
+
+    /**
+     * All of the mob files are selected.
+     */
+    ALL_FILES;
+  }
+}

http://git-wip-us.apache.org/repos/asf/hbase/blob/2c4934ed/hbase-server/src/main/java/org/apache/hadoop/hbase/mob/filecompactions/MobFileCompactor.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/mob/filecompactions/MobFileCompactor.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/mob/filecompactions/MobFileCompactor.java
new file mode 100644
index 0000000..bbc358e
--- /dev/null
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/mob/filecompactions/MobFileCompactor.java
@@ -0,0 +1,78 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.mob.filecompactions;
+
+import java.io.IOException;
+import java.util.Arrays;
+import java.util.List;
+import java.util.concurrent.ExecutorService;
+
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hbase.HColumnDescriptor;
+import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.mob.MobUtils;
+import org.apache.hadoop.hbase.util.FSUtils;
+
+/**
+ * A mob file compactor to directly compact the mob files.
+ */
+@InterfaceAudience.Private
+public abstract class MobFileCompactor {
+
+  protected FileSystem fs;
+  protected Configuration conf;
+  protected TableName tableName;
+  protected HColumnDescriptor column;
+
+  protected Path mobTableDir;
+  protected Path mobFamilyDir;
+  protected ExecutorService pool;
+
+  public MobFileCompactor(Configuration conf, FileSystem fs, TableName tableName,
+    HColumnDescriptor column, ExecutorService pool) {
+    this.conf = conf;
+    this.fs = fs;
+    this.tableName = tableName;
+    this.column = column;
+    this.pool = pool;
+    mobTableDir = FSUtils.getTableDir(MobUtils.getMobHome(conf), tableName);
+    mobFamilyDir = MobUtils.getMobFamilyPath(conf, tableName, column.getNameAsString());
+  }
+
+  /**
+   * Compacts the mob files for the current column family.
+   * @return The paths of new mob files generated in the compaction.
+   * @throws IOException
+   */
+  public List<Path> compact() throws IOException {
+    return compact(Arrays.asList(fs.listStatus(mobFamilyDir)));
+  }
+
+  /**
+   * Compacts the candidate mob files.
+   * @param files The candidate mob files.
+   * @return The paths of new mob files generated in the compaction.
+   * @throws IOException
+   */
+  public abstract List<Path> compact(List<FileStatus> files) throws IOException;
+}

http://git-wip-us.apache.org/repos/asf/hbase/blob/2c4934ed/hbase-server/src/main/java/org/apache/hadoop/hbase/mob/filecompactions/PartitionedMobFileCompactionRequest.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/mob/filecompactions/PartitionedMobFileCompactionRequest.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/mob/filecompactions/PartitionedMobFileCompactionRequest.java
new file mode 100644
index 0000000..d2ac1db
--- /dev/null
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/mob/filecompactions/PartitionedMobFileCompactionRequest.java
@@ -0,0 +1,146 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.mob.filecompactions;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.List;
+
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
+
+/**
+ * An implementation of {@link MobFileCompactionRequest} that is used in
+ * {@link PartitionedMobFileCompactor}.
+ * The mob files that have the same start key and date in their names belong to
+ * the same partition.
+ */
+@InterfaceAudience.Private
+public class PartitionedMobFileCompactionRequest extends MobFileCompactionRequest {
+
+  protected Collection<FileStatus> delFiles;
+  protected Collection<CompactionPartition> compactionPartitions;
+
+  public PartitionedMobFileCompactionRequest(Collection<CompactionPartition> compactionPartitions,
+    Collection<FileStatus> delFiles) {
+    this.selectionTime = EnvironmentEdgeManager.currentTime();
+    this.compactionPartitions = compactionPartitions;
+    this.delFiles = delFiles;
+  }
+
+  /**
+   * Gets the compaction partitions.
+   * @return The compaction partitions.
+   */
+  public Collection<CompactionPartition> getCompactionPartitions() {
+    return this.compactionPartitions;
+  }
+
+  /**
+   * Gets the del files.
+   * @return The del files.
+   */
+  public Collection<FileStatus> getDelFiles() {
+    return this.delFiles;
+  }
+
+  /**
+   * The partition in the mob file compaction.
+   * The mob files that have the same start key and date in their names belong to
+   * the same partition.
+   */
+  protected static class CompactionPartition {
+    private List<FileStatus> files = new ArrayList<FileStatus>();
+    private CompactionPartitionId partitionId;
+
+    public CompactionPartition(CompactionPartitionId partitionId) {
+      this.partitionId = partitionId;
+    }
+
+    public CompactionPartitionId getPartitionId() {
+      return this.partitionId;
+    }
+
+    public void addFile(FileStatus file) {
+      files.add(file);
+    }
+
+    public List<FileStatus> listFiles() {
+      return Collections.unmodifiableList(files);
+    }
+  }
+
+  /**
+   * The partition id that consists of start key and date of the mob file name.
+   */
+  protected static class CompactionPartitionId {
+
+    private String startKey;
+    private String date;
+
+    public CompactionPartitionId(String startKey, String date) {
+      if (startKey == null || date == null) {
+        throw new IllegalArgumentException("Neither start key nor date can be null");
+      }
+      this.startKey = startKey;
+      this.date = date;
+    }
+
+    public String getStartKey() {
+      return this.startKey;
+    }
+
+    public String getDate() {
+      return this.date;
+    }
+
+    @Override
+    public int hashCode() {
+      int result = 17;
+      result = 31 * result + startKey.hashCode();
+      result = 31 * result + date.hashCode();
+      return result;
+    }
+
+    @Override
+    public boolean equals(Object obj) {
+      if (this == obj) {
+        return true;
+      }
+      if (!(obj instanceof CompactionPartitionId)) {
+        return false;
+      }
+      CompactionPartitionId another = (CompactionPartitionId) obj;
+      if (!this.startKey.equals(another.startKey)) {
+        return false;
+      }
+      if (!this.date.equals(another.date)) {
+        return false;
+      }
+      return true;
+    }
+
+    @Override
+    public String toString() {
+      return new StringBuilder(startKey).append(date).toString();
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/hbase/blob/2c4934ed/hbase-server/src/main/java/org/apache/hadoop/hbase/mob/filecompactions/PartitionedMobFileCompactor.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/mob/filecompactions/PartitionedMobFileCompactor.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/mob/filecompactions/PartitionedMobFileCompactor.java
new file mode 100644
index 0000000..6cd3172
--- /dev/null
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/mob/filecompactions/PartitionedMobFileCompactor.java
@@ -0,0 +1,631 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.mob.filecompactions;
+
+import java.io.FileNotFoundException;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.Date;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Map.Entry;
+import java.util.concurrent.Callable;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Future;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hbase.Cell;
+import org.apache.hadoop.hbase.HColumnDescriptor;
+import org.apache.hadoop.hbase.HConstants;
+import org.apache.hadoop.hbase.KeyValue;
+import org.apache.hadoop.hbase.KeyValueUtil;
+import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.Tag;
+import org.apache.hadoop.hbase.TagType;
+import org.apache.hadoop.hbase.client.HTable;
+import org.apache.hadoop.hbase.client.Scan;
+import org.apache.hadoop.hbase.io.HFileLink;
+import org.apache.hadoop.hbase.io.hfile.CacheConfig;
+import org.apache.hadoop.hbase.mapreduce.LoadIncrementalHFiles;
+import org.apache.hadoop.hbase.mob.MobConstants;
+import org.apache.hadoop.hbase.mob.MobFileName;
+import org.apache.hadoop.hbase.mob.MobUtils;
+import org.apache.hadoop.hbase.mob.filecompactions.MobFileCompactionRequest.CompactionType;
+import org.apache.hadoop.hbase.mob.filecompactions.PartitionedMobFileCompactionRequest.CompactionPartition;
+import org.apache.hadoop.hbase.mob.filecompactions.PartitionedMobFileCompactionRequest.CompactionPartitionId;
+import org.apache.hadoop.hbase.regionserver.BloomType;
+import org.apache.hadoop.hbase.regionserver.HStore;
+import org.apache.hadoop.hbase.regionserver.ScanInfo;
+import org.apache.hadoop.hbase.regionserver.ScanType;
+import org.apache.hadoop.hbase.regionserver.StoreFile;
+import org.apache.hadoop.hbase.regionserver.StoreFile.Writer;
+import org.apache.hadoop.hbase.regionserver.StoreFileInfo;
+import org.apache.hadoop.hbase.regionserver.StoreFileScanner;
+import org.apache.hadoop.hbase.regionserver.StoreScanner;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.hbase.util.Pair;
+
+/**
+ * An implementation of {@link MobFileCompactor} that compacts the mob files in partitions.
+ */
+@InterfaceAudience.Private
+public class PartitionedMobFileCompactor extends MobFileCompactor {
+
+  private static final Log LOG = LogFactory.getLog(PartitionedMobFileCompactor.class);
+  protected long mergeableSize;
+  protected int delFileMaxCount;
+  /** The number of files compacted in a batch */
+  protected int compactionBatchSize;
+  protected int compactionKVMax;
+
+  private Path tempPath;
+  private Path bulkloadPath;
+  private CacheConfig compactionCacheConfig;
+  private Tag tableNameTag;
+
+  public PartitionedMobFileCompactor(Configuration conf, FileSystem fs, TableName tableName,
+    HColumnDescriptor column, ExecutorService pool) {
+    super(conf, fs, tableName, column, pool);
+    mergeableSize = conf.getLong(MobConstants.MOB_FILE_COMPACTION_MERGEABLE_THRESHOLD,
+      MobConstants.DEFAULT_MOB_FILE_COMPACTION_MERGEABLE_THRESHOLD);
+    delFileMaxCount = conf.getInt(MobConstants.MOB_DELFILE_MAX_COUNT,
+      MobConstants.DEFAULT_MOB_DELFILE_MAX_COUNT);
+    // default is 100
+    compactionBatchSize = conf.getInt(MobConstants.MOB_FILE_COMPACTION_BATCH_SIZE,
+      MobConstants.DEFAULT_MOB_FILE_COMPACTION_BATCH_SIZE);
+    tempPath = new Path(MobUtils.getMobHome(conf), MobConstants.TEMP_DIR_NAME);
+    bulkloadPath = new Path(tempPath, new Path(MobConstants.BULKLOAD_DIR_NAME,
+      tableName.getNameAsString()));
+    compactionKVMax = this.conf.getInt(HConstants.COMPACTION_KV_MAX,
+      HConstants.COMPACTION_KV_MAX_DEFAULT);
+    Configuration copyOfConf = new Configuration(conf);
+    copyOfConf.setFloat(HConstants.HFILE_BLOCK_CACHE_SIZE_KEY, 0f);
+    compactionCacheConfig = new CacheConfig(copyOfConf);
+    tableNameTag = new Tag(TagType.MOB_TABLE_NAME_TAG_TYPE, tableName.getName());
+  }
+
+  @Override
+  public List<Path> compact(List<FileStatus> files) throws IOException {
+    if (files == null || files.isEmpty()) {
+      return null;
+    }
+    // find the files to compact.
+    PartitionedMobFileCompactionRequest request = select(files);
+    // compact the files.
+    return performCompaction(request);
+  }
+
+  /**
+   * Selects the mob files and del files to be compacted.
+   * Iterates over the candidates to find all the del files and small mob files.
+   * @param candidates All the candidates.
+   * @return A compaction request.
+   * @throws IOException
+   */
+  protected PartitionedMobFileCompactionRequest select(List<FileStatus> candidates)
+    throws IOException {
+    Collection<FileStatus> allDelFiles = new ArrayList<FileStatus>();
+    Map<CompactionPartitionId, CompactionPartition> filesToCompact =
+      new HashMap<CompactionPartitionId, CompactionPartition>();
+    int selectedFileCount = 0;
+    int irrelevantFileCount = 0;
+    for (FileStatus file : candidates) {
+      if (!file.isFile()) {
+        irrelevantFileCount++;
+        continue;
+      }
+      // group the del files and small files.
+      FileStatus linkedFile = file;
+      if (HFileLink.isHFileLink(file.getPath())) {
+        HFileLink link = new HFileLink(conf, file.getPath());
+        linkedFile = getLinkedFileStatus(link);
+        if (linkedFile == null) {
+          // If the linked file cannot be found, regard it as an irrelevant file.
+          irrelevantFileCount++;
+          continue;
+        }
+      }
+      if (StoreFileInfo.isDelFile(linkedFile.getPath())) {
+        allDelFiles.add(file);
+      } else if (linkedFile.getLen() < mergeableSize) {
+        // add the small files to the merge pool
+        MobFileName fileName = MobFileName.create(linkedFile.getPath().getName());
+        CompactionPartitionId id = new CompactionPartitionId(fileName.getStartKey(),
+          fileName.getDate());
+        CompactionPartition compactionPartition = filesToCompact.get(id);
+        if (compactionPartition == null) {
+          compactionPartition = new CompactionPartition(id);
+          compactionPartition.addFile(file);
+          filesToCompact.put(id, compactionPartition);
+        } else {
+          compactionPartition.addFile(file);
+        }
+        selectedFileCount++;
+      }
+    }
+    PartitionedMobFileCompactionRequest request = new PartitionedMobFileCompactionRequest(
+      filesToCompact.values(), allDelFiles);
+    if (candidates.size() == (allDelFiles.size() + selectedFileCount + irrelevantFileCount)) {
+      // all the files are selected
+      request.setCompactionType(CompactionType.ALL_FILES);
+    }
+    return request;
+  }
+
+  /**
+   * Performs the compaction on the selected files.
+   * <ol>
+   * <li>Compacts the del files.</li>
+   * <li>Compacts the selected small mob files and all the del files.</li>
+   * <li>If all the candidates are selected, archives the del files.</li>
+   * </ol>
+   * @param request The compaction request.
+   * @return The paths of new mob files generated in the compaction.
+   * @throws IOException
+   */
+  protected List<Path> performCompaction(PartitionedMobFileCompactionRequest request)
+    throws IOException {
+    // merge the del files
+    List<Path> delFilePaths = new ArrayList<Path>();
+    for (FileStatus delFile : request.delFiles) {
+      delFilePaths.add(delFile.getPath());
+    }
+    List<Path> newDelPaths = compactDelFiles(request, delFilePaths);
+    List<StoreFile> newDelFiles = new ArrayList<StoreFile>();
+    for (Path newDelPath : newDelPaths) {
+      StoreFile sf = new StoreFile(fs, newDelPath, conf, compactionCacheConfig, BloomType.NONE);
+      newDelFiles.add(sf);
+    }
+    // compact the mob files by partitions.
+    List<Path> paths = compactMobFiles(request, newDelFiles);
+    // archive the del files if all the mob files are selected.
+    if (request.type == CompactionType.ALL_FILES && !newDelPaths.isEmpty()) {
+      try {
+        MobUtils.removeMobFiles(conf, fs, tableName, mobTableDir, column.getName(), newDelFiles);
+      } catch (IOException e) {
+        LOG.error("Failed to archive the del files " + newDelFiles, e);
+      }
+    }
+    return paths;
+  }
+
+  /**
+   * Compacts the selected small mob files and all the del files.
+   * @param request The compaction request.
+   * @param delFiles The del files.
+   * @return The paths of new mob files after compactions.
+   * @throws IOException
+   */
+  protected List<Path> compactMobFiles(final PartitionedMobFileCompactionRequest request,
+    final List<StoreFile> delFiles) throws IOException {
+    Collection<CompactionPartition> partitions = request.compactionPartitions;
+    if (partitions == null || partitions.isEmpty()) {
+      return Collections.emptyList();
+    }
+    List<Path> paths = new ArrayList<Path>();
+    final HTable table = new HTable(conf, tableName);
+    try {
+      Map<CompactionPartitionId, Future<List<Path>>> results =
+        new HashMap<CompactionPartitionId, Future<List<Path>>>();
+      // compact the mob files by partitions in parallel.
+      for (final CompactionPartition partition : partitions) {
+        results.put(partition.getPartitionId(), pool.submit(new Callable<List<Path>>() {
+          @Override
+          public List<Path> call() throws Exception {
+            return compactMobFilePartition(request, partition, delFiles, table);
+          }
+        }));
+      }
+      // wait for the compaction results of each partition.
+      boolean hasFailure = false;
+      for (Entry<CompactionPartitionId, Future<List<Path>>> result : results.entrySet()) {
+        try {
+          paths.addAll(result.getValue().get());
+        } catch (Exception e) {
+          // just log the error
+          LOG.error("Failed to compact the partition " + result.getKey(), e);
+          hasFailure = true;
+        }
+      }
+      if (hasFailure) {
+        // if any partition fails in the compaction, directly throw an exception.
+        throw new IOException("Failed to compact the partitions");
+      }
+    } finally {
+      try {
+        table.close();
+      } catch (IOException e) {
+        LOG.error("Failed to close the HTable", e);
+      }
+    }
+    return paths;
+  }
+
+  /**
+   * Compacts a partition of selected small mob files and all the del files.
+   * @param request The compaction request.
+   * @param partition A compaction partition.
+   * @param delFiles The del files.
+   * @param table The current table.
+   * @return The paths of new mob files after compactions.
+   * @throws IOException
+   */
+  private List<Path> compactMobFilePartition(PartitionedMobFileCompactionRequest request,
+    CompactionPartition partition, List<StoreFile> delFiles, HTable table) throws IOException {
+    List<Path> newFiles = new ArrayList<Path>();
+    List<FileStatus> files = partition.listFiles();
+    int offset = 0;
+    Path bulkloadPathOfPartition = new Path(bulkloadPath, partition.getPartitionId().toString());
+    Path bulkloadColumnPath = new Path(bulkloadPathOfPartition, column.getNameAsString());
+    while (offset < files.size()) {
+      int batch = compactionBatchSize;
+      if (files.size() - offset < compactionBatchSize) {
+        batch = files.size() - offset;
+      }
+      if (batch == 1 && delFiles.isEmpty()) {
+        // only one file left and no del files, do not compact it,
+        // and directly add it to the new files.
+        newFiles.add(files.get(offset).getPath());
+        offset++;
+        continue;
+      }
+      // clean the bulkload directory to avoid loading old files.
+      fs.delete(bulkloadPathOfPartition, true);
+      // add the selected mob files and del files into filesToCompact
+      List<StoreFile> filesToCompact = new ArrayList<StoreFile>();
+      for (int i = offset; i < batch + offset; i++) {
+        StoreFile sf = new StoreFile(fs, files.get(i).getPath(), conf, compactionCacheConfig,
+          BloomType.NONE);
+        filesToCompact.add(sf);
+      }
+      filesToCompact.addAll(delFiles);
+      // compact the mob files in a batch.
+      compactMobFilesInBatch(request, partition, table, filesToCompact, batch,
+        bulkloadPathOfPartition, bulkloadColumnPath, newFiles);
+      // move to the next batch.
+      offset += batch;
+    }
+    return newFiles;
+  }
+
+  /**
+   * Compacts a partition of selected small mob files and all the del files in a batch.
+   * @param request The compaction request.
+   * @param partition A compaction partition.
+   * @param table The current table.
+   * @param filesToCompact The files to be compacted.
+   * @param batch The number of mob files to be compacted in a batch.
+   * @param bulkloadPathOfPartition The directory where the bulkload column of the current
+   *        partition is saved.
+   * @param bulkloadColumnPath The directory where the bulkload files of current partition
+   *        are saved.
+   * @param newFiles The paths of new mob files after compactions.
+   * @throws IOException
+   */
+  private void compactMobFilesInBatch(PartitionedMobFileCompactionRequest request,
+    CompactionPartition partition, HTable table, List<StoreFile> filesToCompact, int batch,
+    Path bulkloadPathOfPartition, Path bulkloadColumnPath, List<Path> newFiles)
+    throws IOException {
+    // open scanner to the selected mob files and del files.
+    StoreScanner scanner = createScanner(filesToCompact, ScanType.COMPACT_DROP_DELETES);
+    // the mob files to be compacted, not including the del files.
+    List<StoreFile> mobFilesToCompact = filesToCompact.subList(0, batch);
+    // Pair(maxSeqId, cellsCount)
+    Pair<Long, Long> fileInfo = getFileInfo(mobFilesToCompact);
+    // open writers for the mob files and new ref store files.
+    Writer writer = null;
+    Writer refFileWriter = null;
+    Path filePath = null;
+    Path refFilePath = null;
+    long mobCells = 0;
+    try {
+      writer = MobUtils.createWriter(conf, fs, column, partition.getPartitionId().getDate(),
+        tempPath, Long.MAX_VALUE, column.getCompactionCompression(), partition.getPartitionId()
+          .getStartKey(), compactionCacheConfig);
+      filePath = writer.getPath();
+      byte[] fileName = Bytes.toBytes(filePath.getName());
+      // create a temp file and open a writer for it in the bulkloadPath
+      refFileWriter = MobUtils.createRefFileWriter(conf, fs, column, bulkloadColumnPath, fileInfo
+        .getSecond().longValue(), compactionCacheConfig);
+      refFilePath = refFileWriter.getPath();
+      List<Cell> cells = new ArrayList<Cell>();
+      boolean hasMore = false;
+      do {
+        hasMore = scanner.next(cells, compactionKVMax);
+        for (Cell cell : cells) {
+          // TODO remove this after the new code is introduced.
+          KeyValue kv = KeyValueUtil.ensureKeyValue(cell);
+          // write the mob cell to the mob file.
+          writer.append(kv);
+          // write the new reference cell to the store file.
+          KeyValue reference = MobUtils.createMobRefKeyValue(kv, fileName, tableNameTag);
+          refFileWriter.append(reference);
+          mobCells++;
+        }
+        cells.clear();
+      } while (hasMore);
+    } finally {
+      // close the scanner.
+      scanner.close();
+      // append metadata to the mob file, and close the mob file writer.
+      closeMobFileWriter(writer, fileInfo.getFirst(), mobCells);
+      // append metadata and bulkload info to the ref mob file, and close the writer.
+      closeRefFileWriter(refFileWriter, fileInfo.getFirst(), request.selectionTime);
+    }
+    if (mobCells > 0) {
+      // commit mob file
+      MobUtils.commitFile(conf, fs, filePath, mobFamilyDir, compactionCacheConfig);
+      // bulkload the ref file
+      bulkloadRefFile(table, bulkloadPathOfPartition, filePath.getName());
+      newFiles.add(new Path(mobFamilyDir, filePath.getName()));
+    } else {
+      // remove the new files
+      // the mob file is empty, delete it instead of committing.
+      deletePath(filePath);
+      // the ref file is empty, delete it instead of committing.
+      deletePath(refFilePath);
+    }
+    // archive the old mob files, do not archive the del files.
+    try {
+      MobUtils
+        .removeMobFiles(conf, fs, tableName, mobTableDir, column.getName(), mobFilesToCompact);
+    } catch (IOException e) {
+      LOG.error("Failed to archive the files " + mobFilesToCompact, e);
+    }
+  }
+
+  /**
+   * Compacts the del files in batches, which avoids opening too many files.
+   * @param request The compaction request.
+   * @param delFilePaths The paths of the del files.
+   * @return The paths of new del files after merging, or the original files if no merging
+   *         is necessary.
+   * @throws IOException
+   */
+  protected List<Path> compactDelFiles(PartitionedMobFileCompactionRequest request,
+    List<Path> delFilePaths) throws IOException {
+    if (delFilePaths.size() <= delFileMaxCount) {
+      return delFilePaths;
+    }
+    // when there are more del files than the number that is allowed, merge them first.
+    int offset = 0;
+    List<Path> paths = new ArrayList<Path>();
+    while (offset < delFilePaths.size()) {
+      // get the batch
+      int batch = compactionBatchSize;
+      if (delFilePaths.size() - offset < compactionBatchSize) {
+        batch = delFilePaths.size() - offset;
+      }
+      List<StoreFile> batchedDelFiles = new ArrayList<StoreFile>();
+      if (batch == 1) {
+        // only one file left, do not compact it, directly add it to the new files.
+        paths.add(delFilePaths.get(offset));
+        offset++;
+        continue;
+      }
+      for (int i = offset; i < batch + offset; i++) {
+        batchedDelFiles.add(new StoreFile(fs, delFilePaths.get(i), conf, compactionCacheConfig,
+          BloomType.NONE));
+      }
+      // compact the del files in a batch.
+      paths.add(compactDelFilesInBatch(request, batchedDelFiles));
+      // move to the next batch.
+      offset += batch;
+    }
+    return compactDelFiles(request, paths);
+  }
+
+  /**
+   * Compacts the del files in a batch.
+   * @param request The compaction request.
+   * @param delFiles The del files.
+   * @return The path of the new del file after merging.
+   * @throws IOException
+   */
+  private Path compactDelFilesInBatch(PartitionedMobFileCompactionRequest request,
+    List<StoreFile> delFiles) throws IOException {
+    // create a scanner for the del files.
+    StoreScanner scanner = createScanner(delFiles, ScanType.COMPACT_RETAIN_DELETES);
+    Writer writer = null;
+    Path filePath = null;
+    try {
+      writer = MobUtils.createDelFileWriter(conf, fs, column,
+        MobUtils.formatDate(new Date(request.selectionTime)), tempPath, Long.MAX_VALUE,
+        column.getCompactionCompression(), HConstants.EMPTY_START_ROW, compactionCacheConfig);
+      filePath = writer.getPath();
+      List<Cell> cells = new ArrayList<Cell>();
+      boolean hasMore = false;
+      do {
+        hasMore = scanner.next(cells, compactionKVMax);
+        for (Cell cell : cells) {
+          // TODO remove this after the new code is introduced.
+          KeyValue kv = KeyValueUtil.ensureKeyValue(cell);
+          writer.append(kv);
+        }
+        cells.clear();
+      } while (hasMore);
+    } finally {
+      scanner.close();
+      if (writer != null) {
+        try {
+          writer.close();
+        } catch (IOException e) {
+          LOG.error("Failed to close the writer of the file " + filePath, e);
+        }
+      }
+    }
+    // commit the new del file
+    Path path = MobUtils.commitFile(conf, fs, filePath, mobFamilyDir, compactionCacheConfig);
+    // archive the old del files
+    try {
+      MobUtils.removeMobFiles(conf, fs, tableName, mobTableDir, column.getName(), delFiles);
+    } catch (IOException e) {
+      LOG.error("Failed to archive the old del files " + delFiles, e);
+    }
+    return path;
+  }
+
+  /**
+   * Creates a store scanner.
+   * @param filesToCompact The files to be compacted.
+   * @param scanType The scan type.
+   * @return The store scanner.
+   * @throws IOException
+   */
+  private StoreScanner createScanner(List<StoreFile> filesToCompact, ScanType scanType)
+    throws IOException {
+    List<StoreFileScanner> scanners = StoreFileScanner.getScannersForStoreFiles(filesToCompact,
+      false, true, false, null, HConstants.LATEST_TIMESTAMP);
+    Scan scan = new Scan();
+    scan.setMaxVersions(column.getMaxVersions());
+    long ttl = HStore.determineTTLFromFamily(column);
+    ScanInfo scanInfo = new ScanInfo(column, ttl, 0, KeyValue.COMPARATOR);
+    StoreScanner scanner = new StoreScanner(scan, scanInfo, scanType, null, scanners, 0L,
+      HConstants.LATEST_TIMESTAMP);
+    return scanner;
+  }
+
+  /**
+   * Bulkloads the current file.
+   * @param table The current table.
+   * @param bulkloadDirectory The path of the bulkload directory.
+   * @param fileName The current file name.
+   * @throws IOException
+   */
+  private void bulkloadRefFile(HTable table, Path bulkloadDirectory, String fileName)
+    throws IOException {
+    // bulkload the ref file
+    try {
+      LoadIncrementalHFiles bulkload = new LoadIncrementalHFiles(conf);
+      bulkload.doBulkLoad(bulkloadDirectory, table);
+    } catch (Exception e) {
+      // delete the committed mob file
+      deletePath(new Path(mobFamilyDir, fileName));
+      throw new IOException(e);
+    } finally {
+      // delete the bulkload files in bulkloadPath
+      deletePath(bulkloadDirectory);
+    }
+  }
+
+  /**
+   * Closes the mob file writer.
+   * @param writer The mob file writer.
+   * @param maxSeqId Maximum sequence id.
+   * @param mobCellsCount The number of mob cells.
+   * @throws IOException
+   */
+  private void closeMobFileWriter(Writer writer, long maxSeqId, long mobCellsCount)
+    throws IOException {
+    if (writer != null) {
+      writer.appendMetadata(maxSeqId, false, mobCellsCount);
+      try {
+        writer.close();
+      } catch (IOException e) {
+        LOG.error("Failed to close the writer of the file " + writer.getPath(), e);
+      }
+    }
+  }
+
+  /**
+   * Closes the ref file writer.
+   * @param writer The ref file writer.
+   * @param maxSeqId Maximum sequence id.
+   * @param bulkloadTime The timestamp at which the bulk load file is created.
+   * @throws IOException
+   */
+  private void closeRefFileWriter(Writer writer, long maxSeqId, long bulkloadTime)
+    throws IOException {
+    if (writer != null) {
+      writer.appendMetadata(maxSeqId, false);
+      writer.appendFileInfo(StoreFile.BULKLOAD_TIME_KEY, Bytes.toBytes(bulkloadTime));
+      try {
+        writer.close();
+      } catch (IOException e) {
+        LOG.error("Failed to close the writer of the ref file " + writer.getPath(), e);
+      }
+    }
+  }
+
+  /**
+   * Gets the max seqId and number of cells of the store files.
+   * @param storeFiles The store files.
+   * @return The pair of the max seqId and number of cells of the store files.
+   * @throws IOException
+   */
+  private Pair<Long, Long> getFileInfo(List<StoreFile> storeFiles) throws IOException {
+    long maxSeqId = 0;
+    long maxKeyCount = 0;
+    for (StoreFile sf : storeFiles) {
+      // the readers will be closed later after the merge.
+      maxSeqId = Math.max(maxSeqId, sf.getMaxSequenceId());
+      byte[] count = sf.createReader().loadFileInfo().get(StoreFile.MOB_CELLS_COUNT);
+      if (count != null) {
+        maxKeyCount += Bytes.toLong(count);
+      }
+    }
+    return new Pair<Long, Long>(Long.valueOf(maxSeqId), Long.valueOf(maxKeyCount));
+  }
+
+  /**
+   * Deletes a file.
+   * @param path The path of the file to be deleted.
+   */
+  private void deletePath(Path path) {
+    try {
+      if (path != null) {
+        fs.delete(path, true);
+      }
+    } catch (IOException e) {
+      LOG.error("Failed to delete the file " + path, e);
+    }
+  }
+
+  private FileStatus getLinkedFileStatus(HFileLink link) throws IOException {
+    Path[] locations = link.getLocations();
+    for (Path location : locations) {
+      FileStatus file = getFileStatus(location);
+      if (file != null) {
+        return file;
+      }
+    }
+    return null;
+  }
+
+  private FileStatus getFileStatus(Path path) throws IOException {
+    try {
+      if (path != null) {
+        FileStatus file = fs.getFileStatus(path);
+        return file;
+      }
+    } catch (FileNotFoundException e) {
+      LOG.warn("The file " + path + " cannot be found", e);
+    }
+    return null;
+  }
+}

http://git-wip-us.apache.org/repos/asf/hbase/blob/2c4934ed/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HStore.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HStore.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HStore.java
index ad7318b..b9f4038 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HStore.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HStore.java
@@ -363,7 +363,7 @@ public class HStore implements Store {
    * @param family
    * @return TTL in seconds of the specified family
    */
-  static long determineTTLFromFamily(final HColumnDescriptor family) {
+  public static long determineTTLFromFamily(final HColumnDescriptor family) {
     // HCD.getTimeToLive returns ttl in seconds.  Convert to milliseconds.
     long ttl = family.getTimeToLive();
     if (ttl == HConstants.FOREVER) {

http://git-wip-us.apache.org/repos/asf/hbase/blob/2c4934ed/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreScanner.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreScanner.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreScanner.java
index 5519b4b..930e1d0 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreScanner.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreScanner.java
@@ -272,8 +272,8 @@ public class StoreScanner extends NonReversedNonLazyKeyValueScanner
         // 0 is passed as readpoint because the test bypasses Store
         0);
   }
-
-  StoreScanner(final Scan scan, ScanInfo scanInfo,
+
+  public StoreScanner(final Scan scan, ScanInfo scanInfo,
       ScanType scanType, final NavigableSet<byte[]> columns,
       final List<KeyValueScanner> scanners, long earliestPutTs, long readPt)
   throws IOException {
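
----------------------------------------------------------------------
As a companion to the PartitionedMobFileCompactor above, the partitioning rule that select() applies can be summarized in a standalone sketch. The file-name layout here ("startKey-date-uuid", with a "_del" suffix for del files) is a simplification invented for illustration, not the real MobFileName encoding:

import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

public class PartitionSketch {
  public static void main(String[] args) {
    // Candidate store files, named with the simplified layout described above.
    List<String> candidates = Arrays.asList(
        "k1-20150206-aaaa", "k1-20150206-bbbb",  // same start key and date -> one partition
        "k2-20150206-cccc",                      // different start key -> its own partition
        "k1-20150206-dddd_del");                 // del file, collected separately
    Map<String, List<String>> partitions = new HashMap<String, List<String>>();
    List<String> delFiles = new ArrayList<String>();
    for (String name : candidates) {
      if (name.endsWith("_del")) {
        delFiles.add(name);
        continue;
      }
      String[] parts = name.split("-");
      // The partition id concatenates start key and date, as CompactionPartitionId.toString() does.
      String partitionId = parts[0] + parts[1];
      List<String> partition = partitions.get(partitionId);
      if (partition == null) {
        partition = new ArrayList<String>();
        partitions.put(partitionId, partition);
      }
      partition.add(name);
    }
    System.out.println(partitions); // two partitions: k120150206 (2 files), k220150206 (1 file)
    System.out.println(delFiles);   // [k1-20150206-dddd_del]
  }
}

Each partition is then compacted in batches of at most hbase.mob.file.compaction.batch.size files, with the merged del files added to every batch so deletes are applied during the rewrite.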
