http://git-wip-us.apache.org/repos/asf/hbase/blob/b31a6acf/hbase-server/src/main/java/org/apache/hadoop/hbase/mob/MobConstants.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/mob/MobConstants.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/mob/MobConstants.java index 464a0e7..dd33cda 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/mob/MobConstants.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/mob/MobConstants.java @@ -77,43 +77,43 @@ public class MobConstants { public final static String EMPTY_STRING = ""; /** * If the size of a mob file is less than this value, it's regarded as a small file and needs to - * be merged in mob file compaction. The default value is 192MB. + * be merged in mob compaction. The default value is 192MB. */ - public static final String MOB_FILE_COMPACTION_MERGEABLE_THRESHOLD = - "hbase.mob.file.compaction.mergeable.threshold"; - public static final long DEFAULT_MOB_FILE_COMPACTION_MERGEABLE_THRESHOLD = 192 * 1024 * 1024; + public static final String MOB_COMPACTION_MERGEABLE_THRESHOLD = + "hbase.mob.compaction.mergeable.threshold"; + public static final long DEFAULT_MOB_COMPACTION_MERGEABLE_THRESHOLD = 192 * 1024 * 1024; /** - * The max number of del files that is allowed in the mob file compaction. In the mob file + * The max number of del files that is allowed in the mob file compaction. In the mob * compaction, when the number of existing del files is larger than this value, they are merged * until the number of del files is not larger than this value. The default value is 3. */ public static final String MOB_DELFILE_MAX_COUNT = "hbase.mob.delfile.max.count"; public static final int DEFAULT_MOB_DELFILE_MAX_COUNT = 3; /** - * The max number of the mob files that is allowed in a batch of the mob file compaction. - * The mob file compaction merges the small mob files to bigger ones. If the number of the + * The max number of the mob files that is allowed in a batch of the mob compaction. + * The mob compaction merges the small mob files into bigger ones. If the number of the * small files is very large, it could lead to a "too many open file handles" error in the merge. * So the merge has to be split into batches. This value limits the number of mob files - * that are selected in a batch of the mob file compaction. The default value is 100. + * that are selected in a batch of the mob compaction. The default value is 100. */ - public static final String MOB_FILE_COMPACTION_BATCH_SIZE = - "hbase.mob.file.compaction.batch.size"; - public static final int DEFAULT_MOB_FILE_COMPACTION_BATCH_SIZE = 100; + public static final String MOB_COMPACTION_BATCH_SIZE = + "hbase.mob.compaction.batch.size"; + public static final int DEFAULT_MOB_COMPACTION_BATCH_SIZE = 100; /** - * The period that MobFileCompactionChore runs. The unit is millisecond. + * The period that MobCompactionChore runs. The unit is seconds. * The default value is one week.
*/ - public static final String MOB_FILE_COMPACTION_CHORE_PERIOD = - "hbase.mob.file.compaction.chore.period"; - public static final int DEFAULT_MOB_FILE_COMPACTION_CHORE_PERIOD = + public static final String MOB_COMPACTION_CHORE_PERIOD = + "hbase.mob.compaction.chore.period"; + public static final int DEFAULT_MOB_COMPACTION_CHORE_PERIOD = 24 * 60 * 60 * 7; // a week - public static final String MOB_FILE_COMPACTOR_CLASS_KEY = "hbase.mob.file.compactor.class"; + public static final String MOB_COMPACTOR_CLASS_KEY = "hbase.mob.compactor.class"; /** - * The max number of threads used in MobFileCompactor. + * The max number of threads used in MobCompactor. */ - public static final String MOB_FILE_COMPACTION_THREADS_MAX = - "hbase.mob.file.compaction.threads.max"; - public static final int DEFAULT_MOB_FILE_COMPACTION_THREADS_MAX = 1; + public static final String MOB_COMPACTION_THREADS_MAX = + "hbase.mob.compaction.threads.max"; + public static final int DEFAULT_MOB_COMPACTION_THREADS_MAX = 1; private MobConstants() { }
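The renamed keys above are ordinary Configuration entries, so they can be tuned in hbase-site.xml or in code. A minimal sketch, not part of this commit, of how the new names might be overridden; the values shown are illustrative assumptions, not recommendations:

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.mob.MobConstants;

public class MobCompactionConfigSketch {
  public static Configuration tunedConf() {
    Configuration conf = HBaseConfiguration.create();
    // Treat mob files under 256MB as mergeable (the default is 192MB).
    conf.setLong(MobConstants.MOB_COMPACTION_MERGEABLE_THRESHOLD, 256L * 1024 * 1024);
    // Select at most 50 mob files per compaction batch (the default is 100).
    conf.setInt(MobConstants.MOB_COMPACTION_BATCH_SIZE, 50);
    // Run the MobCompactionChore daily; the period is in seconds (the default is one week).
    conf.setInt(MobConstants.MOB_COMPACTION_CHORE_PERIOD, 24 * 60 * 60);
    // Allow up to 4 threads in the MobCompactor pool (the default is 1).
    conf.setInt(MobConstants.MOB_COMPACTION_THREADS_MAX, 4);
    return conf;
  }
}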
http://git-wip-us.apache.org/repos/asf/hbase/blob/b31a6acf/hbase-server/src/main/java/org/apache/hadoop/hbase/mob/MobFileCache.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/mob/MobFileCache.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/mob/MobFileCache.java index 7d8c9a5..0780f87 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/mob/MobFileCache.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/mob/MobFileCache.java @@ -177,7 +177,7 @@ public class MobFileCache { evictedFileCount.incrementAndGet(); } } catch (IOException e) { - LOG.error("Fail to evict the file " + fileName, e); + LOG.error("Failed to evict the file " + fileName, e); } finally { if (lockEntry != null) { keyLock.releaseLockEntry(lockEntry); @@ -249,7 +249,7 @@ public class MobFileCache { public void shutdown() { this.scheduleThreadPool.shutdown(); - for (int i = 0; i < 10; i++) { + for (int i = 0; i < 100; i++) { if (!this.scheduleThreadPool.isShutdown()) { try { Thread.sleep(10); http://git-wip-us.apache.org/repos/asf/hbase/blob/b31a6acf/hbase-server/src/main/java/org/apache/hadoop/hbase/mob/MobFileName.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/mob/MobFileName.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/mob/MobFileName.java index 937e965..796fe4d 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/mob/MobFileName.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/mob/MobFileName.java @@ -26,9 +26,9 @@ import org.apache.hadoop.hbase.util.MD5Hash; * It consists of an md5 of a start key, a date and a uuid. * It looks like md5(start) + date + uuid. * <ol> - * <li>0-31 characters: md5 hex string of a start key. Since the length of the start key is not + * <li>characters 0-31: md5 hex string of a start key. Since the length of the start key is not * fixed, we have to use the md5 instead, which has a fixed length.</li> - * <li>32-39 characters: a string of a date with format yyyymmdd. The date is the latest timestamp + * <li>characters 32-39: a string of a date with format yyyymmdd.
The date is the latest timestamp * of cells in this file.</li> * <li>the remaining characters: the uuid.</li> * </ol> http://git-wip-us.apache.org/repos/asf/hbase/blob/b31a6acf/hbase-server/src/main/java/org/apache/hadoop/hbase/mob/MobStoreEngine.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/mob/MobStoreEngine.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/mob/MobStoreEngine.java index 2d5f1ad..a54660c 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/mob/MobStoreEngine.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/mob/MobStoreEngine.java @@ -43,6 +43,6 @@ public class MobStoreEngine extends DefaultStoreEngine { */ @Override protected void createCompactor(Configuration conf, Store store) throws IOException { - compactor = new DefaultMobCompactor(conf, store); + compactor = new DefaultMobStoreCompactor(conf, store); } } http://git-wip-us.apache.org/repos/asf/hbase/blob/b31a6acf/hbase-server/src/main/java/org/apache/hadoop/hbase/mob/MobUtils.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/mob/MobUtils.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/mob/MobUtils.java index bbdc47a..c40767c 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/mob/MobUtils.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/mob/MobUtils.java @@ -48,6 +48,7 @@ import org.apache.hadoop.hbase.CellComparator; import org.apache.hadoop.hbase.HColumnDescriptor; import org.apache.hadoop.hbase.HConstants; import org.apache.hadoop.hbase.HRegionInfo; +import org.apache.hadoop.hbase.HTableDescriptor; import org.apache.hadoop.hbase.KeyValue; import org.apache.hadoop.hbase.TableName; import org.apache.hadoop.hbase.Tag; @@ -63,8 +64,8 @@ import org.apache.hadoop.hbase.io.hfile.HFileContext; import org.apache.hadoop.hbase.io.hfile.HFileContextBuilder; import org.apache.hadoop.hbase.master.TableLockManager; import org.apache.hadoop.hbase.master.TableLockManager.TableLock; -import org.apache.hadoop.hbase.mob.filecompactions.MobFileCompactor; -import org.apache.hadoop.hbase.mob.filecompactions.PartitionedMobFileCompactor; +import org.apache.hadoop.hbase.mob.compactions.MobCompactor; +import org.apache.hadoop.hbase.mob.compactions.PartitionedMobCompactor; import org.apache.hadoop.hbase.regionserver.BloomType; import org.apache.hadoop.hbase.regionserver.HStore; import org.apache.hadoop.hbase.regionserver.StoreFile; @@ -249,7 +250,7 @@ public class MobUtils { try { stats = fs.listStatus(path); } catch (FileNotFoundException e) { - LOG.warn("Fail to find the mob file " + path, e); + LOG.warn("Failed to find the mob file " + path, e); } if (null == stats) { // no file found @@ -287,7 +288,7 @@ public class MobUtils { filesToClean); deletedFileCount = filesToClean.size(); } catch (IOException e) { - LOG.error("Fail to delete the mob files " + filesToClean, e); + LOG.error("Failed to delete the mob files " + filesToClean, e); } } LOG.info(deletedFileCount + " expired mob files are deleted"); @@ -555,7 +556,7 @@ public class MobUtils { } /** - * Creates a writer for the del file in temp directory. + * Creates a writer for the mob file in the temp directory. * @param conf The current configuration. * @param fs The current file system. * @param family The descriptor of the current column family.
@@ -631,7 +632,7 @@ public class MobUtils { storeFile = new StoreFile(fs, path, conf, cacheConfig, BloomType.NONE); storeFile.createReader(); } catch (IOException e) { - LOG.error("Fail to open mob file[" + path + "], keep it in temp directory.", e); + LOG.error("Failed to open mob file[" + path + "], keep it in temp directory.", e); throw e; } finally { if (storeFile != null) { @@ -692,22 +693,22 @@ public class MobUtils { } /** - * Performs the mob file compaction. + * Performs the mob compaction. * @param conf the Configuration * @param fs the file system * @param tableName the table to compact * @param hcd the column descriptor * @param pool the thread pool * @param tableLockManager the tableLock manager - * @param isForceAllFiles Whether add all mob files into the compaction. + * @param allFiles Whether to add all mob files into the compaction. */ - public static void doMobFileCompaction(Configuration conf, FileSystem fs, TableName tableName, + public static void doMobCompaction(Configuration conf, FileSystem fs, TableName tableName, HColumnDescriptor hcd, ExecutorService pool, TableLockManager tableLockManager, - boolean isForceAllFiles) throws IOException { - String className = conf.get(MobConstants.MOB_FILE_COMPACTOR_CLASS_KEY, - PartitionedMobFileCompactor.class.getName()); - // instantiate the mob file compactor. - MobFileCompactor compactor = null; + boolean allFiles) throws IOException { + String className = conf.get(MobConstants.MOB_COMPACTOR_CLASS_KEY, + PartitionedMobCompactor.class.getName()); + // instantiate the mob compactor. + MobCompactor compactor = null; try { compactor = ReflectionUtils.instantiateWithCustomCtor(className, new Class[] { Configuration.class, FileSystem.class, TableName.class, HColumnDescriptor.class, @@ -724,21 +725,21 @@ public class MobUtils { // the tableLockManager might be null in testing. In that case, it is lock-free. if (tableLockManager != null) { lock = tableLockManager.writeLock(MobUtils.getTableLockName(tableName), - "Run MobFileCompaction"); + "Run MobCompactor"); lock.acquire(); } tableLocked = true; - compactor.compact(isForceAllFiles); + compactor.compact(allFiles); } catch (Exception e) { - LOG.error("Fail to compact the mob files for the column " + hcd.getNameAsString() + LOG.error("Failed to compact the mob files for the column " + hcd.getNameAsString() + " in the table " + tableName.getNameAsString(), e); } finally { if (lock != null && tableLocked) { try { lock.release(); } catch (IOException e) { - LOG.error("Fail to release the write lock for the table " + tableName.getNameAsString(), - e); + LOG.error( + "Failed to release the write lock for the table " + tableName.getNameAsString(), e); } } } @@ -749,15 +750,15 @@ public class MobUtils { * @param conf the Configuration * @return A thread pool.
*/ - public static ExecutorService createMobFileCompactorThreadPool(Configuration conf) { - int maxThreads = conf.getInt(MobConstants.MOB_FILE_COMPACTION_THREADS_MAX, - MobConstants.DEFAULT_MOB_FILE_COMPACTION_THREADS_MAX); + public static ExecutorService createMobCompactorThreadPool(Configuration conf) { + int maxThreads = conf.getInt(MobConstants.MOB_COMPACTION_THREADS_MAX, + MobConstants.DEFAULT_MOB_COMPACTION_THREADS_MAX); if (maxThreads == 0) { maxThreads = 1; } final SynchronousQueue<Runnable> queue = new SynchronousQueue<Runnable>(); ThreadPoolExecutor pool = new ThreadPoolExecutor(1, maxThreads, 60, TimeUnit.SECONDS, queue, - Threads.newDaemonThreadFactory("MobFileCompactor"), new RejectedExecutionHandler() { + Threads.newDaemonThreadFactory("MobCompactor"), new RejectedExecutionHandler() { @Override public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) { try { @@ -839,4 +840,19 @@ } return cryptoContext; } + + /** + * Checks whether this table has mob-enabled columns. + * @param htd The current table descriptor. + * @return Whether this table has mob-enabled columns. + */ + public static boolean hasMobColumns(HTableDescriptor htd) { + HColumnDescriptor[] hcds = htd.getColumnFamilies(); + for (HColumnDescriptor hcd : hcds) { + if (hcd.isMobEnabled()) { + return true; + } + } + return false; + } } http://git-wip-us.apache.org/repos/asf/hbase/blob/b31a6acf/hbase-server/src/main/java/org/apache/hadoop/hbase/mob/compactions/MobCompactionRequest.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/mob/compactions/MobCompactionRequest.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/mob/compactions/MobCompactionRequest.java new file mode 100644 index 0000000..5d162b4 --- /dev/null +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/mob/compactions/MobCompactionRequest.java @@ -0,0 +1,64 @@ +/** + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hbase.mob.compactions; + +import org.apache.hadoop.classification.InterfaceAudience; + +/** + * The compaction request for mob files. + */ +@InterfaceAudience.Private +public abstract class MobCompactionRequest { + + protected long selectionTime; + protected CompactionType type = CompactionType.PART_FILES; + + public void setCompactionType(CompactionType type) { + this.type = type; + } + + /** + * Gets the selection time. + * @return The selection time. + */ + public long getSelectionTime() { + return this.selectionTime; + } + + /** + * Gets the compaction type. + * @return The compaction type. + */ + public CompactionType getCompactionType() { + return type; + } + + protected enum CompactionType { + + /** + * Part of the mob files are selected.
+ */ + PART_FILES, + + /** + * All of the mob files are selected. + */ + ALL_FILES; + } +} http://git-wip-us.apache.org/repos/asf/hbase/blob/b31a6acf/hbase-server/src/main/java/org/apache/hadoop/hbase/mob/compactions/MobCompactor.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/mob/compactions/MobCompactor.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/mob/compactions/MobCompactor.java new file mode 100644 index 0000000..156c6f6 --- /dev/null +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/mob/compactions/MobCompactor.java @@ -0,0 +1,90 @@ +/** + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hbase.mob.compactions; + +import java.io.IOException; +import java.util.Arrays; +import java.util.List; +import java.util.concurrent.ExecutorService; + +import org.apache.hadoop.classification.InterfaceAudience; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.FileStatus; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.hbase.HColumnDescriptor; +import org.apache.hadoop.hbase.TableName; +import org.apache.hadoop.hbase.mob.MobUtils; +import org.apache.hadoop.hbase.util.FSUtils; + +/** + * A mob compactor to directly compact the mob files. + */ +@InterfaceAudience.Private +public abstract class MobCompactor { + + protected FileSystem fs; + protected Configuration conf; + protected TableName tableName; + protected HColumnDescriptor column; + + protected Path mobTableDir; + protected Path mobFamilyDir; + protected ExecutorService pool; + + public MobCompactor(Configuration conf, FileSystem fs, TableName tableName, + HColumnDescriptor column, ExecutorService pool) { + this.conf = conf; + this.fs = fs; + this.tableName = tableName; + this.column = column; + this.pool = pool; + mobTableDir = FSUtils.getTableDir(MobUtils.getMobHome(conf), tableName); + mobFamilyDir = MobUtils.getMobFamilyPath(conf, tableName, column.getNameAsString()); + } + + /** + * Compacts the mob files for the current column family. + * @return The paths of new mob files generated in the compaction. + * @throws IOException + */ + public List<Path> compact() throws IOException { + return compact(false); + } + + /** + * Compacts the mob files by compaction type for the current column family. + * @param allFiles Whether to add all mob files into the compaction. + * @return The paths of new mob files generated in the compaction. + * @throws IOException + */ + public List<Path> compact(boolean allFiles) throws IOException { + return compact(Arrays.asList(fs.listStatus(mobFamilyDir)), allFiles); + } + + /** + * Compacts the candidate mob files. + * @param files The candidate mob files.
+ * @param allFiles Whether to add all mob files into the compaction. + * @return The paths of new mob files generated in the compaction. + * @throws IOException + */ + public abstract List<Path> compact(List<FileStatus> files, boolean allFiles) + throws IOException; +} http://git-wip-us.apache.org/repos/asf/hbase/blob/b31a6acf/hbase-server/src/main/java/org/apache/hadoop/hbase/mob/compactions/PartitionedMobCompactionRequest.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/mob/compactions/PartitionedMobCompactionRequest.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/mob/compactions/PartitionedMobCompactionRequest.java new file mode 100644 index 0000000..af1eb4a --- /dev/null +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/mob/compactions/PartitionedMobCompactionRequest.java @@ -0,0 +1,146 @@ +/** + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hbase.mob.compactions; + +import java.util.ArrayList; +import java.util.Collection; +import java.util.Collections; +import java.util.List; + +import org.apache.hadoop.classification.InterfaceAudience; +import org.apache.hadoop.fs.FileStatus; +import org.apache.hadoop.hbase.util.EnvironmentEdgeManager; + +/** + * An implementation of {@link MobCompactionRequest} that is used in + * {@link PartitionedMobCompactor}. + * The mob files that have the same start key and date in their names belong to + * the same partition. + */ +@InterfaceAudience.Private +public class PartitionedMobCompactionRequest extends MobCompactionRequest { + + protected Collection<FileStatus> delFiles; + protected Collection<CompactionPartition> compactionPartitions; + + public PartitionedMobCompactionRequest(Collection<CompactionPartition> compactionPartitions, + Collection<FileStatus> delFiles) { + this.selectionTime = EnvironmentEdgeManager.currentTime(); + this.compactionPartitions = compactionPartitions; + this.delFiles = delFiles; + } + + /** + * Gets the compaction partitions. + * @return The compaction partitions. + */ + public Collection<CompactionPartition> getCompactionPartitions() { + return this.compactionPartitions; + } + + /** + * Gets the del files. + * @return The del files. + */ + public Collection<FileStatus> getDelFiles() { + return this.delFiles; + } + + /** + * The partition in the mob compaction. + * The mob files that have the same start key and date in their names belong to + * the same partition.
+ */ + protected static class CompactionPartition { + private List<FileStatus> files = new ArrayList<FileStatus>(); + private CompactionPartitionId partitionId; + + public CompactionPartition(CompactionPartitionId partitionId) { + this.partitionId = partitionId; + } + + public CompactionPartitionId getPartitionId() { + return this.partitionId; + } + + public void addFile(FileStatus file) { + files.add(file); + } + + public List<FileStatus> listFiles() { + return Collections.unmodifiableList(files); + } + } + + /** + * The partition id that consists of start key and date of the mob file name. + */ + public static class CompactionPartitionId { + + private String startKey; + private String date; + + public CompactionPartitionId(String startKey, String date) { + if (startKey == null || date == null) { + throw new IllegalArgumentException("Neither start key nor date can be null"); + } + this.startKey = startKey; + this.date = date; + } + + public String getStartKey() { + return this.startKey; + } + + public String getDate() { + return this.date; + } + + @Override + public int hashCode() { + int result = 17; + result = 31 * result + startKey.hashCode(); + result = 31 * result + date.hashCode(); + return result; + } + + @Override + public boolean equals(Object obj) { + if (this == obj) { + return true; + } + if (!(obj instanceof CompactionPartitionId)) { + return false; + } + CompactionPartitionId another = (CompactionPartitionId) obj; + if (!this.startKey.equals(another.startKey)) { + return false; + } + if (!this.date.equals(another.date)) { + return false; + } + return true; + } + + @Override + public String toString() { + return new StringBuilder(startKey).append(date).toString(); + } + } +} http://git-wip-us.apache.org/repos/asf/hbase/blob/b31a6acf/hbase-server/src/main/java/org/apache/hadoop/hbase/mob/compactions/PartitionedMobCompactor.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/mob/compactions/PartitionedMobCompactor.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/mob/compactions/PartitionedMobCompactor.java new file mode 100644 index 0000000..065787e --- /dev/null +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/mob/compactions/PartitionedMobCompactor.java @@ -0,0 +1,655 @@ +/** + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License.
+ */ +package org.apache.hadoop.hbase.mob.compactions; + +import java.io.FileNotFoundException; +import java.io.IOException; +import java.util.ArrayList; +import java.util.Collection; +import java.util.Collections; +import java.util.Date; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.Map.Entry; +import java.util.concurrent.Callable; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Future; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.classification.InterfaceAudience; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.FileStatus; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.hbase.Cell; +import org.apache.hadoop.hbase.CellComparator; +import org.apache.hadoop.hbase.HColumnDescriptor; +import org.apache.hadoop.hbase.HConstants; +import org.apache.hadoop.hbase.KeyValue; +import org.apache.hadoop.hbase.TableName; +import org.apache.hadoop.hbase.Tag; +import org.apache.hadoop.hbase.TagType; +import org.apache.hadoop.hbase.client.Connection; +import org.apache.hadoop.hbase.client.ConnectionFactory; +import org.apache.hadoop.hbase.client.HTable; +import org.apache.hadoop.hbase.client.Scan; +import org.apache.hadoop.hbase.client.Table; +import org.apache.hadoop.hbase.io.HFileLink; +import org.apache.hadoop.hbase.io.crypto.Encryption; +import org.apache.hadoop.hbase.io.hfile.CacheConfig; +import org.apache.hadoop.hbase.mapreduce.LoadIncrementalHFiles; +import org.apache.hadoop.hbase.mob.MobConstants; +import org.apache.hadoop.hbase.mob.MobFileName; +import org.apache.hadoop.hbase.mob.MobUtils; +import org.apache.hadoop.hbase.mob.compactions.MobCompactionRequest.CompactionType; +import org.apache.hadoop.hbase.mob.compactions.PartitionedMobCompactionRequest.CompactionPartition; +import org.apache.hadoop.hbase.mob.compactions.PartitionedMobCompactionRequest.CompactionPartitionId; +import org.apache.hadoop.hbase.regionserver.BloomType; +import org.apache.hadoop.hbase.regionserver.HStore; +import org.apache.hadoop.hbase.regionserver.ScanInfo; +import org.apache.hadoop.hbase.regionserver.ScanType; +import org.apache.hadoop.hbase.regionserver.ScannerContext; +import org.apache.hadoop.hbase.regionserver.StoreFile; +import org.apache.hadoop.hbase.regionserver.StoreFile.Writer; +import org.apache.hadoop.hbase.regionserver.StoreFileInfo; +import org.apache.hadoop.hbase.regionserver.StoreFileScanner; +import org.apache.hadoop.hbase.regionserver.StoreScanner; +import org.apache.hadoop.hbase.util.Bytes; +import org.apache.hadoop.hbase.util.Pair; + +/** + * An implementation of {@link MobCompactor} that compacts the mob files in partitions. 
*/ +@InterfaceAudience.Private +public class PartitionedMobCompactor extends MobCompactor { + + private static final Log LOG = LogFactory.getLog(PartitionedMobCompactor.class); + protected long mergeableSize; + protected int delFileMaxCount; + /** The number of files compacted in a batch */ + protected int compactionBatchSize; + protected int compactionKVMax; + + private Path tempPath; + private Path bulkloadPath; + private CacheConfig compactionCacheConfig; + private Tag tableNameTag; + private Encryption.Context cryptoContext = Encryption.Context.NONE; + + public PartitionedMobCompactor(Configuration conf, FileSystem fs, TableName tableName, + HColumnDescriptor column, ExecutorService pool) throws IOException { + super(conf, fs, tableName, column, pool); + mergeableSize = conf.getLong(MobConstants.MOB_COMPACTION_MERGEABLE_THRESHOLD, + MobConstants.DEFAULT_MOB_COMPACTION_MERGEABLE_THRESHOLD); + delFileMaxCount = conf.getInt(MobConstants.MOB_DELFILE_MAX_COUNT, + MobConstants.DEFAULT_MOB_DELFILE_MAX_COUNT); + // default is 100 + compactionBatchSize = conf.getInt(MobConstants.MOB_COMPACTION_BATCH_SIZE, + MobConstants.DEFAULT_MOB_COMPACTION_BATCH_SIZE); + tempPath = new Path(MobUtils.getMobHome(conf), MobConstants.TEMP_DIR_NAME); + bulkloadPath = new Path(tempPath, new Path(MobConstants.BULKLOAD_DIR_NAME, new Path( + tableName.getNamespaceAsString(), tableName.getQualifierAsString()))); + compactionKVMax = this.conf.getInt(HConstants.COMPACTION_KV_MAX, + HConstants.COMPACTION_KV_MAX_DEFAULT); + Configuration copyOfConf = new Configuration(conf); + copyOfConf.setFloat(HConstants.HFILE_BLOCK_CACHE_SIZE_KEY, 0f); + compactionCacheConfig = new CacheConfig(copyOfConf); + tableNameTag = new Tag(TagType.MOB_TABLE_NAME_TAG_TYPE, tableName.getName()); + cryptoContext = MobUtils.createEncryptionContext(copyOfConf, column); + } + + @Override + public List<Path> compact(List<FileStatus> files, boolean allFiles) throws IOException { + if (files == null || files.isEmpty()) { + LOG.info("No candidate mob files"); + return null; + } + LOG.info("is allFiles: " + allFiles); + // find the files to compact. + PartitionedMobCompactionRequest request = select(files, allFiles); + // compact the files. + return performCompaction(request); + } + + /** + * Selects the mob/del files to be compacted. + * Iterates over the candidates to find all the del files and small mob files. + * @param candidates All the candidates. + * @param allFiles Whether to add all mob files into the compaction. + * @return A compaction request. + * @throws IOException + */ + protected PartitionedMobCompactionRequest select(List<FileStatus> candidates, + boolean allFiles) throws IOException { + Collection<FileStatus> allDelFiles = new ArrayList<FileStatus>(); + Map<CompactionPartitionId, CompactionPartition> filesToCompact = + new HashMap<CompactionPartitionId, CompactionPartition>(); + int selectedFileCount = 0; + int irrelevantFileCount = 0; + for (FileStatus file : candidates) { + if (!file.isFile()) { + irrelevantFileCount++; + continue; + } + // group the del files and small files.
+ FileStatus linkedFile = file; + if (HFileLink.isHFileLink(file.getPath())) { + HFileLink link = HFileLink.buildFromHFileLinkPattern(conf, file.getPath()); + linkedFile = getLinkedFileStatus(link); + if (linkedFile == null) { + // If the linked file cannot be found, regard it as an irrelevant file + irrelevantFileCount++; + continue; + } + } + if (StoreFileInfo.isDelFile(linkedFile.getPath())) { + allDelFiles.add(file); + } else if (allFiles || linkedFile.getLen() < mergeableSize) { + // add all files if allFiles is true, + // otherwise add the small files to the merge pool + MobFileName fileName = MobFileName.create(linkedFile.getPath().getName()); + CompactionPartitionId id = new CompactionPartitionId(fileName.getStartKey(), + fileName.getDate()); + CompactionPartition compactionPartition = filesToCompact.get(id); + if (compactionPartition == null) { + compactionPartition = new CompactionPartition(id); + compactionPartition.addFile(file); + filesToCompact.put(id, compactionPartition); + } else { + compactionPartition.addFile(file); + } + selectedFileCount++; + } + } + PartitionedMobCompactionRequest request = new PartitionedMobCompactionRequest( + filesToCompact.values(), allDelFiles); + if (candidates.size() == (allDelFiles.size() + selectedFileCount + irrelevantFileCount)) { + // all the files are selected + request.setCompactionType(CompactionType.ALL_FILES); + } + LOG.info("The compaction type is " + request.getCompactionType() + ", the request has " + + allDelFiles.size() + " del files, " + selectedFileCount + " selected files, and " + + irrelevantFileCount + " irrelevant files"); + return request; + } + + /** + * Performs the compaction on the selected files. + * <ol> + * <li>Compacts the del files.</li> + * <li>Compacts the selected small mob files and all the del files.</li> + * <li>If all the candidates are selected, delete the del files.</li> + * </ol> + * @param request The compaction request. + * @return The paths of new mob files generated in the compaction. + * @throws IOException + */ + protected List<Path> performCompaction(PartitionedMobCompactionRequest request) + throws IOException { + // merge the del files + List<Path> delFilePaths = new ArrayList<Path>(); + for (FileStatus delFile : request.delFiles) { + delFilePaths.add(delFile.getPath()); + } + List<Path> newDelPaths = compactDelFiles(request, delFilePaths); + List<StoreFile> newDelFiles = new ArrayList<StoreFile>(); + for (Path newDelPath : newDelPaths) { + StoreFile sf = new StoreFile(fs, newDelPath, conf, compactionCacheConfig, BloomType.NONE); + newDelFiles.add(sf); + } + LOG.info("After merging, there are " + newDelFiles.size() + " del files"); + // compact the mob files by partitions. + List<Path> paths = compactMobFiles(request, newDelFiles); + LOG.info("After compaction, there are " + paths.size() + " mob files"); + // archive the del files if all the mob files are selected. + if (request.type == CompactionType.ALL_FILES && !newDelPaths.isEmpty()) { + LOG.info("After a mob compaction with all files selected, archiving the del files " + + newDelPaths); + try { + MobUtils.removeMobFiles(conf, fs, tableName, mobTableDir, column.getName(), newDelFiles); + } catch (IOException e) { + LOG.error("Failed to archive the del files " + newDelPaths, e); + } + } + return paths; + } + + /** + * Compacts the selected small mob files and all the del files. + * @param request The compaction request. + * @param delFiles The del files. + * @return The paths of new mob files after compactions.
+ * @throws IOException + */ + protected List<Path> compactMobFiles(final PartitionedMobCompactionRequest request, + final List<StoreFile> delFiles) throws IOException { + Collection<CompactionPartition> partitions = request.compactionPartitions; + if (partitions == null || partitions.isEmpty()) { + LOG.info("No partitions of mob files"); + return Collections.emptyList(); + } + List<Path> paths = new ArrayList<Path>(); + Connection c = ConnectionFactory.createConnection(conf); + final Table table = c.getTable(tableName); + try { + Map<CompactionPartitionId, Future<List<Path>>> results = + new HashMap<CompactionPartitionId, Future<List<Path>>>(); + // compact the mob files by partitions in parallel. + for (final CompactionPartition partition : partitions) { + results.put(partition.getPartitionId(), pool.submit(new Callable<List<Path>>() { + @Override + public List<Path> call() throws Exception { + LOG.info("Compacting mob files for partition " + partition.getPartitionId()); + return compactMobFilePartition(request, partition, delFiles, table); + } + })); + } + // collect the results of the parallel partition compactions. + List<CompactionPartitionId> failedPartitions = new ArrayList<CompactionPartitionId>(); + for (Entry<CompactionPartitionId, Future<List<Path>>> result : results.entrySet()) { + try { + paths.addAll(result.getValue().get()); + } catch (Exception e) { + // just log the error + LOG.error("Failed to compact the partition " + result.getKey(), e); + failedPartitions.add(result.getKey()); + } + } + if (!failedPartitions.isEmpty()) { + // if any partition fails in the compaction, directly throw an exception. + throw new IOException("Failed to compact the partitions " + failedPartitions); + } + } finally { + try { + table.close(); + } catch (IOException e) { + LOG.error("Failed to close the HTable", e); + } + } + return paths; + } + + /** + * Compacts a partition of selected small mob files and all the del files. + * @param request The compaction request. + * @param partition A compaction partition. + * @param delFiles The del files. + * @param table The current table. + * @return The paths of new mob files after compactions. + * @throws IOException + */ + private List<Path> compactMobFilePartition(PartitionedMobCompactionRequest request, + CompactionPartition partition, List<StoreFile> delFiles, Table table) throws IOException { + List<Path> newFiles = new ArrayList<Path>(); + List<FileStatus> files = partition.listFiles(); + int offset = 0; + Path bulkloadPathOfPartition = new Path(bulkloadPath, partition.getPartitionId().toString()); + Path bulkloadColumnPath = new Path(bulkloadPathOfPartition, column.getNameAsString()); + while (offset < files.size()) { + int batch = compactionBatchSize; + if (files.size() - offset < compactionBatchSize) { + batch = files.size() - offset; + } + if (batch == 1 && delFiles.isEmpty()) { + // only one file left and no del files, do not compact it, + // and directly add it to the new files. + newFiles.add(files.get(offset).getPath()); + offset++; + continue; + } + // clean the bulkload directory to avoid loading old files. + fs.delete(bulkloadPathOfPartition, true); + // add the selected mob files and del files into filesToCompact + List<StoreFile> filesToCompact = new ArrayList<StoreFile>(); + for (int i = offset; i < batch + offset; i++) { + StoreFile sf = new StoreFile(fs, files.get(i).getPath(), conf, compactionCacheConfig, + BloomType.NONE); + filesToCompact.add(sf); + } + filesToCompact.addAll(delFiles); + // compact the mob files in a batch.
compactMobFilesInBatch(request, partition, table, filesToCompact, batch, + bulkloadPathOfPartition, bulkloadColumnPath, newFiles); + // move to the next batch. + offset += batch; + } + LOG.info("Compaction is finished. The number of mob files is changed from " + files.size() + + " to " + newFiles.size()); + return newFiles; + } + + /** + * Compacts a partition of selected small mob files and all the del files in a batch. + * @param request The compaction request. + * @param partition A compaction partition. + * @param table The current table. + * @param filesToCompact The files to be compacted. + * @param batch The number of mob files to be compacted in a batch. + * @param bulkloadPathOfPartition The directory where the bulkload column of the current + * partition is saved. + * @param bulkloadColumnPath The directory where the bulkload files of current partition + * are saved. + * @param newFiles The paths of new mob files after compactions. + * @throws IOException + */ + private void compactMobFilesInBatch(PartitionedMobCompactionRequest request, + CompactionPartition partition, Table table, List<StoreFile> filesToCompact, int batch, + Path bulkloadPathOfPartition, Path bulkloadColumnPath, List<Path> newFiles) + throws IOException { + // open scanner to the selected mob files and del files. + StoreScanner scanner = createScanner(filesToCompact, ScanType.COMPACT_DROP_DELETES); + // the mob files to be compacted, not including the del files. + List<StoreFile> mobFilesToCompact = filesToCompact.subList(0, batch); + // Pair(maxSeqId, cellsCount) + Pair<Long, Long> fileInfo = getFileInfo(mobFilesToCompact); + // open writers for the mob files and new ref store files. + Writer writer = null; + Writer refFileWriter = null; + Path filePath = null; + Path refFilePath = null; + long mobCells = 0; + try { + writer = MobUtils.createWriter(conf, fs, column, partition.getPartitionId().getDate(), + tempPath, Long.MAX_VALUE, column.getCompactionCompression(), partition.getPartitionId() + .getStartKey(), compactionCacheConfig, cryptoContext); + filePath = writer.getPath(); + byte[] fileName = Bytes.toBytes(filePath.getName()); + // create a temp file and open a writer for it in the bulkloadPath + refFileWriter = MobUtils.createRefFileWriter(conf, fs, column, bulkloadColumnPath, fileInfo + .getSecond().longValue(), compactionCacheConfig, cryptoContext); + refFilePath = refFileWriter.getPath(); + List<Cell> cells = new ArrayList<Cell>(); + boolean hasMore = false; + ScannerContext scannerContext = + ScannerContext.newBuilder().setBatchLimit(compactionKVMax).build(); + do { + hasMore = scanner.next(cells, scannerContext); + for (Cell cell : cells) { + // write the mob cell to the mob file. + writer.append(cell); + // write the new reference cell to the store file. + KeyValue reference = MobUtils.createMobRefKeyValue(cell, fileName, tableNameTag); + refFileWriter.append(reference); + mobCells++; + } + cells.clear(); + } while (hasMore); + } finally { + // close the scanner. + scanner.close(); + // append metadata to the mob file, and close the mob file writer. + closeMobFileWriter(writer, fileInfo.getFirst(), mobCells); + // append metadata and bulkload info to the ref mob file, and close the writer.
+ closeRefFileWriter(refFileWriter, fileInfo.getFirst(), request.selectionTime); + } + if (mobCells > 0) { + // commit mob file + MobUtils.commitFile(conf, fs, filePath, mobFamilyDir, compactionCacheConfig); + // bulkload the ref file + bulkloadRefFile(table, bulkloadPathOfPartition, filePath.getName()); + newFiles.add(new Path(mobFamilyDir, filePath.getName())); + } else { + // remove the new files + // the mob file is empty, delete it instead of committing. + deletePath(filePath); + // the ref file is empty, delete it instead of committing. + deletePath(refFilePath); + } + // archive the old mob files, do not archive the del files. + try { + MobUtils + .removeMobFiles(conf, fs, tableName, mobTableDir, column.getName(), mobFilesToCompact); + } catch (IOException e) { + LOG.error("Failed to archive the files " + mobFilesToCompact, e); + } + } + + /** + * Compacts the del files in batches, which avoids opening too many files. + * @param request The compaction request. + * @param delFilePaths The paths of the del files. + * @return The paths of new del files after merging or the original files if no merging + * is necessary. + * @throws IOException + */ + protected List<Path> compactDelFiles(PartitionedMobCompactionRequest request, + List<Path> delFilePaths) throws IOException { + if (delFilePaths.size() <= delFileMaxCount) { + return delFilePaths; + } + // when there are more del files than the number that is allowed, merge them first. + int offset = 0; + List<Path> paths = new ArrayList<Path>(); + while (offset < delFilePaths.size()) { + // get the batch + int batch = compactionBatchSize; + if (delFilePaths.size() - offset < compactionBatchSize) { + batch = delFilePaths.size() - offset; + } + List<StoreFile> batchedDelFiles = new ArrayList<StoreFile>(); + if (batch == 1) { + // only one file left, do not compact it, directly add it to the new files. + paths.add(delFilePaths.get(offset)); + offset++; + continue; + } + for (int i = offset; i < batch + offset; i++) { + batchedDelFiles.add(new StoreFile(fs, delFilePaths.get(i), conf, compactionCacheConfig, + BloomType.NONE)); + } + // compact the del files in a batch. + paths.add(compactDelFilesInBatch(request, batchedDelFiles)); + // move to the next batch. + offset += batch; + } + return compactDelFiles(request, paths); + } + + /** + * Compacts the del file in a batch. + * @param request The compaction request. + * @param delFiles The del files. + * @return The path of new del file after merging. + * @throws IOException + */ + private Path compactDelFilesInBatch(PartitionedMobCompactionRequest request, + List<StoreFile> delFiles) throws IOException { + // create a scanner for the del files.
+ StoreScanner scanner = createScanner(delFiles, ScanType.COMPACT_RETAIN_DELETES); + Writer writer = null; + Path filePath = null; + try { + writer = MobUtils.createDelFileWriter(conf, fs, column, + MobUtils.formatDate(new Date(request.selectionTime)), tempPath, Long.MAX_VALUE, + column.getCompactionCompression(), HConstants.EMPTY_START_ROW, compactionCacheConfig, + cryptoContext); + filePath = writer.getPath(); + List<Cell> cells = new ArrayList<Cell>(); + boolean hasMore = false; + ScannerContext scannerContext = + ScannerContext.newBuilder().setBatchLimit(compactionKVMax).build(); + do { + hasMore = scanner.next(cells, scannerContext); + for (Cell cell : cells) { + writer.append(cell); + } + cells.clear(); + } while (hasMore); + } finally { + scanner.close(); + if (writer != null) { + try { + writer.close(); + } catch (IOException e) { + LOG.error("Failed to close the writer of the file " + filePath, e); + } + } + } + // commit the new del file + Path path = MobUtils.commitFile(conf, fs, filePath, mobFamilyDir, compactionCacheConfig); + // archive the old del files + try { + MobUtils.removeMobFiles(conf, fs, tableName, mobTableDir, column.getName(), delFiles); + } catch (IOException e) { + LOG.error("Failed to archive the old del files " + delFiles, e); + } + return path; + } + + /** + * Creates a store scanner. + * @param filesToCompact The files to be compacted. + * @param scanType The scan type. + * @return The store scanner. + * @throws IOException + */ + private StoreScanner createScanner(List<StoreFile> filesToCompact, ScanType scanType) + throws IOException { + List scanners = StoreFileScanner.getScannersForStoreFiles(filesToCompact, false, true, false, + null, HConstants.LATEST_TIMESTAMP); + Scan scan = new Scan(); + scan.setMaxVersions(column.getMaxVersions()); + long ttl = HStore.determineTTLFromFamily(column); + ScanInfo scanInfo = new ScanInfo(column, ttl, 0, CellComparator.COMPARATOR); + StoreScanner scanner = new StoreScanner(scan, scanInfo, scanType, null, scanners, 0L, + HConstants.LATEST_TIMESTAMP); + return scanner; + } + + /** + * Bulkloads the current file. + * @param table The current table. + * @param bulkloadDirectory The path of bulkload directory. + * @param fileName The current file name. + * @throws IOException + */ + private void bulkloadRefFile(Table table, Path bulkloadDirectory, String fileName) + throws IOException { + // bulkload the ref file + try { + LoadIncrementalHFiles bulkload = new LoadIncrementalHFiles(conf); + bulkload.doBulkLoad(bulkloadDirectory, (HTable)table); + } catch (Exception e) { + // delete the committed mob file + deletePath(new Path(mobFamilyDir, fileName)); + throw new IOException(e); + } finally { + // delete the bulkload files in bulkloadPath + deletePath(bulkloadDirectory); + } + } + + /** + * Closes the mob file writer. + * @param writer The mob file writer. + * @param maxSeqId Maximum sequence id. + * @param mobCellsCount The number of mob cells. + * @throws IOException + */ + private void closeMobFileWriter(Writer writer, long maxSeqId, long mobCellsCount) + throws IOException { + if (writer != null) { + writer.appendMetadata(maxSeqId, false, mobCellsCount); + try { + writer.close(); + } catch (IOException e) { + LOG.error("Failed to close the writer of the file " + writer.getPath(), e); + } + } + } + + /** + * Closes the ref file writer. + * @param writer The ref file writer. + * @param maxSeqId Maximum sequence id. + * @param bulkloadTime The timestamp at which the bulk load file is created. 
+ * @throws IOException + */ + private void closeRefFileWriter(Writer writer, long maxSeqId, long bulkloadTime) + throws IOException { + if (writer != null) { + writer.appendMetadata(maxSeqId, false); + writer.appendFileInfo(StoreFile.BULKLOAD_TIME_KEY, Bytes.toBytes(bulkloadTime)); + try { + writer.close(); + } catch (IOException e) { + LOG.error("Failed to close the writer of the ref file " + writer.getPath(), e); + } + } + } + + /** + * Gets the max seqId and number of cells of the store files. + * @param storeFiles The store files. + * @return The pair of the max seqId and number of cells of the store files. + * @throws IOException + */ + private Pair<Long, Long> getFileInfo(List<StoreFile> storeFiles) throws IOException { + long maxSeqId = 0; + long maxKeyCount = 0; + for (StoreFile sf : storeFiles) { + // the readers will be closed later after the merge. + maxSeqId = Math.max(maxSeqId, sf.getMaxSequenceId()); + byte[] count = sf.createReader().loadFileInfo().get(StoreFile.MOB_CELLS_COUNT); + if (count != null) { + maxKeyCount += Bytes.toLong(count); + } + } + return new Pair<Long, Long>(Long.valueOf(maxSeqId), Long.valueOf(maxKeyCount)); + } + + /** + * Deletes a file. + * @param path The path of the file to be deleted. + */ + private void deletePath(Path path) { + try { + if (path != null) { + fs.delete(path, true); + } + } catch (IOException e) { + LOG.error("Failed to delete the file " + path, e); + } + } + + private FileStatus getLinkedFileStatus(HFileLink link) throws IOException { + Path[] locations = link.getLocations(); + for (Path location : locations) { + FileStatus file = getFileStatus(location); + if (file != null) { + return file; + } + } + return null; + } + + private FileStatus getFileStatus(Path path) throws IOException { + try { + if (path != null) { + FileStatus file = fs.getFileStatus(path); + return file; + } + } catch (FileNotFoundException e) { + LOG.warn("The file " + path + " cannot be found", e); + } + return null; + } +} http://git-wip-us.apache.org/repos/asf/hbase/blob/b31a6acf/hbase-server/src/main/java/org/apache/hadoop/hbase/mob/filecompactions/MobFileCompactionRequest.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/mob/filecompactions/MobFileCompactionRequest.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/mob/filecompactions/MobFileCompactionRequest.java deleted file mode 100644 index 375ba8c..0000000 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/mob/filecompactions/MobFileCompactionRequest.java +++ /dev/null @@ -1,64 +0,0 @@ -/** - * - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License.
- */ -package org.apache.hadoop.hbase.mob.filecompactions; - -import org.apache.hadoop.classification.InterfaceAudience; - -/** - * The compaction request for mob files. - */ -@InterfaceAudience.Private -public abstract class MobFileCompactionRequest { - - protected long selectionTime; - protected CompactionType type = CompactionType.PART_FILES; - - public void setCompactionType(CompactionType type) { - this.type = type; - } - - /** - * Gets the selection time. - * @return The selection time. - */ - public long getSelectionTime() { - return this.selectionTime; - } - - /** - * Gets the compaction type. - * @return The compaction type. - */ - public CompactionType getCompactionType() { - return type; - } - - protected enum CompactionType { - - /** - * Part of mob files are selected. - */ - PART_FILES, - - /** - * All of mob files are selected. - */ - ALL_FILES; - } -} http://git-wip-us.apache.org/repos/asf/hbase/blob/b31a6acf/hbase-server/src/main/java/org/apache/hadoop/hbase/mob/filecompactions/MobFileCompactor.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/mob/filecompactions/MobFileCompactor.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/mob/filecompactions/MobFileCompactor.java deleted file mode 100644 index fcb39c5..0000000 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/mob/filecompactions/MobFileCompactor.java +++ /dev/null @@ -1,90 +0,0 @@ -/** - * - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.hadoop.hbase.mob.filecompactions; - -import java.io.IOException; -import java.util.Arrays; -import java.util.List; -import java.util.concurrent.ExecutorService; - -import org.apache.hadoop.classification.InterfaceAudience; -import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.fs.FileStatus; -import org.apache.hadoop.fs.FileSystem; -import org.apache.hadoop.fs.Path; -import org.apache.hadoop.hbase.HColumnDescriptor; -import org.apache.hadoop.hbase.TableName; -import org.apache.hadoop.hbase.mob.MobUtils; -import org.apache.hadoop.hbase.util.FSUtils; - -/** - * A mob file compactor to directly compact the mob files.
- */ -@InterfaceAudience.Private -public abstract class MobFileCompactor { - - protected FileSystem fs; - protected Configuration conf; - protected TableName tableName; - protected HColumnDescriptor column; - - protected Path mobTableDir; - protected Path mobFamilyDir; - protected ExecutorService pool; - - public MobFileCompactor(Configuration conf, FileSystem fs, TableName tableName, - HColumnDescriptor column, ExecutorService pool) { - this.conf = conf; - this.fs = fs; - this.tableName = tableName; - this.column = column; - this.pool = pool; - mobTableDir = FSUtils.getTableDir(MobUtils.getMobHome(conf), tableName); - mobFamilyDir = MobUtils.getMobFamilyPath(conf, tableName, column.getNameAsString()); - } - - /** - * Compacts the mob files for the current column family. - * @return The paths of new mob files generated in the compaction. - * @throws IOException - */ - public List<Path> compact() throws IOException { - return compact(false); - } - - /** - * Compacts the mob files by compaction type for the current column family. - * @param isForceAllFiles Whether add all mob files into the compaction. - * @return The paths of new mob files generated in the compaction. - * @throws IOException - */ - public List<Path> compact(boolean isForceAllFiles) throws IOException { - return compact(Arrays.asList(fs.listStatus(mobFamilyDir)), isForceAllFiles); - } - - /** - * Compacts the candidate mob files. - * @param files The candidate mob files. - * @param isForceAllFiles Whether add all mob files into the compaction. - * @return The paths of new mob files generated in the compaction. - * @throws IOException - */ - public abstract List<Path> compact(List<FileStatus> files, boolean isForceAllFiles) - throws IOException; -} http://git-wip-us.apache.org/repos/asf/hbase/blob/b31a6acf/hbase-server/src/main/java/org/apache/hadoop/hbase/mob/filecompactions/PartitionedMobFileCompactionRequest.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/mob/filecompactions/PartitionedMobFileCompactionRequest.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/mob/filecompactions/PartitionedMobFileCompactionRequest.java deleted file mode 100644 index d2ac1db..0000000 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/mob/filecompactions/PartitionedMobFileCompactionRequest.java +++ /dev/null @@ -1,146 +0,0 @@ -/** - * - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License.
- */ -package org.apache.hadoop.hbase.mob.filecompactions; - -import java.util.ArrayList; -import java.util.Collection; -import java.util.Collections; -import java.util.List; - -import org.apache.hadoop.classification.InterfaceAudience; -import org.apache.hadoop.fs.FileStatus; -import org.apache.hadoop.hbase.util.EnvironmentEdgeManager; - -/** - * An implementation of {@link MobFileCompactionRequest} that is used in - * {@link PartitionedMobFileCompactor}. - * The mob files that have the same start key and date in their names belong to - * the same partition. - */ -@InterfaceAudience.Private -public class PartitionedMobFileCompactionRequest extends MobFileCompactionRequest { - - protected Collection<FileStatus> delFiles; - protected Collection<CompactionPartition> compactionPartitions; - - public PartitionedMobFileCompactionRequest(Collection<CompactionPartition> compactionPartitions, - Collection<FileStatus> delFiles) { - this.selectionTime = EnvironmentEdgeManager.currentTime(); - this.compactionPartitions = compactionPartitions; - this.delFiles = delFiles; - } - - /** - * Gets the compaction partitions. - * @return The compaction partitions. - */ - public Collection<CompactionPartition> getCompactionPartitions() { - return this.compactionPartitions; - } - - /** - * Gets the del files. - * @return The del files. - */ - public Collection<FileStatus> getDelFiles() { - return this.delFiles; - } - - /** - * The partition in the mob file compaction. - * The mob files that have the same start key and date in their names belong to - * the same partition. - */ - protected static class CompactionPartition { - private List<FileStatus> files = new ArrayList<FileStatus>(); - private CompactionPartitionId partitionId; - - public CompactionPartition(CompactionPartitionId partitionId) { - this.partitionId = partitionId; - } - - public CompactionPartitionId getPartitionId() { - return this.partitionId; - } - - public void addFile(FileStatus file) { - files.add(file); - } - - public List<FileStatus> listFiles() { - return Collections.unmodifiableList(files); - } - } - - /** - * The partition id that consists of start key and date of the mob file name. - */ - protected static class CompactionPartitionId { - - private String startKey; - private String date; - - public CompactionPartitionId(String startKey, String date) { - if (startKey == null || date == null) { - throw new IllegalArgumentException("Neither of start key and date could be null"); - } - this.startKey = startKey; - this.date = date; - } - - public String getStartKey() { - return this.startKey; - } - - public String getDate() { - return this.date; - } - - @Override - public int hashCode() { - int result = 17; - result = 31 * result + startKey.hashCode(); - result = 31 * result + date.hashCode(); - return result; - } - - @Override - public boolean equals(Object obj) { - if (this == obj) { - return true; - } - if (!(obj instanceof CompactionPartitionId)) { - return false; - } - CompactionPartitionId another = (CompactionPartitionId) obj; - if (!this.startKey.equals(another.startKey)) { - return false; - } - if (!this.date.equals(another.date)) { - return false; - } - return true; - } - - @Override - public String toString() { - return new StringBuilder(startKey).append(date).toString(); - } - } -}
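For reference, the partitioning rule that replaces the old filecompactions classes is small enough to sketch on its own: PartitionedMobCompactor.select() groups candidate files by the (start key md5, yyyymmdd date) pair parsed from each mob file name. A minimal sketch of just that grouping step, assuming the del-file and size checks shown in select() above are handled elsewhere:

import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.hbase.mob.MobFileName;
import org.apache.hadoop.hbase.mob.compactions.PartitionedMobCompactionRequest.CompactionPartitionId;

public class PartitionGroupingSketch {
  // Groups candidate mob files by the partition id (md5 of the start key plus
  // the yyyymmdd date) that is encoded in every mob file name.
  static Map<CompactionPartitionId, List<FileStatus>> group(List<FileStatus> candidates) {
    Map<CompactionPartitionId, List<FileStatus>> partitions =
        new HashMap<CompactionPartitionId, List<FileStatus>>();
    for (FileStatus file : candidates) {
      MobFileName fileName = MobFileName.create(file.getPath().getName());
      CompactionPartitionId id =
          new CompactionPartitionId(fileName.getStartKey(), fileName.getDate());
      List<FileStatus> files = partitions.get(id);
      if (files == null) {
        files = new ArrayList<FileStatus>();
        partitions.put(id, files);
      }
      files.add(file);
    }
    return partitions;
  }
}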

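Similarly, the recursive batching in compactDelFiles() is easier to follow with the I/O stripped away. A self-contained sketch of just the control flow, where mergeBatch() is a hypothetical stand-in for compactDelFilesInBatch():

import java.util.ArrayList;
import java.util.List;

public class DelFileMergeSketch {
  static final int DEL_FILE_MAX_COUNT = 3;  // hbase.mob.delfile.max.count
  static final int BATCH_SIZE = 100;        // hbase.mob.compaction.batch.size

  static List<String> compactDelFiles(List<String> delFiles) {
    if (delFiles.size() <= DEL_FILE_MAX_COUNT) {
      return delFiles;
    }
    List<String> merged = new ArrayList<String>();
    int offset = 0;
    while (offset < delFiles.size()) {
      int batch = Math.min(BATCH_SIZE, delFiles.size() - offset);
      if (batch == 1) {
        // a single leftover file is carried over without merging.
        merged.add(delFiles.get(offset));
      } else {
        // one merged output per batch, as compactDelFilesInBatch produces.
        merged.add(mergeBatch(delFiles.subList(offset, offset + batch)));
      }
      offset += batch;
    }
    // recurse until the del file count is within the allowed maximum.
    return compactDelFiles(merged);
  }

  // Hypothetical stand-in: the real code writes one new del file per batch.
  static String mergeBatch(List<String> batch) {
    return "merged(" + batch.size() + ")";
  }

  public static void main(String[] args) {
    List<String> files = new ArrayList<String>();
    for (int i = 0; i < 250; i++) {
      files.add("del-" + i);
    }
    // 250 files merge into 3 (100 + 100 + 50), which is within the limit of 3.
    System.out.println(compactDelFiles(files));
  }
}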