This is an automated email from the ASF dual-hosted git repository.

bbeaudreault pushed a commit to branch branch-2
in repository https://gitbox.apache.org/repos/asf/hbase.git


The following commit(s) were added to refs/heads/branch-2 by this push:
     new 9e40fa13a7a HBASE-24687: Use existing HMaster Connection in MobFileCleanerChore (#5509)
9e40fa13a7a is described below

commit 9e40fa13a7a5b2c89cb7054783b79b48416ca85a
Author: Charles Connell <[email protected]>
AuthorDate: Mon Nov 13 09:35:12 2023 -0500

    HBASE-24687: Use existing HMaster Connection in MobFileCleanerChore (#5509)
    
    Signed-off-by: Bryan Beaudreault <[email protected]>
    Signed-off-by: Duo Zhang <[email protected]>
---
 .../hadoop/hbase/IntegrationTestMobCompaction.java |  14 +-
 .../hadoop/hbase/mob/MobFileCleanerChore.java      | 226 +------------------
 .../hadoop/hbase/mob/MobFileCleanupUtil.java       | 250 +++++++++++++++++++++
 .../hadoop/hbase/mob/MobStressToolRunner.java      |   9 +-
 .../hbase/mob/TestMobCompactionWithDefaults.java   |   3 -
 ...eanerChore.java => TestMobFileCleanupUtil.java} |  14 +-
 6 files changed, 269 insertions(+), 247 deletions(-)

diff --git a/hbase-it/src/test/java/org/apache/hadoop/hbase/IntegrationTestMobCompaction.java b/hbase-it/src/test/java/org/apache/hadoop/hbase/IntegrationTestMobCompaction.java
index a39d71888a3..208f11807b0 100644
--- a/hbase-it/src/test/java/org/apache/hadoop/hbase/IntegrationTestMobCompaction.java
+++ b/hbase-it/src/test/java/org/apache/hadoop/hbase/IntegrationTestMobCompaction.java
@@ -36,7 +36,7 @@ import org.apache.hadoop.hbase.client.Table;
 import org.apache.hadoop.hbase.master.cleaner.TimeToLiveHFileCleaner;
 import org.apache.hadoop.hbase.mob.FaultyMobStoreCompactor;
 import org.apache.hadoop.hbase.mob.MobConstants;
-import org.apache.hadoop.hbase.mob.MobFileCleanerChore;
+import org.apache.hadoop.hbase.mob.MobFileCleanupUtil;
 import org.apache.hadoop.hbase.mob.MobStoreEngine;
 import org.apache.hadoop.hbase.mob.MobUtils;
 import org.apache.hadoop.hbase.testclassification.IntegrationTests;
@@ -97,7 +97,6 @@ public class IntegrationTestMobCompaction extends IntegrationTestBase {
   private static HColumnDescriptor hcd;
   private static Admin admin;
   private static Table table = null;
-  private static MobFileCleanerChore chore;
 
   private static volatile boolean run = true;
 
@@ -248,12 +247,9 @@ public class IntegrationTestMobCompaction extends IntegrationTestBase {
     public void run() {
       while (run) {
         try {
-          LOG.info("MOB cleanup chore started ...");
-          if (chore == null) {
-            chore = new MobFileCleanerChore();
-          }
-          chore.cleanupObsoleteMobFiles(conf, table.getName());
-          LOG.info("MOB cleanup chore finished");
+          LOG.info("MOB cleanup started ...");
+          MobFileCleanupUtil.cleanupObsoleteMobFiles(conf, table.getName(), 
admin);
+          LOG.info("MOB cleanup finished");
 
           Thread.sleep(130000);
         } catch (Exception e) {
@@ -328,7 +324,7 @@ public class IntegrationTestMobCompaction extends IntegrationTestBase {
       LOG.info("Waiting for write thread to finish ...");
       writeData.join();
       // Cleanup again
-      chore.cleanupObsoleteMobFiles(conf, table.getName());
+      MobFileCleanupUtil.cleanupObsoleteMobFiles(conf, table.getName(), admin);
 
       if (util != null) {
         LOG.info("Archive cleaner started ...");
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/mob/MobFileCleanerChore.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/mob/MobFileCleanerChore.java
index eb2201417b0..c4bada278df 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/mob/MobFileCleanerChore.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/mob/MobFileCleanerChore.java
@@ -17,43 +17,20 @@
  */
 package org.apache.hadoop.hbase.mob;
 
-import com.google.errorprone.annotations.RestrictedApi;
-import java.io.FileNotFoundException;
 import java.io.IOException;
-import java.util.ArrayList;
-import java.util.HashSet;
-import java.util.List;
 import java.util.Map;
-import java.util.Set;
 import java.util.concurrent.TimeUnit;
 import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.LocatedFileStatus;
-import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.fs.RemoteIterator;
 import org.apache.hadoop.hbase.ScheduledChore;
 import org.apache.hadoop.hbase.TableDescriptors;
-import org.apache.hadoop.hbase.TableName;
-import org.apache.hadoop.hbase.backup.HFileArchiver;
 import org.apache.hadoop.hbase.client.Admin;
 import org.apache.hadoop.hbase.client.ColumnFamilyDescriptor;
-import org.apache.hadoop.hbase.client.Connection;
-import org.apache.hadoop.hbase.client.ConnectionFactory;
 import org.apache.hadoop.hbase.client.TableDescriptor;
-import org.apache.hadoop.hbase.io.hfile.CacheConfig;
 import org.apache.hadoop.hbase.master.HMaster;
-import org.apache.hadoop.hbase.regionserver.BloomType;
-import org.apache.hadoop.hbase.regionserver.HStoreFile;
-import org.apache.hadoop.hbase.util.Bytes;
-import org.apache.hadoop.hbase.util.CommonFSUtils;
-import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
-import org.apache.hadoop.hbase.util.FSUtils;
 import org.apache.yetus.audience.InterfaceAudience;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import org.apache.hbase.thirdparty.com.google.common.collect.SetMultimap;
-
 /**
  * The class MobFileCleanerChore for running cleaner regularly to remove the 
expired and obsolete
  * (files which have no active references to) mob files.
@@ -100,12 +77,6 @@ public class MobFileCleanerChore extends ScheduledChore {
     }
   }
 
-  @RestrictedApi(explanation = "Should only be called in tests", link = "",
-      allowedOnPath = ".*/src/test/.*")
-  public MobFileCleanerChore() {
-    this.master = null;
-  }
-
   @Override
   protected void chore() {
     TableDescriptors htds = master.getTableDescriptors();
@@ -131,7 +102,10 @@ public class MobFileCleanerChore extends ScheduledChore {
       try {
         // Now clean obsolete files for a table
         LOG.info("Cleaning obsolete MOB files from table={}", 
htd.getTableName());
-        cleanupObsoleteMobFiles(master.getConfiguration(), htd.getTableName());
+        try (final Admin admin = master.getConnection().getAdmin()) {
+          
MobFileCleanupUtil.cleanupObsoleteMobFiles(master.getConfiguration(), 
htd.getTableName(),
+            admin);
+        }
         LOG.info("Cleaning obsolete MOB files finished for table={}", 
htd.getTableName());
       } catch (IOException e) {
         LOG.error("Failed to clean the obsolete mob files for table={}", 
htd.getTableName(), e);
@@ -139,196 +113,4 @@ public class MobFileCleanerChore extends ScheduledChore {
     }
   }
 
-  /**
-   * Performs housekeeping file cleaning (called by MOB Cleaner chore)
-   * @param conf  configuration
-   * @param table table name
-   * @throws IOException exception
-   */
-  public void cleanupObsoleteMobFiles(Configuration conf, TableName table) 
throws IOException {
-
-    long minAgeToArchive =
-      conf.getLong(MobConstants.MIN_AGE_TO_ARCHIVE_KEY, 
MobConstants.DEFAULT_MIN_AGE_TO_ARCHIVE);
-    // We check only those MOB files, which creation time is less
-    // than maxCreationTimeToArchive. This is a current time - 1h. 1 hour gap
-    // gives us full confidence that all corresponding store files will
-    // exist at the time cleaning procedure begins and will be examined.
-    // So, if MOB file creation time is greater than this maxTimeToArchive,
-    // this will be skipped and won't be archived.
-    long maxCreationTimeToArchive = EnvironmentEdgeManager.currentTime() - 
minAgeToArchive;
-    try (final Connection conn = ConnectionFactory.createConnection(conf);
-      final Admin admin = conn.getAdmin();) {
-      TableDescriptor htd = admin.getDescriptor(table);
-      List<ColumnFamilyDescriptor> list = MobUtils.getMobColumnFamilies(htd);
-      if (list.size() == 0) {
-        LOG.info("Skipping non-MOB table [{}]", table);
-        return;
-      } else {
-        LOG.info("Only MOB files whose creation time older than {} will be 
archived, table={}",
-          maxCreationTimeToArchive, table);
-      }
-
-      FileSystem fs = FileSystem.get(conf);
-      Set<String> regionNames = new HashSet<>();
-      Path rootDir = CommonFSUtils.getRootDir(conf);
-      Path tableDir = CommonFSUtils.getTableDir(rootDir, table);
-      List<Path> regionDirs = FSUtils.getRegionDirs(fs, tableDir);
-
-      Set<String> allActiveMobFileName = new HashSet<String>();
-      for (Path regionPath : regionDirs) {
-        regionNames.add(regionPath.getName());
-        for (ColumnFamilyDescriptor hcd : list) {
-          String family = hcd.getNameAsString();
-          Path storePath = new Path(regionPath, family);
-          boolean succeed = false;
-          Set<String> regionMobs = new HashSet<String>();
-
-          while (!succeed) {
-            if (!fs.exists(storePath)) {
-              String errMsg = String.format("Directory %s was deleted during 
MOB file cleaner chore"
-                + " execution, aborting MOB file cleaner chore.", storePath);
-              throw new IOException(errMsg);
-            }
-            RemoteIterator<LocatedFileStatus> rit = 
fs.listLocatedStatus(storePath);
-            List<Path> storeFiles = new ArrayList<Path>();
-            // Load list of store files first
-            while (rit.hasNext()) {
-              Path p = rit.next().getPath();
-              if (fs.isFile(p)) {
-                storeFiles.add(p);
-              }
-            }
-            LOG.info("Found {} store files in: {}", storeFiles.size(), 
storePath);
-            Path currentPath = null;
-            try {
-              for (Path pp : storeFiles) {
-                currentPath = pp;
-                LOG.trace("Store file: {}", pp);
-                HStoreFile sf = null;
-                byte[] mobRefData = null;
-                byte[] bulkloadMarkerData = null;
-                try {
-                  sf = new HStoreFile(fs, pp, conf, CacheConfig.DISABLED, 
BloomType.NONE, true);
-                  sf.initReader();
-                  mobRefData = sf.getMetadataValue(HStoreFile.MOB_FILE_REFS);
-                  bulkloadMarkerData = 
sf.getMetadataValue(HStoreFile.BULKLOAD_TASK_KEY);
-                  // close store file to avoid memory leaks
-                  sf.closeStoreFile(true);
-                } catch (IOException ex) {
-                  // When FileBased SFT is active the store dir can contain 
corrupted or incomplete
-                  // files. So read errors are expected. We just skip these 
files.
-                  if (ex instanceof FileNotFoundException) {
-                    throw ex;
-                  }
-                  LOG.debug("Failed to get mob data from file: {} due to 
error.", pp.toString(),
-                    ex);
-                  continue;
-                }
-                if (mobRefData == null) {
-                  if (bulkloadMarkerData == null) {
-                    LOG.warn("Found old store file with no MOB_FILE_REFS: {} - 
"
-                      + "can not proceed until all old files will be 
MOB-compacted.", pp);
-                    return;
-                  } else {
-                    LOG.debug("Skipping file without MOB references 
(bulkloaded file):{}", pp);
-                    continue;
-                  }
-                }
-                // file may or may not have MOB references, but was created by 
the distributed
-                // mob compaction code.
-                try {
-                  SetMultimap<TableName, String> mobs =
-                    MobUtils.deserializeMobFileRefs(mobRefData).build();
-                  LOG.debug("Found {} mob references for store={}", 
mobs.size(), sf);
-                  LOG.trace("Specific mob references found for store={} : {}", 
sf, mobs);
-                  regionMobs.addAll(mobs.values());
-                } catch (RuntimeException exception) {
-                  throw new IOException("failure getting mob references for 
hfile " + sf,
-                    exception);
-                }
-              }
-            } catch (FileNotFoundException e) {
-              LOG.warn(
-                "Missing file:{} Starting MOB cleaning cycle from the 
beginning" + " due to error",
-                currentPath, e);
-              regionMobs.clear();
-              continue;
-            }
-            succeed = true;
-          }
-
-          // Add MOB references for current region/family
-          allActiveMobFileName.addAll(regionMobs);
-        } // END column families
-      } // END regions
-      // Check if number of MOB files too big (over 1M)
-      if (allActiveMobFileName.size() > 1000000) {
-        LOG.warn("Found too many active MOB files: {}, table={}, "
-          + "this may result in high memory pressure.", 
allActiveMobFileName.size(), table);
-      }
-      LOG.debug("Found: {} active mob refs for table={}", 
allActiveMobFileName.size(), table);
-      allActiveMobFileName.stream().forEach(LOG::trace);
-
-      // Now scan MOB directories and find MOB files with no references to them
-      for (ColumnFamilyDescriptor hcd : list) {
-        List<Path> toArchive = new ArrayList<Path>();
-        String family = hcd.getNameAsString();
-        Path dir = MobUtils.getMobFamilyPath(conf, table, family);
-        RemoteIterator<LocatedFileStatus> rit = fs.listLocatedStatus(dir);
-        while (rit.hasNext()) {
-          LocatedFileStatus lfs = rit.next();
-          Path p = lfs.getPath();
-          String[] mobParts = p.getName().split("_");
-          String regionName = mobParts[mobParts.length - 1];
-
-          if (!regionNames.contains(regionName)) {
-            // MOB belonged to a region no longer hosted
-            long creationTime = fs.getFileStatus(p).getModificationTime();
-            if (creationTime < maxCreationTimeToArchive) {
-              LOG.trace("Archiving MOB file {} creation time={}", p,
-                (fs.getFileStatus(p).getModificationTime()));
-              toArchive.add(p);
-            } else {
-              LOG.trace("Skipping fresh file: {}. Creation time={}", p,
-                fs.getFileStatus(p).getModificationTime());
-            }
-          } else {
-            LOG.trace("Keeping MOB file with existing region: {}", p);
-          }
-        }
-        LOG.info(" MOB Cleaner found {} files to archive for table={} 
family={}", toArchive.size(),
-          table, family);
-        archiveMobFiles(conf, table, family.getBytes(), toArchive);
-        LOG.info(" MOB Cleaner archived {} files, table={} family={}", 
toArchive.size(), table,
-          family);
-      }
-    }
-  }
-
-  /**
-   * Archives the mob files.
-   * @param conf       The current configuration.
-   * @param tableName  The table name.
-   * @param family     The name of the column family.
-   * @param storeFiles The files to be archived.
-   * @throws IOException exception
-   */
-  public void archiveMobFiles(Configuration conf, TableName tableName, byte[] 
family,
-    List<Path> storeFiles) throws IOException {
-
-    if (storeFiles.size() == 0) {
-      // nothing to remove
-      LOG.debug("Skipping archiving old MOB files - no files found for 
table={} cf={}", tableName,
-        Bytes.toString(family));
-      return;
-    }
-    Path mobTableDir = CommonFSUtils.getTableDir(MobUtils.getMobHome(conf), 
tableName);
-    FileSystem fs = storeFiles.get(0).getFileSystem(conf);
-
-    for (Path p : storeFiles) {
-      LOG.debug("MOB Cleaner is archiving: {}", p);
-      HFileArchiver.archiveStoreFile(conf, fs, 
MobUtils.getMobRegionInfo(tableName), mobTableDir,
-        family, p);
-    }
-  }
 }
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/mob/MobFileCleanupUtil.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/mob/MobFileCleanupUtil.java
new file mode 100644
index 00000000000..049192624ef
--- /dev/null
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/mob/MobFileCleanupUtil.java
@@ -0,0 +1,250 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.mob;
+
+import java.io.FileNotFoundException;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Set;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.LocatedFileStatus;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.RemoteIterator;
+import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.backup.HFileArchiver;
+import org.apache.hadoop.hbase.client.Admin;
+import org.apache.hadoop.hbase.client.ColumnFamilyDescriptor;
+import org.apache.hadoop.hbase.client.TableDescriptor;
+import org.apache.hadoop.hbase.io.hfile.CacheConfig;
+import org.apache.hadoop.hbase.regionserver.BloomType;
+import org.apache.hadoop.hbase.regionserver.HStoreFile;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.hbase.util.CommonFSUtils;
+import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
+import org.apache.hadoop.hbase.util.FSUtils;
+import org.apache.yetus.audience.InterfaceAudience;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.hbase.thirdparty.com.google.common.collect.SetMultimap;
+
+@InterfaceAudience.Private
+public final class MobFileCleanupUtil {
+
+  private static final Logger LOG = 
LoggerFactory.getLogger(MobFileCleanupUtil.class);
+
+  private MobFileCleanupUtil() {
+  }
+
+  /**
+   * Performs housekeeping file cleaning (called by MOB Cleaner chore)
+   * @param conf  configuration
+   * @param table table name
+   * @throws IOException exception
+   */
+  public static void cleanupObsoleteMobFiles(Configuration conf, TableName 
table, Admin admin)
+    throws IOException {
+    long minAgeToArchive =
+      conf.getLong(MobConstants.MIN_AGE_TO_ARCHIVE_KEY, 
MobConstants.DEFAULT_MIN_AGE_TO_ARCHIVE);
+    // We check only those MOB files, which creation time is less
+    // than maxCreationTimeToArchive. This is a current time - 1h. 1 hour gap
+    // gives us full confidence that all corresponding store files will
+    // exist at the time cleaning procedure begins and will be examined.
+    // So, if MOB file creation time is greater than this maxTimeToArchive,
+    // this will be skipped and won't be archived.
+    long maxCreationTimeToArchive = EnvironmentEdgeManager.currentTime() - 
minAgeToArchive;
+    TableDescriptor htd = admin.getDescriptor(table);
+    List<ColumnFamilyDescriptor> list = MobUtils.getMobColumnFamilies(htd);
+    if (list.size() == 0) {
+      LOG.info("Skipping non-MOB table [{}]", table);
+      return;
+    } else {
+      LOG.info("Only MOB files whose creation time older than {} will be 
archived, table={}",
+        maxCreationTimeToArchive, table);
+    }
+
+    FileSystem fs = FileSystem.get(conf);
+    Set<String> regionNames = new HashSet<>();
+    Path rootDir = CommonFSUtils.getRootDir(conf);
+    Path tableDir = CommonFSUtils.getTableDir(rootDir, table);
+    List<Path> regionDirs = FSUtils.getRegionDirs(fs, tableDir);
+
+    Set<String> allActiveMobFileName = new HashSet<String>();
+    for (Path regionPath : regionDirs) {
+      regionNames.add(regionPath.getName());
+      for (ColumnFamilyDescriptor hcd : list) {
+        String family = hcd.getNameAsString();
+        Path storePath = new Path(regionPath, family);
+        boolean succeed = false;
+        Set<String> regionMobs = new HashSet<String>();
+
+        while (!succeed) {
+          if (!fs.exists(storePath)) {
+            String errMsg = String.format("Directory %s was deleted during MOB 
file cleaner chore"
+              + " execution, aborting MOB file cleaner chore.", storePath);
+            throw new IOException(errMsg);
+          }
+          RemoteIterator<LocatedFileStatus> rit = 
fs.listLocatedStatus(storePath);
+          List<Path> storeFiles = new ArrayList<Path>();
+          // Load list of store files first
+          while (rit.hasNext()) {
+            Path p = rit.next().getPath();
+            if (fs.isFile(p)) {
+              storeFiles.add(p);
+            }
+          }
+          LOG.info("Found {} store files in: {}", storeFiles.size(), 
storePath);
+          Path currentPath = null;
+          try {
+            for (Path pp : storeFiles) {
+              currentPath = pp;
+              LOG.trace("Store file: {}", pp);
+              HStoreFile sf = null;
+              byte[] mobRefData = null;
+              byte[] bulkloadMarkerData = null;
+              try {
+                sf = new HStoreFile(fs, pp, conf, CacheConfig.DISABLED, 
BloomType.NONE, true);
+                sf.initReader();
+                mobRefData = sf.getMetadataValue(HStoreFile.MOB_FILE_REFS);
+                bulkloadMarkerData = 
sf.getMetadataValue(HStoreFile.BULKLOAD_TASK_KEY);
+                // close store file to avoid memory leaks
+                sf.closeStoreFile(true);
+              } catch (IOException ex) {
+                // When FileBased SFT is active the store dir can contain 
corrupted or incomplete
+                // files. So read errors are expected. We just skip these 
files.
+                if (ex instanceof FileNotFoundException) {
+                  throw ex;
+                }
+                LOG.debug("Failed to get mob data from file: {} due to 
error.", pp.toString(), ex);
+                continue;
+              }
+              if (mobRefData == null) {
+                if (bulkloadMarkerData == null) {
+                  LOG.warn("Found old store file with no MOB_FILE_REFS: {} - "
+                    + "can not proceed until all old files will be 
MOB-compacted.", pp);
+                  return;
+                } else {
+                  LOG.debug("Skipping file without MOB references (bulkloaded 
file):{}", pp);
+                  continue;
+                }
+              }
+              // file may or may not have MOB references, but was created by 
the distributed
+              // mob compaction code.
+              try {
+                SetMultimap<TableName, String> mobs =
+                  MobUtils.deserializeMobFileRefs(mobRefData).build();
+                LOG.debug("Found {} mob references for store={}", mobs.size(), 
sf);
+                LOG.trace("Specific mob references found for store={} : {}", 
sf, mobs);
+                regionMobs.addAll(mobs.values());
+              } catch (RuntimeException exception) {
+                throw new IOException("failure getting mob references for 
hfile " + sf, exception);
+              }
+            }
+          } catch (FileNotFoundException e) {
+            LOG.warn(
+              "Missing file:{} Starting MOB cleaning cycle from the beginning" 
+ " due to error",
+              currentPath, e);
+            regionMobs.clear();
+            continue;
+          }
+          succeed = true;
+        }
+
+        // Add MOB references for current region/family
+        allActiveMobFileName.addAll(regionMobs);
+      } // END column families
+    } // END regions
+    // Check if number of MOB files too big (over 1M)
+    if (allActiveMobFileName.size() > 1000000) {
+      LOG.warn("Found too many active MOB files: {}, table={}, "
+        + "this may result in high memory pressure.", 
allActiveMobFileName.size(), table);
+    }
+    LOG.debug("Found: {} active mob refs for table={}", 
allActiveMobFileName.size(), table);
+    allActiveMobFileName.stream().forEach(LOG::trace);
+
+    // Now scan MOB directories and find MOB files with no references to them
+    for (ColumnFamilyDescriptor hcd : list) {
+      checkColumnFamilyDescriptor(conf, table, fs, admin, hcd, regionNames,
+        maxCreationTimeToArchive);
+    }
+  }
+
+  private static void checkColumnFamilyDescriptor(Configuration conf, 
TableName table,
+    FileSystem fs, Admin admin, ColumnFamilyDescriptor hcd, Set<String> 
regionNames,
+    long maxCreationTimeToArchive) throws IOException {
+    List<Path> toArchive = new ArrayList<Path>();
+    String family = hcd.getNameAsString();
+    Path dir = MobUtils.getMobFamilyPath(conf, table, family);
+    RemoteIterator<LocatedFileStatus> rit = fs.listLocatedStatus(dir);
+    while (rit.hasNext()) {
+      LocatedFileStatus lfs = rit.next();
+      Path p = lfs.getPath();
+      String[] mobParts = p.getName().split("_");
+      String regionName = mobParts[mobParts.length - 1];
+
+      if (!regionNames.contains(regionName)) {
+        // MOB belonged to a region no longer hosted
+        long creationTime = fs.getFileStatus(p).getModificationTime();
+        if (creationTime < maxCreationTimeToArchive) {
+          LOG.trace("Archiving MOB file {} creation time={}", p,
+            (fs.getFileStatus(p).getModificationTime()));
+          toArchive.add(p);
+        } else {
+          LOG.trace("Skipping fresh file: {}. Creation time={}", p,
+            fs.getFileStatus(p).getModificationTime());
+        }
+      } else {
+        LOG.trace("Keeping MOB file with existing region: {}", p);
+      }
+    }
+    LOG.info(" MOB Cleaner found {} files to archive for table={} family={}", 
toArchive.size(),
+      table, family);
+    archiveMobFiles(conf, table, admin, family.getBytes(), toArchive);
+    LOG.info(" MOB Cleaner archived {} files, table={} family={}", 
toArchive.size(), table, family);
+  }
+
+  /**
+   * Archives the mob files.
+   * @param conf       The current configuration.
+   * @param tableName  The table name.
+   * @param family     The name of the column family.
+   * @param storeFiles The files to be archived.
+   * @throws IOException exception
+   */
+  private static void archiveMobFiles(Configuration conf, TableName tableName, 
Admin admin,
+    byte[] family, List<Path> storeFiles) throws IOException {
+
+    if (storeFiles.size() == 0) {
+      // nothing to remove
+      LOG.debug("Skipping archiving old MOB files - no files found for 
table={} cf={}", tableName,
+        Bytes.toString(family));
+      return;
+    }
+    Path mobTableDir = CommonFSUtils.getTableDir(MobUtils.getMobHome(conf), 
tableName);
+    FileSystem fs = storeFiles.get(0).getFileSystem(conf);
+
+    for (Path p : storeFiles) {
+      LOG.debug("MOB Cleaner is archiving: {}", p);
+      HFileArchiver.archiveStoreFile(conf, fs, 
MobUtils.getMobRegionInfo(tableName), mobTableDir,
+        family, p);
+    }
+  }
+}
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/mob/MobStressToolRunner.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/mob/MobStressToolRunner.java
index 8dd545f0015..705cd90284f 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/mob/MobStressToolRunner.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/mob/MobStressToolRunner.java
@@ -69,7 +69,6 @@ public class MobStressToolRunner {
   private long count = 500000;
   private double failureProb = 0.1;
   private Table table = null;
-  private MobFileCleanerChore chore = new MobFileCleanerChore();
 
   private static volatile boolean run = true;
 
@@ -157,9 +156,9 @@ public class MobStressToolRunner {
     public void run() {
       while (run) {
         try {
-          LOG.info("MOB cleanup chore started ...");
-          chore.cleanupObsoleteMobFiles(conf, table.getName());
-          LOG.info("MOB cleanup chore finished");
+          LOG.info("MOB cleanup started ...");
+          MobFileCleanupUtil.cleanupObsoleteMobFiles(conf, table.getName(), 
admin);
+          LOG.info("MOB cleanup finished");
 
           Thread.sleep(130000);
         } catch (Exception e) {
@@ -228,7 +227,7 @@ public class MobStressToolRunner {
       LOG.info("Waiting for write thread to finish ...");
       writeData.join();
       // Cleanup again
-      chore.cleanupObsoleteMobFiles(conf, table.getName());
+      MobFileCleanupUtil.cleanupObsoleteMobFiles(conf, table.getName(), admin);
       getNumberOfMobFiles(conf, table.getName(), new String(fam));
 
       if (HTU != null) {
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/mob/TestMobCompactionWithDefaults.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/mob/TestMobCompactionWithDefaults.java
index df3eb29525b..b7283aa5919 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/mob/TestMobCompactionWithDefaults.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/mob/TestMobCompactionWithDefaults.java
@@ -99,8 +99,6 @@ public class TestMobCompactionWithDefaults {
   protected int numRegions = 20;
   protected int rows = 1000;
 
-  protected MobFileCleanerChore cleanerChore;
-
   protected Boolean useFileBasedSFT;
 
   public TestMobCompactionWithDefaults(Boolean useFileBasedSFT) {
@@ -142,7 +140,6 @@ public class TestMobCompactionWithDefaults {
     htuStart();
     tableDescriptor = 
HTU.createModifyableTableDescriptor(TestMobUtils.getTableName(test));
     admin = HTU.getAdmin();
-    cleanerChore = new MobFileCleanerChore();
     familyDescriptor = new 
ColumnFamilyDescriptorBuilder.ModifyableColumnFamilyDescriptor(fam);
     familyDescriptor.setMobEnabled(true);
     familyDescriptor.setMobThreshold(mobLen);
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/mob/TestMobFileCleanerChore.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/mob/TestMobFileCleanupUtil.java
similarity index 95%
rename from hbase-server/src/test/java/org/apache/hadoop/hbase/mob/TestMobFileCleanerChore.java
rename to hbase-server/src/test/java/org/apache/hadoop/hbase/mob/TestMobFileCleanupUtil.java
index 6ce127b994a..6c9b869a9dd 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/mob/TestMobFileCleanerChore.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/mob/TestMobFileCleanupUtil.java
@@ -59,11 +59,11 @@ import org.slf4j.LoggerFactory;
  */
 @SuppressWarnings("deprecation")
 @Category(MediumTests.class)
-public class TestMobFileCleanerChore {
-  private static final Logger LOG = 
LoggerFactory.getLogger(TestMobFileCleanerChore.class);
+public class TestMobFileCleanupUtil {
+  private static final Logger LOG = 
LoggerFactory.getLogger(TestMobFileCleanupUtil.class);
   @ClassRule
   public static final HBaseClassTestRule CLASS_RULE =
-    HBaseClassTestRule.forClass(TestMobFileCleanerChore.class);
+    HBaseClassTestRule.forClass(TestMobFileCleanupUtil.class);
 
   private HBaseTestingUtility HTU;
 
@@ -79,10 +79,9 @@ public class TestMobFileCleanerChore {
   private HColumnDescriptor hcd;
   private Admin admin;
   private Table table = null;
-  private MobFileCleanerChore chore;
   private long minAgeToArchive = 10000;
 
-  public TestMobFileCleanerChore() {
+  public TestMobFileCleanupUtil() {
   }
 
   @Before
@@ -95,7 +94,6 @@ public class TestMobFileCleanerChore {
 
     HTU.startMiniCluster();
     admin = HTU.getAdmin();
-    chore = new MobFileCleanerChore();
     hcd = new HColumnDescriptor(fam);
     hcd.setMobEnabled(true);
     hcd.setMobThreshold(mobLen);
@@ -172,7 +170,7 @@ public class TestMobFileCleanerChore {
     Thread.sleep(minAgeToArchive + 1000);
     LOG.info("Cleaning up MOB files");
     // Cleanup
-    chore.cleanupObsoleteMobFiles(conf, table.getName());
+    MobFileCleanupUtil.cleanupObsoleteMobFiles(conf, table.getName(), admin);
 
     // verify that nothing have happened
     num = getNumberOfMobFiles(conf, table.getName(), new String(fam));
@@ -193,7 +191,7 @@ public class TestMobFileCleanerChore {
 
     Thread.sleep(minAgeToArchive + 1000);
     LOG.info("Cleaning up MOB files");
-    chore.cleanupObsoleteMobFiles(conf, table.getName());
+    MobFileCleanupUtil.cleanupObsoleteMobFiles(conf, table.getName(), admin);
 
     // check that the extra file got deleted
     num = getNumberOfMobFiles(conf, table.getName(), new String(fam));

Reply via email to