This is an automated email from the ASF dual-hosted git repository.
bbeaudreault pushed a commit to branch branch-2
in repository https://gitbox.apache.org/repos/asf/hbase.git
The following commit(s) were added to refs/heads/branch-2 by this push:
new 9e40fa13a7a HBASE-24687: Use existing HMaster Connection in
MobFileCleanerChore (#5509)
9e40fa13a7a is described below
commit 9e40fa13a7a5b2c89cb7054783b79b48416ca85a
Author: Charles Connell <[email protected]>
AuthorDate: Mon Nov 13 09:35:12 2023 -0500
HBASE-24687: Use existing HMaster Connection in MobFileCleanerChore (#5509)
Signed-off-by: Bryan Beaudreault <[email protected]>
Signed-off-by: Duo Zhang <[email protected]>
---
.../hadoop/hbase/IntegrationTestMobCompaction.java | 14 +-
.../hadoop/hbase/mob/MobFileCleanerChore.java | 226 +------------------
.../hadoop/hbase/mob/MobFileCleanupUtil.java | 250 +++++++++++++++++++++
.../hadoop/hbase/mob/MobStressToolRunner.java | 9 +-
.../hbase/mob/TestMobCompactionWithDefaults.java | 3 -
...eanerChore.java => TestMobFileCleanupUtil.java} | 14 +-
6 files changed, 269 insertions(+), 247 deletions(-)
diff --git
a/hbase-it/src/test/java/org/apache/hadoop/hbase/IntegrationTestMobCompaction.java
b/hbase-it/src/test/java/org/apache/hadoop/hbase/IntegrationTestMobCompaction.java
index a39d71888a3..208f11807b0 100644
---
a/hbase-it/src/test/java/org/apache/hadoop/hbase/IntegrationTestMobCompaction.java
+++
b/hbase-it/src/test/java/org/apache/hadoop/hbase/IntegrationTestMobCompaction.java
@@ -36,7 +36,7 @@ import org.apache.hadoop.hbase.client.Table;
import org.apache.hadoop.hbase.master.cleaner.TimeToLiveHFileCleaner;
import org.apache.hadoop.hbase.mob.FaultyMobStoreCompactor;
import org.apache.hadoop.hbase.mob.MobConstants;
-import org.apache.hadoop.hbase.mob.MobFileCleanerChore;
+import org.apache.hadoop.hbase.mob.MobFileCleanupUtil;
import org.apache.hadoop.hbase.mob.MobStoreEngine;
import org.apache.hadoop.hbase.mob.MobUtils;
import org.apache.hadoop.hbase.testclassification.IntegrationTests;
@@ -97,7 +97,6 @@ public class IntegrationTestMobCompaction extends
IntegrationTestBase {
private static HColumnDescriptor hcd;
private static Admin admin;
private static Table table = null;
- private static MobFileCleanerChore chore;
private static volatile boolean run = true;
@@ -248,12 +247,9 @@ public class IntegrationTestMobCompaction extends
IntegrationTestBase {
public void run() {
while (run) {
try {
- LOG.info("MOB cleanup chore started ...");
- if (chore == null) {
- chore = new MobFileCleanerChore();
- }
- chore.cleanupObsoleteMobFiles(conf, table.getName());
- LOG.info("MOB cleanup chore finished");
+ LOG.info("MOB cleanup started ...");
+ MobFileCleanupUtil.cleanupObsoleteMobFiles(conf, table.getName(),
admin);
+ LOG.info("MOB cleanup finished");
Thread.sleep(130000);
} catch (Exception e) {
@@ -328,7 +324,7 @@ public class IntegrationTestMobCompaction extends
IntegrationTestBase {
LOG.info("Waiting for write thread to finish ...");
writeData.join();
// Cleanup again
- chore.cleanupObsoleteMobFiles(conf, table.getName());
+ MobFileCleanupUtil.cleanupObsoleteMobFiles(conf, table.getName(), admin);
if (util != null) {
LOG.info("Archive cleaner started ...");
diff --git
a/hbase-server/src/main/java/org/apache/hadoop/hbase/mob/MobFileCleanerChore.java
b/hbase-server/src/main/java/org/apache/hadoop/hbase/mob/MobFileCleanerChore.java
index eb2201417b0..c4bada278df 100644
---
a/hbase-server/src/main/java/org/apache/hadoop/hbase/mob/MobFileCleanerChore.java
+++
b/hbase-server/src/main/java/org/apache/hadoop/hbase/mob/MobFileCleanerChore.java
@@ -17,43 +17,20 @@
*/
package org.apache.hadoop.hbase.mob;
-import com.google.errorprone.annotations.RestrictedApi;
-import java.io.FileNotFoundException;
import java.io.IOException;
-import java.util.ArrayList;
-import java.util.HashSet;
-import java.util.List;
import java.util.Map;
-import java.util.Set;
import java.util.concurrent.TimeUnit;
import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.LocatedFileStatus;
-import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.fs.RemoteIterator;
import org.apache.hadoop.hbase.ScheduledChore;
import org.apache.hadoop.hbase.TableDescriptors;
-import org.apache.hadoop.hbase.TableName;
-import org.apache.hadoop.hbase.backup.HFileArchiver;
import org.apache.hadoop.hbase.client.Admin;
import org.apache.hadoop.hbase.client.ColumnFamilyDescriptor;
-import org.apache.hadoop.hbase.client.Connection;
-import org.apache.hadoop.hbase.client.ConnectionFactory;
import org.apache.hadoop.hbase.client.TableDescriptor;
-import org.apache.hadoop.hbase.io.hfile.CacheConfig;
import org.apache.hadoop.hbase.master.HMaster;
-import org.apache.hadoop.hbase.regionserver.BloomType;
-import org.apache.hadoop.hbase.regionserver.HStoreFile;
-import org.apache.hadoop.hbase.util.Bytes;
-import org.apache.hadoop.hbase.util.CommonFSUtils;
-import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
-import org.apache.hadoop.hbase.util.FSUtils;
import org.apache.yetus.audience.InterfaceAudience;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import org.apache.hbase.thirdparty.com.google.common.collect.SetMultimap;
-
/**
* The class MobFileCleanerChore for running cleaner regularly to remove the
expired and obsolete
* (files which have no active references to) mob files.
@@ -100,12 +77,6 @@ public class MobFileCleanerChore extends ScheduledChore {
}
}
- @RestrictedApi(explanation = "Should only be called in tests", link = "",
- allowedOnPath = ".*/src/test/.*")
- public MobFileCleanerChore() {
- this.master = null;
- }
-
@Override
protected void chore() {
TableDescriptors htds = master.getTableDescriptors();
@@ -131,7 +102,10 @@ public class MobFileCleanerChore extends ScheduledChore {
try {
// Now clean obsolete files for a table
LOG.info("Cleaning obsolete MOB files from table={}",
htd.getTableName());
- cleanupObsoleteMobFiles(master.getConfiguration(), htd.getTableName());
+ try (final Admin admin = master.getConnection().getAdmin()) {
+
MobFileCleanupUtil.cleanupObsoleteMobFiles(master.getConfiguration(),
htd.getTableName(),
+ admin);
+ }
LOG.info("Cleaning obsolete MOB files finished for table={}",
htd.getTableName());
} catch (IOException e) {
LOG.error("Failed to clean the obsolete mob files for table={}",
htd.getTableName(), e);
@@ -139,196 +113,4 @@ public class MobFileCleanerChore extends ScheduledChore {
}
}
- /**
- * Performs housekeeping file cleaning (called by MOB Cleaner chore)
- * @param conf configuration
- * @param table table name
- * @throws IOException exception
- */
- public void cleanupObsoleteMobFiles(Configuration conf, TableName table)
throws IOException {
-
- long minAgeToArchive =
- conf.getLong(MobConstants.MIN_AGE_TO_ARCHIVE_KEY,
MobConstants.DEFAULT_MIN_AGE_TO_ARCHIVE);
- // We check only those MOB files, which creation time is less
- // than maxCreationTimeToArchive. This is a current time - 1h. 1 hour gap
- // gives us full confidence that all corresponding store files will
- // exist at the time cleaning procedure begins and will be examined.
- // So, if MOB file creation time is greater than this maxTimeToArchive,
- // this will be skipped and won't be archived.
- long maxCreationTimeToArchive = EnvironmentEdgeManager.currentTime() -
minAgeToArchive;
- try (final Connection conn = ConnectionFactory.createConnection(conf);
- final Admin admin = conn.getAdmin();) {
- TableDescriptor htd = admin.getDescriptor(table);
- List<ColumnFamilyDescriptor> list = MobUtils.getMobColumnFamilies(htd);
- if (list.size() == 0) {
- LOG.info("Skipping non-MOB table [{}]", table);
- return;
- } else {
- LOG.info("Only MOB files whose creation time older than {} will be
archived, table={}",
- maxCreationTimeToArchive, table);
- }
-
- FileSystem fs = FileSystem.get(conf);
- Set<String> regionNames = new HashSet<>();
- Path rootDir = CommonFSUtils.getRootDir(conf);
- Path tableDir = CommonFSUtils.getTableDir(rootDir, table);
- List<Path> regionDirs = FSUtils.getRegionDirs(fs, tableDir);
-
- Set<String> allActiveMobFileName = new HashSet<String>();
- for (Path regionPath : regionDirs) {
- regionNames.add(regionPath.getName());
- for (ColumnFamilyDescriptor hcd : list) {
- String family = hcd.getNameAsString();
- Path storePath = new Path(regionPath, family);
- boolean succeed = false;
- Set<String> regionMobs = new HashSet<String>();
-
- while (!succeed) {
- if (!fs.exists(storePath)) {
- String errMsg = String.format("Directory %s was deleted during
MOB file cleaner chore"
- + " execution, aborting MOB file cleaner chore.", storePath);
- throw new IOException(errMsg);
- }
- RemoteIterator<LocatedFileStatus> rit =
fs.listLocatedStatus(storePath);
- List<Path> storeFiles = new ArrayList<Path>();
- // Load list of store files first
- while (rit.hasNext()) {
- Path p = rit.next().getPath();
- if (fs.isFile(p)) {
- storeFiles.add(p);
- }
- }
- LOG.info("Found {} store files in: {}", storeFiles.size(),
storePath);
- Path currentPath = null;
- try {
- for (Path pp : storeFiles) {
- currentPath = pp;
- LOG.trace("Store file: {}", pp);
- HStoreFile sf = null;
- byte[] mobRefData = null;
- byte[] bulkloadMarkerData = null;
- try {
- sf = new HStoreFile(fs, pp, conf, CacheConfig.DISABLED,
BloomType.NONE, true);
- sf.initReader();
- mobRefData = sf.getMetadataValue(HStoreFile.MOB_FILE_REFS);
- bulkloadMarkerData =
sf.getMetadataValue(HStoreFile.BULKLOAD_TASK_KEY);
- // close store file to avoid memory leaks
- sf.closeStoreFile(true);
- } catch (IOException ex) {
- // When FileBased SFT is active the store dir can contain
corrupted or incomplete
- // files. So read errors are expected. We just skip these
files.
- if (ex instanceof FileNotFoundException) {
- throw ex;
- }
- LOG.debug("Failed to get mob data from file: {} due to
error.", pp.toString(),
- ex);
- continue;
- }
- if (mobRefData == null) {
- if (bulkloadMarkerData == null) {
- LOG.warn("Found old store file with no MOB_FILE_REFS: {} -
"
- + "can not proceed until all old files will be
MOB-compacted.", pp);
- return;
- } else {
- LOG.debug("Skipping file without MOB references
(bulkloaded file):{}", pp);
- continue;
- }
- }
- // file may or may not have MOB references, but was created by
the distributed
- // mob compaction code.
- try {
- SetMultimap<TableName, String> mobs =
- MobUtils.deserializeMobFileRefs(mobRefData).build();
- LOG.debug("Found {} mob references for store={}",
mobs.size(), sf);
- LOG.trace("Specific mob references found for store={} : {}",
sf, mobs);
- regionMobs.addAll(mobs.values());
- } catch (RuntimeException exception) {
- throw new IOException("failure getting mob references for
hfile " + sf,
- exception);
- }
- }
- } catch (FileNotFoundException e) {
- LOG.warn(
- "Missing file:{} Starting MOB cleaning cycle from the
beginning" + " due to error",
- currentPath, e);
- regionMobs.clear();
- continue;
- }
- succeed = true;
- }
-
- // Add MOB references for current region/family
- allActiveMobFileName.addAll(regionMobs);
- } // END column families
- } // END regions
- // Check if number of MOB files too big (over 1M)
- if (allActiveMobFileName.size() > 1000000) {
- LOG.warn("Found too many active MOB files: {}, table={}, "
- + "this may result in high memory pressure.",
allActiveMobFileName.size(), table);
- }
- LOG.debug("Found: {} active mob refs for table={}",
allActiveMobFileName.size(), table);
- allActiveMobFileName.stream().forEach(LOG::trace);
-
- // Now scan MOB directories and find MOB files with no references to them
- for (ColumnFamilyDescriptor hcd : list) {
- List<Path> toArchive = new ArrayList<Path>();
- String family = hcd.getNameAsString();
- Path dir = MobUtils.getMobFamilyPath(conf, table, family);
- RemoteIterator<LocatedFileStatus> rit = fs.listLocatedStatus(dir);
- while (rit.hasNext()) {
- LocatedFileStatus lfs = rit.next();
- Path p = lfs.getPath();
- String[] mobParts = p.getName().split("_");
- String regionName = mobParts[mobParts.length - 1];
-
- if (!regionNames.contains(regionName)) {
- // MOB belonged to a region no longer hosted
- long creationTime = fs.getFileStatus(p).getModificationTime();
- if (creationTime < maxCreationTimeToArchive) {
- LOG.trace("Archiving MOB file {} creation time={}", p,
- (fs.getFileStatus(p).getModificationTime()));
- toArchive.add(p);
- } else {
- LOG.trace("Skipping fresh file: {}. Creation time={}", p,
- fs.getFileStatus(p).getModificationTime());
- }
- } else {
- LOG.trace("Keeping MOB file with existing region: {}", p);
- }
- }
- LOG.info(" MOB Cleaner found {} files to archive for table={}
family={}", toArchive.size(),
- table, family);
- archiveMobFiles(conf, table, family.getBytes(), toArchive);
- LOG.info(" MOB Cleaner archived {} files, table={} family={}",
toArchive.size(), table,
- family);
- }
- }
- }
-
- /**
- * Archives the mob files.
- * @param conf The current configuration.
- * @param tableName The table name.
- * @param family The name of the column family.
- * @param storeFiles The files to be archived.
- * @throws IOException exception
- */
- public void archiveMobFiles(Configuration conf, TableName tableName, byte[]
family,
- List<Path> storeFiles) throws IOException {
-
- if (storeFiles.size() == 0) {
- // nothing to remove
- LOG.debug("Skipping archiving old MOB files - no files found for
table={} cf={}", tableName,
- Bytes.toString(family));
- return;
- }
- Path mobTableDir = CommonFSUtils.getTableDir(MobUtils.getMobHome(conf),
tableName);
- FileSystem fs = storeFiles.get(0).getFileSystem(conf);
-
- for (Path p : storeFiles) {
- LOG.debug("MOB Cleaner is archiving: {}", p);
- HFileArchiver.archiveStoreFile(conf, fs,
MobUtils.getMobRegionInfo(tableName), mobTableDir,
- family, p);
- }
- }
}
diff --git
a/hbase-server/src/main/java/org/apache/hadoop/hbase/mob/MobFileCleanupUtil.java
b/hbase-server/src/main/java/org/apache/hadoop/hbase/mob/MobFileCleanupUtil.java
new file mode 100644
index 00000000000..049192624ef
--- /dev/null
+++
b/hbase-server/src/main/java/org/apache/hadoop/hbase/mob/MobFileCleanupUtil.java
@@ -0,0 +1,250 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.mob;
+
+import java.io.FileNotFoundException;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Set;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.LocatedFileStatus;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.RemoteIterator;
+import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.backup.HFileArchiver;
+import org.apache.hadoop.hbase.client.Admin;
+import org.apache.hadoop.hbase.client.ColumnFamilyDescriptor;
+import org.apache.hadoop.hbase.client.RegionInfo;
+import org.apache.hadoop.hbase.client.TableDescriptor;
+import org.apache.hadoop.hbase.io.hfile.CacheConfig;
+import org.apache.hadoop.hbase.regionserver.BloomType;
+import org.apache.hadoop.hbase.regionserver.HStoreFile;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.hbase.util.CommonFSUtils;
+import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
+import org.apache.hadoop.hbase.util.FSUtils;
+import org.apache.yetus.audience.InterfaceAudience;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.hbase.thirdparty.com.google.common.collect.SetMultimap;
+
+@InterfaceAudience.Private
+public final class MobFileCleanupUtil {
+
+  private static final Logger LOG = LoggerFactory.getLogger(MobFileCleanupUtil.class);
+
+  /** Warn when a single table holds more active MOB references than this (memory pressure). */
+  private static final int ACTIVE_MOB_FILES_WARN_THRESHOLD = 1000000;
+
+  private MobFileCleanupUtil() {
+  }
+
+  /**
+   * Performs housekeeping file cleaning for one table (called by the MOB cleaner chore, and
+   * directly by tests and tools).
+   * <p>
+   * MOB files whose owning region (the suffix after the last {@code _} in the file name) no
+   * longer has a directory under the table dir, and whose modification time is older than
+   * {@link MobConstants#MIN_AGE_TO_ARCHIVE_KEY}, are moved to the archive. If any store file has
+   * neither MOB reference metadata nor a bulk-load marker, the run is abandoned without
+   * archiving anything.
+   * @param conf  configuration
+   * @param table table name
+   * @param admin admin used to fetch the table descriptor; owned by the caller and not closed here
+   * @throws IOException if listing or reading store/MOB files fails, or a store directory is
+   *                     deleted while it is being scanned
+   */
+  public static void cleanupObsoleteMobFiles(Configuration conf, TableName table, Admin admin)
+    throws IOException {
+    long minAgeToArchive =
+      conf.getLong(MobConstants.MIN_AGE_TO_ARCHIVE_KEY, MobConstants.DEFAULT_MIN_AGE_TO_ARCHIVE);
+    // We check only those MOB files whose creation time is less than maxCreationTimeToArchive
+    // (current time minus the configured minimum age). The gap gives confidence that all
+    // corresponding store files exist when the scan begins and will be examined, so newer MOB
+    // files are skipped and never archived.
+    long maxCreationTimeToArchive = EnvironmentEdgeManager.currentTime() - minAgeToArchive;
+    TableDescriptor htd = admin.getDescriptor(table);
+    List<ColumnFamilyDescriptor> list = MobUtils.getMobColumnFamilies(htd);
+    if (list.isEmpty()) {
+      LOG.info("Skipping non-MOB table [{}]", table);
+      return;
+    }
+    LOG.info("Only MOB files whose creation time older than {} will be archived, table={}",
+      maxCreationTimeToArchive, table);
+
+    FileSystem fs = FileSystem.get(conf);
+    Set<String> regionNames = new HashSet<>();
+    Path rootDir = CommonFSUtils.getRootDir(conf);
+    Path tableDir = CommonFSUtils.getTableDir(rootDir, table);
+    List<Path> regionDirs = FSUtils.getRegionDirs(fs, tableDir);
+
+    Set<String> allActiveMobFileName = new HashSet<>();
+    for (Path regionPath : regionDirs) {
+      regionNames.add(regionPath.getName());
+      for (ColumnFamilyDescriptor hcd : list) {
+        String family = hcd.getNameAsString();
+        Path storePath = new Path(regionPath, family);
+        boolean succeed = false;
+        Set<String> regionMobs = new HashSet<>();
+
+        // If a store file disappears mid-scan, rescan the whole store directory so the
+        // reference set is never built from a partially-read listing.
+        while (!succeed) {
+          if (!fs.exists(storePath)) {
+            String errMsg = String.format("Directory %s was deleted during MOB file cleaner chore"
+              + " execution, aborting MOB file cleaner chore.", storePath);
+            throw new IOException(errMsg);
+          }
+          RemoteIterator<LocatedFileStatus> rit = fs.listLocatedStatus(storePath);
+          List<Path> storeFiles = new ArrayList<>();
+          // Load list of store files first
+          while (rit.hasNext()) {
+            Path p = rit.next().getPath();
+            if (fs.isFile(p)) {
+              storeFiles.add(p);
+            }
+          }
+          LOG.info("Found {} store files in: {}", storeFiles.size(), storePath);
+          Path currentPath = null;
+          try {
+            for (Path pp : storeFiles) {
+              currentPath = pp;
+              LOG.trace("Store file: {}", pp);
+              HStoreFile sf = null;
+              byte[] mobRefData = null;
+              byte[] bulkloadMarkerData = null;
+              try {
+                sf = new HStoreFile(fs, pp, conf, CacheConfig.DISABLED, BloomType.NONE, true);
+                sf.initReader();
+                mobRefData = sf.getMetadataValue(HStoreFile.MOB_FILE_REFS);
+                bulkloadMarkerData = sf.getMetadataValue(HStoreFile.BULKLOAD_TASK_KEY);
+              } catch (IOException ex) {
+                // When FileBased SFT is active the store dir can contain corrupted or incomplete
+                // files. So read errors are expected. We just skip these files.
+                if (ex instanceof FileNotFoundException) {
+                  throw ex;
+                }
+                LOG.debug("Failed to get mob data from file: {} due to error.", pp.toString(), ex);
+                continue;
+              } finally {
+                // Release the reader on every path (success, skip, or rethrow) so failed reads
+                // do not leak open readers.
+                closeStoreFileQuietly(sf);
+              }
+              if (mobRefData == null) {
+                if (bulkloadMarkerData == null) {
+                  LOG.warn("Found old store file with no MOB_FILE_REFS: {} - "
+                    + "can not proceed until all old files will be MOB-compacted.", pp);
+                  return;
+                }
+                LOG.debug("Skipping file without MOB references (bulkloaded file):{}", pp);
+                continue;
+              }
+              // file may or may not have MOB references, but was created by the distributed
+              // mob compaction code.
+              try {
+                SetMultimap<TableName, String> mobs =
+                  MobUtils.deserializeMobFileRefs(mobRefData).build();
+                LOG.debug("Found {} mob references for store={}", mobs.size(), sf);
+                LOG.trace("Specific mob references found for store={} : {}", sf, mobs);
+                regionMobs.addAll(mobs.values());
+              } catch (RuntimeException exception) {
+                throw new IOException("failure getting mob references for hfile " + sf, exception);
+              }
+            }
+          } catch (FileNotFoundException e) {
+            LOG.warn(
+              "Missing file:{} Starting MOB cleaning cycle from the beginning due to error",
+              currentPath, e);
+            regionMobs.clear();
+            continue;
+          }
+          succeed = true;
+        }
+
+        // Add MOB references for current region/family
+        allActiveMobFileName.addAll(regionMobs);
+      } // END column families
+    } // END regions
+    // Check if number of MOB files too big (over 1M)
+    if (allActiveMobFileName.size() > ACTIVE_MOB_FILES_WARN_THRESHOLD) {
+      LOG.warn("Found too many active MOB files: {}, table={}, "
+        + "this may result in high memory pressure.", allActiveMobFileName.size(), table);
+    }
+    LOG.debug("Found: {} active mob refs for table={}", allActiveMobFileName.size(), table);
+    if (LOG.isTraceEnabled()) {
+      allActiveMobFileName.forEach(LOG::trace);
+    }
+
+    // NOTE(review): allActiveMobFileName is only reported above. The archive decision below is
+    // made from region membership and age alone, so an unreferenced MOB file that belongs to a
+    // still-existing region is never archived. Confirm this is intended.
+    // Now scan MOB directories and find MOB files with no references to them
+    for (ColumnFamilyDescriptor hcd : list) {
+      checkColumnFamilyDescriptor(conf, table, fs, hcd, regionNames, maxCreationTimeToArchive);
+    }
+  }
+
+  /**
+   * Closes the reader of {@code sf} if one was created. Close failures are logged at debug level
+   * and not rethrown, because the metadata needed by the caller has already been read (or the
+   * caller is already handling a read failure).
+   * @param sf store file to close; may be {@code null}
+   */
+  private static void closeStoreFileQuietly(HStoreFile sf) {
+    if (sf == null) {
+      return;
+    }
+    try {
+      sf.closeStoreFile(true);
+    } catch (IOException e) {
+      LOG.debug("Failed to close store file {}", sf, e);
+    }
+  }
+
+  /**
+   * Archives the MOB files of one column family whose owning region (the suffix after the last
+   * {@code _} in the file name) is not in {@code regionNames}, and whose modification time is
+   * earlier than {@code maxCreationTimeToArchive}.
+   * @param conf                     configuration
+   * @param table                    table name
+   * @param fs                       file system holding the MOB family directory
+   * @param hcd                      MOB-enabled column family to scan
+   * @param regionNames              region directory names currently present under the table dir
+   * @param maxCreationTimeToArchive only files modified strictly before this time are archived
+   * @throws IOException if listing or archiving fails
+   */
+  private static void checkColumnFamilyDescriptor(Configuration conf, TableName table,
+    FileSystem fs, ColumnFamilyDescriptor hcd, Set<String> regionNames,
+    long maxCreationTimeToArchive) throws IOException {
+    List<Path> toArchive = new ArrayList<>();
+    String family = hcd.getNameAsString();
+    Path dir = MobUtils.getMobFamilyPath(conf, table, family);
+    RemoteIterator<LocatedFileStatus> rit = fs.listLocatedStatus(dir);
+    while (rit.hasNext()) {
+      LocatedFileStatus lfs = rit.next();
+      Path p = lfs.getPath();
+      String[] mobParts = p.getName().split("_");
+      String regionName = mobParts[mobParts.length - 1];
+
+      if (regionNames.contains(regionName)) {
+        LOG.trace("Keeping MOB file with existing region: {}", p);
+        continue;
+      }
+      // MOB belonged to a region no longer hosted. The listing already carries the modification
+      // time, so no extra getFileStatus() round trip per file is needed.
+      long creationTime = lfs.getModificationTime();
+      if (creationTime < maxCreationTimeToArchive) {
+        LOG.trace("Archiving MOB file {} creation time={}", p, creationTime);
+        toArchive.add(p);
+      } else {
+        LOG.trace("Skipping fresh file: {}. Creation time={}", p, creationTime);
+      }
+    }
+    LOG.info(" MOB Cleaner found {} files to archive for table={} family={}", toArchive.size(),
+      table, family);
+    // Use the descriptor's own name bytes rather than String.getBytes(), which depends on the
+    // platform default charset.
+    archiveMobFiles(conf, table, hcd.getName(), toArchive);
+    LOG.info(" MOB Cleaner archived {} files, table={} family={}", toArchive.size(), table, family);
+  }
+
+  /**
+   * Archives the mob files.
+   * @param conf       The current configuration.
+   * @param tableName  The table name.
+   * @param family     The name of the column family.
+   * @param storeFiles The files to be archived.
+   * @throws IOException exception
+   */
+  private static void archiveMobFiles(Configuration conf, TableName tableName, byte[] family,
+    List<Path> storeFiles) throws IOException {
+    if (storeFiles.isEmpty()) {
+      // nothing to remove
+      LOG.debug("Skipping archiving old MOB files - no files found for table={} cf={}", tableName,
+        Bytes.toString(family));
+      return;
+    }
+    Path mobTableDir = CommonFSUtils.getTableDir(MobUtils.getMobHome(conf), tableName);
+    FileSystem fs = storeFiles.get(0).getFileSystem(conf);
+    // The MOB region info depends only on the table, so compute it once for all files.
+    RegionInfo mobRegionInfo = MobUtils.getMobRegionInfo(tableName);
+
+    for (Path p : storeFiles) {
+      LOG.debug("MOB Cleaner is archiving: {}", p);
+      HFileArchiver.archiveStoreFile(conf, fs, mobRegionInfo, mobTableDir, family, p);
+    }
+  }
+}
diff --git
a/hbase-server/src/test/java/org/apache/hadoop/hbase/mob/MobStressToolRunner.java
b/hbase-server/src/test/java/org/apache/hadoop/hbase/mob/MobStressToolRunner.java
index 8dd545f0015..705cd90284f 100644
---
a/hbase-server/src/test/java/org/apache/hadoop/hbase/mob/MobStressToolRunner.java
+++
b/hbase-server/src/test/java/org/apache/hadoop/hbase/mob/MobStressToolRunner.java
@@ -69,7 +69,6 @@ public class MobStressToolRunner {
private long count = 500000;
private double failureProb = 0.1;
private Table table = null;
- private MobFileCleanerChore chore = new MobFileCleanerChore();
private static volatile boolean run = true;
@@ -157,9 +156,9 @@ public class MobStressToolRunner {
public void run() {
while (run) {
try {
- LOG.info("MOB cleanup chore started ...");
- chore.cleanupObsoleteMobFiles(conf, table.getName());
- LOG.info("MOB cleanup chore finished");
+ LOG.info("MOB cleanup started ...");
+ MobFileCleanupUtil.cleanupObsoleteMobFiles(conf, table.getName(),
admin);
+ LOG.info("MOB cleanup finished");
Thread.sleep(130000);
} catch (Exception e) {
@@ -228,7 +227,7 @@ public class MobStressToolRunner {
LOG.info("Waiting for write thread to finish ...");
writeData.join();
// Cleanup again
- chore.cleanupObsoleteMobFiles(conf, table.getName());
+ MobFileCleanupUtil.cleanupObsoleteMobFiles(conf, table.getName(), admin);
getNumberOfMobFiles(conf, table.getName(), new String(fam));
if (HTU != null) {
diff --git
a/hbase-server/src/test/java/org/apache/hadoop/hbase/mob/TestMobCompactionWithDefaults.java
b/hbase-server/src/test/java/org/apache/hadoop/hbase/mob/TestMobCompactionWithDefaults.java
index df3eb29525b..b7283aa5919 100644
---
a/hbase-server/src/test/java/org/apache/hadoop/hbase/mob/TestMobCompactionWithDefaults.java
+++
b/hbase-server/src/test/java/org/apache/hadoop/hbase/mob/TestMobCompactionWithDefaults.java
@@ -99,8 +99,6 @@ public class TestMobCompactionWithDefaults {
protected int numRegions = 20;
protected int rows = 1000;
- protected MobFileCleanerChore cleanerChore;
-
protected Boolean useFileBasedSFT;
public TestMobCompactionWithDefaults(Boolean useFileBasedSFT) {
@@ -142,7 +140,6 @@ public class TestMobCompactionWithDefaults {
htuStart();
tableDescriptor =
HTU.createModifyableTableDescriptor(TestMobUtils.getTableName(test));
admin = HTU.getAdmin();
- cleanerChore = new MobFileCleanerChore();
familyDescriptor = new
ColumnFamilyDescriptorBuilder.ModifyableColumnFamilyDescriptor(fam);
familyDescriptor.setMobEnabled(true);
familyDescriptor.setMobThreshold(mobLen);
diff --git
a/hbase-server/src/test/java/org/apache/hadoop/hbase/mob/TestMobFileCleanerChore.java
b/hbase-server/src/test/java/org/apache/hadoop/hbase/mob/TestMobFileCleanupUtil.java
similarity index 95%
rename from
hbase-server/src/test/java/org/apache/hadoop/hbase/mob/TestMobFileCleanerChore.java
rename to
hbase-server/src/test/java/org/apache/hadoop/hbase/mob/TestMobFileCleanupUtil.java
index 6ce127b994a..6c9b869a9dd 100644
---
a/hbase-server/src/test/java/org/apache/hadoop/hbase/mob/TestMobFileCleanerChore.java
+++
b/hbase-server/src/test/java/org/apache/hadoop/hbase/mob/TestMobFileCleanupUtil.java
@@ -59,11 +59,11 @@ import org.slf4j.LoggerFactory;
*/
@SuppressWarnings("deprecation")
@Category(MediumTests.class)
-public class TestMobFileCleanerChore {
- private static final Logger LOG =
LoggerFactory.getLogger(TestMobFileCleanerChore.class);
+public class TestMobFileCleanupUtil {
+ private static final Logger LOG =
LoggerFactory.getLogger(TestMobFileCleanupUtil.class);
@ClassRule
public static final HBaseClassTestRule CLASS_RULE =
- HBaseClassTestRule.forClass(TestMobFileCleanerChore.class);
+ HBaseClassTestRule.forClass(TestMobFileCleanupUtil.class);
private HBaseTestingUtility HTU;
@@ -79,10 +79,9 @@ public class TestMobFileCleanerChore {
private HColumnDescriptor hcd;
private Admin admin;
private Table table = null;
- private MobFileCleanerChore chore;
private long minAgeToArchive = 10000;
- public TestMobFileCleanerChore() {
+ public TestMobFileCleanupUtil() {
}
@Before
@@ -95,7 +94,6 @@ public class TestMobFileCleanerChore {
HTU.startMiniCluster();
admin = HTU.getAdmin();
- chore = new MobFileCleanerChore();
hcd = new HColumnDescriptor(fam);
hcd.setMobEnabled(true);
hcd.setMobThreshold(mobLen);
@@ -172,7 +170,7 @@ public class TestMobFileCleanerChore {
Thread.sleep(minAgeToArchive + 1000);
LOG.info("Cleaning up MOB files");
// Cleanup
- chore.cleanupObsoleteMobFiles(conf, table.getName());
+ MobFileCleanupUtil.cleanupObsoleteMobFiles(conf, table.getName(), admin);
// verify that nothing have happened
num = getNumberOfMobFiles(conf, table.getName(), new String(fam));
@@ -193,7 +191,7 @@ public class TestMobFileCleanerChore {
Thread.sleep(minAgeToArchive + 1000);
LOG.info("Cleaning up MOB files");
- chore.cleanupObsoleteMobFiles(conf, table.getName());
+ MobFileCleanupUtil.cleanupObsoleteMobFiles(conf, table.getName(), admin);
// check that the extra file got deleted
num = getNumberOfMobFiles(conf, table.getName(), new String(fam));