This is an automated email from the ASF dual-hosted git repository.

chandra pushed a commit to branch branch-2
in repository https://gitbox.apache.org/repos/asf/hbase.git


The following commit(s) were added to refs/heads/branch-2 by this push:
     new 163a1eb9a43 HBASE-27126 Support multi-threads cleaner for MOB files 
(#7430)(#5833)
163a1eb9a43 is described below

commit 163a1eb9a43fb8a0511b79af0bf50a7c41813211
Author: Chandra Sekhar K <[email protected]>
AuthorDate: Sat Dec 13 11:39:54 2025 +0530

    HBASE-27126 Support multi-threads cleaner for MOB files (#7430)(#5833)
    
    Signed-off-by: Peng Lu <[email protected]>
    (cherry picked from commit f800a13a3575be3d70537f40c3c21171ce4b2603)
---
 .../org/apache/hadoop/hbase/master/HMaster.java    |   6 +
 .../org/apache/hadoop/hbase/mob/MobConstants.java  |   6 +
 .../hadoop/hbase/mob/MobFileCleanerChore.java      | 137 +++++++++--
 .../hbase/mob/TestExpiredMobFileCleanerChore.java  | 255 +++++++++++++++++++++
 4 files changed, 384 insertions(+), 20 deletions(-)

diff --git 
a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/HMaster.java 
b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/HMaster.java
index 9e6e929eb10..9a2bac9870c 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/HMaster.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/HMaster.java
@@ -4430,4 +4430,10 @@ public class HMaster extends HRegionServer implements 
MasterServices {
         }
       });
   }
+
+  @RestrictedApi(explanation = "Should only be called in tests", link = "",
+      allowedOnPath = ".*/src/test/.*")
+  public MobFileCleanerChore getMobFileCleanerChore() {
+    return mobFileCleanerChore;
+  }
 }
diff --git 
a/hbase-server/src/main/java/org/apache/hadoop/hbase/mob/MobConstants.java 
b/hbase-server/src/main/java/org/apache/hadoop/hbase/mob/MobConstants.java
index 51f287ce66e..052058a42ad 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/mob/MobConstants.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/mob/MobConstants.java
@@ -149,6 +149,12 @@ public final class MobConstants {
   public static final String MOB_COMPACTION_THREADS_MAX = 
"hbase.mob.compaction.threads.max";
   public static final int DEFAULT_MOB_COMPACTION_THREADS_MAX = 1;
 
+  public static final String MOB_CLEANER_THREAD_COUNT = 
"hbase.master.mob.cleaner.threads";
+  public static final int DEFAULT_MOB_CLEANER_THREAD_COUNT = 1;
+  public static final String MOB_FILE_CLEANER_CHORE_TIME_OUT =
+    "hbase.master.mob.cleaner.chore.timeout";
+  public static final int DEFAULT_MOB_FILE_CLEANER_CHORE_TIME_OUT = 5 * 60; // 
5 minutes, expressed in seconds
+
   private MobConstants() {
 
   }
diff --git 
a/hbase-server/src/main/java/org/apache/hadoop/hbase/mob/MobFileCleanerChore.java
 
b/hbase-server/src/main/java/org/apache/hadoop/hbase/mob/MobFileCleanerChore.java
index 9ce20e7c650..595078230b3 100644
--- 
a/hbase-server/src/main/java/org/apache/hadoop/hbase/mob/MobFileCleanerChore.java
+++ 
b/hbase-server/src/main/java/org/apache/hadoop/hbase/mob/MobFileCleanerChore.java
@@ -17,30 +17,49 @@
  */
 package org.apache.hadoop.hbase.mob;
 
+import static 
org.apache.hadoop.hbase.mob.MobConstants.DEFAULT_MOB_FILE_CLEANER_CHORE_TIME_OUT;
+import static 
org.apache.hadoop.hbase.mob.MobConstants.MOB_FILE_CLEANER_CHORE_TIME_OUT;
+
+import com.google.errorprone.annotations.RestrictedApi;
 import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
 import java.util.Map;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.Future;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.ThreadFactory;
+import java.util.concurrent.ThreadPoolExecutor;
 import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hbase.ScheduledChore;
 import org.apache.hadoop.hbase.TableDescriptors;
 import org.apache.hadoop.hbase.client.Admin;
 import org.apache.hadoop.hbase.client.ColumnFamilyDescriptor;
 import org.apache.hadoop.hbase.client.TableDescriptor;
+import org.apache.hadoop.hbase.conf.ConfigurationObserver;
 import org.apache.hadoop.hbase.master.HMaster;
 import org.apache.yetus.audience.InterfaceAudience;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import 
org.apache.hbase.thirdparty.com.google.common.util.concurrent.ThreadFactoryBuilder;
+
 /**
  * The class MobFileCleanerChore for running cleaner regularly to remove the 
expired and obsolete
  * (files which have no active references to) mob files.
  */
 @InterfaceAudience.Private
-public class MobFileCleanerChore extends ScheduledChore {
+public class MobFileCleanerChore extends ScheduledChore implements 
ConfigurationObserver {
 
   private static final Logger LOG = 
LoggerFactory.getLogger(MobFileCleanerChore.class);
+
   private final HMaster master;
-  private ExpiredMobFileCleaner cleaner;
+  private final ExpiredMobFileCleaner cleaner;
+  private final ThreadPoolExecutor executor;
+  private final int cleanerFutureTimeout;
+  private int threadCount;
 
   static {
     Configuration.addDeprecation(MobConstants.DEPRECATED_MOB_CLEANER_PERIOD,
@@ -57,7 +76,21 @@ public class MobFileCleanerChore extends ScheduledChore {
     this.master = master;
     cleaner = new ExpiredMobFileCleaner();
     cleaner.setConf(master.getConfiguration());
+    threadCount = 
master.getConfiguration().getInt(MobConstants.MOB_CLEANER_THREAD_COUNT,
+      MobConstants.DEFAULT_MOB_CLEANER_THREAD_COUNT);
+    if (threadCount < 1) {
+      threadCount = 1;
+    }
+
+    ThreadFactory threadFactory =
+      new 
ThreadFactoryBuilder().setDaemon(true).setNameFormat("mobfile-cleaner-pool-%d").build();
+
+    executor = new ThreadPoolExecutor(threadCount, threadCount, 60, 
TimeUnit.SECONDS,
+      new LinkedBlockingQueue<Runnable>(), threadFactory);
+
     checkObsoleteConfigurations();
+    cleanerFutureTimeout = 
master.getConfiguration().getInt(MOB_FILE_CLEANER_CHORE_TIME_OUT,
+      DEFAULT_MOB_FILE_CLEANER_CHORE_TIME_OUT);
   }
 
   private void checkObsoleteConfigurations() {
@@ -88,29 +121,93 @@ public class MobFileCleanerChore extends ScheduledChore {
       LOG.error("MobFileCleanerChore failed", e);
       return;
     }
+    List<Future<?>> futureList = new ArrayList<>(map.size());
     for (TableDescriptor htd : map.values()) {
-      for (ColumnFamilyDescriptor hcd : htd.getColumnFamilies()) {
-        if (hcd.isMobEnabled() && hcd.getMinVersions() == 0) {
-          try {
-            cleaner.cleanExpiredMobFiles(htd, hcd);
-          } catch (IOException e) {
-            LOG.error("Failed to clean the expired mob files table={} 
family={}",
-              htd.getTableName().getNameAsString(), hcd.getNameAsString(), e);
-          }
-        }
-      }
+      Future<?> future = executor.submit(() -> handleOneTable(htd));
+      futureList.add(future);
+    }
+
+    for (Future<?> future : futureList) {
       try {
-        // Now clean obsolete files for a table
-        LOG.info("Cleaning obsolete MOB files from table={}", 
htd.getTableName());
-        try (final Admin admin = master.getConnection().getAdmin()) {
-          
MobFileCleanupUtil.cleanupObsoleteMobFiles(master.getConfiguration(), 
htd.getTableName(),
-            admin);
+        future.get(cleanerFutureTimeout, TimeUnit.SECONDS);
+      } catch (InterruptedException e) {
+        LOG.warn("MobFileCleanerChore interrupted while waiting for futures", 
e);
+        Thread.currentThread().interrupt();
+        cancelAllFutures(futureList);
+        break;
+      } catch (ExecutionException e) {
+        LOG.error("Exception during execution of MobFileCleanerChore task", e);
+      } catch (TimeoutException e) {
+        LOG.error("MobFileCleanerChore timed out waiting for a task to 
complete", e);
+      }
+    }
+  }
+
+  private void cancelAllFutures(List<Future<?>> futureList) {
+    long pendingTaskCounter = 0;
+    for (Future<?> f : futureList) {
+      if (!f.isDone()) {
+        f.cancel(true); // interrupt running tasks
+        pendingTaskCounter++;
+      }
+    }
+    LOG.info("Cancelled {} pending mob file cleaner tasks", 
pendingTaskCounter);
+  }
+
+  private void handleOneTable(TableDescriptor htd) {
+    for (ColumnFamilyDescriptor hcd : htd.getColumnFamilies()) {
+      if (hcd.isMobEnabled() && hcd.getMinVersions() == 0) {
+        try {
+          cleaner.cleanExpiredMobFiles(htd, hcd);
+        } catch (IOException e) {
+          LOG.error("Failed to clean the expired mob files table={} family={}",
+            htd.getTableName().getNameAsString(), hcd.getNameAsString(), e);
         }
-        LOG.info("Cleaning obsolete MOB files finished for table={}", 
htd.getTableName());
-      } catch (IOException e) {
-        LOG.error("Failed to clean the obsolete mob files for table={}", 
htd.getTableName(), e);
       }
     }
+    try {
+      // Now clean obsolete files for a table
+      LOG.info("Cleaning obsolete MOB files from table={}", 
htd.getTableName());
+      try (final Admin admin = master.getConnection().getAdmin()) {
+        MobFileCleanupUtil.cleanupObsoleteMobFiles(master.getConfiguration(), 
htd.getTableName(),
+          admin);
+      }
+      LOG.info("Cleaning obsolete MOB files finished for table={}", 
htd.getTableName());
+    } catch (IOException e) {
+      LOG.error("Failed to clean the obsolete mob files for table={}", 
htd.getTableName(), e);
+    }
+  }
+
+  @Override
+  public void onConfigurationChange(Configuration conf) {
+    int newThreadCount = conf.getInt(MobConstants.MOB_CLEANER_THREAD_COUNT,
+      MobConstants.DEFAULT_MOB_CLEANER_THREAD_COUNT);
+    if (newThreadCount < 1) {
+      return; // invalid value (< 1): ignore the change and keep the current pool size
+    }
+
+    if (newThreadCount != threadCount) {
+      resizeThreadPool(newThreadCount, newThreadCount);
+      threadCount = newThreadCount;
+    }
   }
 
+  private void resizeThreadPool(int newCoreSize, int newMaxSize) {
+    int currentCoreSize = executor.getCorePoolSize();
+    if (newCoreSize > currentCoreSize) {
+      // Growing the pool: raise max before core so that core never exceeds max in between
+      executor.setMaximumPoolSize(newMaxSize);
+      executor.setCorePoolSize(newCoreSize);
+    } else {
+      // Shrinking (or same core size): lower core before max so that core never exceeds max
+      executor.setCorePoolSize(newCoreSize);
+      executor.setMaximumPoolSize(newMaxSize);
+    }
+  }
+
+  @RestrictedApi(explanation = "Should only be called in tests", link = "",
+      allowedOnPath = ".*/src/test/.*")
+  public ThreadPoolExecutor getExecutor() {
+    return executor;
+  }
 }
diff --git 
a/hbase-server/src/test/java/org/apache/hadoop/hbase/mob/TestExpiredMobFileCleanerChore.java
 
b/hbase-server/src/test/java/org/apache/hadoop/hbase/mob/TestExpiredMobFileCleanerChore.java
new file mode 100644
index 00000000000..3d9cced8935
--- /dev/null
+++ 
b/hbase-server/src/test/java/org/apache/hadoop/hbase/mob/TestExpiredMobFileCleanerChore.java
@@ -0,0 +1,255 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.mob;
+
+import static 
org.apache.hadoop.hbase.mob.MobConstants.MOB_CLEANER_BATCH_SIZE_UPPER_BOUND;
+import static 
org.apache.hadoop.hbase.mob.MobConstants.MOB_CLEANER_THREAD_COUNT;
+import static org.junit.Assert.assertEquals;
+
+import java.io.IOException;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hbase.HBaseClassTestRule;
+import org.apache.hadoop.hbase.HBaseTestingUtility;
+import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.client.Admin;
+import org.apache.hadoop.hbase.client.BufferedMutator;
+import org.apache.hadoop.hbase.client.ColumnFamilyDescriptor;
+import org.apache.hadoop.hbase.client.ColumnFamilyDescriptorBuilder;
+import org.apache.hadoop.hbase.client.ConnectionFactory;
+import org.apache.hadoop.hbase.client.Put;
+import org.apache.hadoop.hbase.client.TableDescriptorBuilder;
+import org.apache.hadoop.hbase.testclassification.MasterTests;
+import org.apache.hadoop.hbase.testclassification.MediumTests;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
+import org.junit.After;
+import org.junit.AfterClass;
+import org.junit.Assert;
+import org.junit.BeforeClass;
+import org.junit.ClassRule;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+
+@Category({ MediumTests.class, MasterTests.class })
+public class TestExpiredMobFileCleanerChore {
+
+  @ClassRule
+  public static final HBaseClassTestRule CLASS_RULE =
+    HBaseClassTestRule.forClass(TestExpiredMobFileCleanerChore.class);
+  private final static HBaseTestingUtility TEST_UTIL = new 
HBaseTestingUtility();
+  private final static TableName tableName = 
TableName.valueOf("TestExpiredMobFileCleaner");
+  private final static TableName tableName2 = 
TableName.valueOf("TestExpiredMobFileCleaner2");
+  private final static String family = "family";
+  private final static byte[] row1 = Bytes.toBytes("row1");
+  private final static byte[] row2 = Bytes.toBytes("row2");
+  private final static byte[] row3 = Bytes.toBytes("row3");
+  private final static byte[] qf = Bytes.toBytes("qf");
+
+  private static BufferedMutator table;
+  private static Admin admin;
+  private static BufferedMutator table2;
+  private static MobFileCleanerChore mobFileCleanerChore;
+
+  @BeforeClass
+  public static void setUp() throws Exception {
+    TEST_UTIL.getConfiguration().setInt("hfile.format.version", 3);
+    TEST_UTIL.getConfiguration().setInt(MOB_CLEANER_BATCH_SIZE_UPPER_BOUND, 2);
+    TEST_UTIL.startMiniCluster(1);
+    mobFileCleanerChore = 
TEST_UTIL.getMiniHBaseCluster().getMaster().getMobFileCleanerChore();
+  }
+
+  @After
+  public void cleanUp() throws IOException {
+    admin.disableTable(tableName);
+    admin.deleteTable(tableName);
+    admin.disableTable(tableName2);
+    admin.deleteTable(tableName2);
+    admin.close();
+  }
+
+  @AfterClass
+  public static void tearDown() throws Exception {
+    TEST_UTIL.shutdownMiniCluster();
+    TEST_UTIL.getTestFileSystem().delete(TEST_UTIL.getDataTestDir(), true);
+  }
+
+  @Test
+  public void testCleanerSingleThread() throws Exception {
+    TEST_UTIL.getConfiguration().setInt(MOB_CLEANER_THREAD_COUNT, 1);
+    mobFileCleanerChore.onConfigurationChange(TEST_UTIL.getConfiguration());
+    int corePoolSize = mobFileCleanerChore.getExecutor().getCorePoolSize();
+    Assert.assertEquals(1, corePoolSize);
+    testCleanerInternal();
+  }
+
+  @Test
+  public void testCleanerMultiThread() throws Exception {
+    TEST_UTIL.getConfiguration().setInt(MOB_CLEANER_THREAD_COUNT, 2);
+    mobFileCleanerChore.onConfigurationChange(TEST_UTIL.getConfiguration());
+    int corePoolSize = mobFileCleanerChore.getExecutor().getCorePoolSize();
+    Assert.assertEquals(2, corePoolSize);
+    testCleanerInternal();
+  }
+
+  private static void init() throws Exception {
+    TableDescriptorBuilder tableDescriptorBuilder = 
TableDescriptorBuilder.newBuilder(tableName);
+    ColumnFamilyDescriptor columnFamilyDescriptor =
+      
ColumnFamilyDescriptorBuilder.newBuilder(Bytes.toBytes(family)).setMobEnabled(true)
+        .setMobThreshold(3L).setMaxVersions(4).build();
+    tableDescriptorBuilder.setColumnFamily(columnFamilyDescriptor);
+
+    admin = TEST_UTIL.getAdmin();
+    admin.createTable(tableDescriptorBuilder.build());
+
+    table = ConnectionFactory.createConnection(TEST_UTIL.getConfiguration())
+      .getBufferedMutator(tableName);
+
+    TableDescriptorBuilder tableDescriptorBuilder2 = 
TableDescriptorBuilder.newBuilder(tableName2);
+    ColumnFamilyDescriptor columnFamilyDescriptor2 =
+      
ColumnFamilyDescriptorBuilder.newBuilder(Bytes.toBytes(family)).setMobEnabled(true)
+        .setMobThreshold(3L).setMaxVersions(4).build();
+    tableDescriptorBuilder2.setColumnFamily(columnFamilyDescriptor2);
+    admin.createTable(tableDescriptorBuilder2.build());
+
+    table2 = ConnectionFactory.createConnection(TEST_UTIL.getConfiguration())
+      .getBufferedMutator(tableName2);
+  }
+
+  private static void modifyColumnExpiryDays(int expireDays) throws Exception {
+
+    // change ttl as expire days to make some row expired
+    int timeToLive = expireDays * secondsOfDay();
+    ColumnFamilyDescriptorBuilder columnFamilyDescriptorBuilder = 
ColumnFamilyDescriptorBuilder
+      
.newBuilder(Bytes.toBytes(family)).setMobEnabled(true).setMobThreshold(3L);
+    columnFamilyDescriptorBuilder.setTimeToLive(timeToLive);
+
+    admin.modifyColumnFamily(tableName, columnFamilyDescriptorBuilder.build());
+
+    ColumnFamilyDescriptorBuilder columnFamilyDescriptorBuilder2 = 
ColumnFamilyDescriptorBuilder
+      
.newBuilder(Bytes.toBytes(family)).setMobEnabled(true).setMobThreshold(3L);
+    columnFamilyDescriptorBuilder2.setTimeToLive(timeToLive);
+
+    admin.modifyColumnFamily(tableName2, 
columnFamilyDescriptorBuilder2.build());
+  }
+
+  private static void putKVAndFlush(BufferedMutator table, byte[] row, byte[] 
value, long ts,
+    TableName tableName) throws Exception {
+
+    Put put = new Put(row, ts);
+    put.addColumn(Bytes.toBytes(family), qf, value);
+    table.mutate(put);
+
+    table.flush();
+    admin.flush(tableName);
+  }
+
+  /**
+   * For each of two tables, creates four mob files of which only one (1.5 days old) is younger
+   * than 2 days, then sets the TTL to 2 days and runs the cleaner chore. Verifies that only the
+   * 1.5 day old file remains in each table's mob directory.
+   */
+  private static void testCleanerInternal() throws Exception {
+    init();
+
+    Path mobDirPath = MobUtils.getMobFamilyPath(TEST_UTIL.getConfiguration(), 
tableName, family);
+
+    byte[] dummyData = makeDummyData(600);
+    long ts = EnvironmentEdgeManager.currentTime() - 3 * secondsOfDay() * 
1000; // 3 days before
+    putKVAndFlush(table, row1, dummyData, ts, tableName);
+    FileStatus[] firstFiles = 
TEST_UTIL.getTestFileSystem().listStatus(mobDirPath);
+    // the first mob file
+    assertEquals("Before cleanup without delay 1", 1, firstFiles.length);
+    String firstFile = firstFiles[0].getPath().getName();
+
+    // 1.5 day before
+    ts = (long) (EnvironmentEdgeManager.currentTime() - 1.5 * secondsOfDay() * 
1000);
+    putKVAndFlush(table, row2, dummyData, ts, tableName);
+    FileStatus[] secondFiles = 
TEST_UTIL.getTestFileSystem().listStatus(mobDirPath);
+    // now there are 2 mob files
+    assertEquals("Before cleanup without delay 2", 2, secondFiles.length);
+    String f1 = secondFiles[0].getPath().getName();
+    String f2 = secondFiles[1].getPath().getName();
+    String secondFile = f1.equals(firstFile) ? f2 : f1;
+
+    ts = EnvironmentEdgeManager.currentTime() - 4 * secondsOfDay() * 1000; // 
4 days before
+    putKVAndFlush(table, row3, dummyData, ts, tableName);
+    ts = EnvironmentEdgeManager.currentTime() - 4 * secondsOfDay() * 1000; // 
4 days before
+    putKVAndFlush(table, row3, dummyData, ts, tableName);
+    FileStatus[] thirdFiles = 
TEST_UTIL.getTestFileSystem().listStatus(mobDirPath);
+    // now there are 4 mob files
+    assertEquals("Before cleanup without delay 3", 4, thirdFiles.length);
+
+    // modifyColumnExpiryDays(2); // ttl = 2, make the first row expired
+
+    // for table 2
+    Path mobDirPath2 = MobUtils.getMobFamilyPath(TEST_UTIL.getConfiguration(), 
tableName2, family);
+
+    byte[] dummyData2 = makeDummyData(600);
+
+    putKVAndFlush(table2, row1, dummyData2, ts, tableName2);
+    FileStatus[] firstFiles2 = 
TEST_UTIL.getTestFileSystem().listStatus(mobDirPath2);
+    // the first mob file
+    assertEquals("Before cleanup without delay 1", 1, firstFiles2.length);
+    String firstFile2 = firstFiles2[0].getPath().getName();
+
+    // 1.5 day before
+    ts = (long) (EnvironmentEdgeManager.currentTime() - 1.5 * secondsOfDay() * 
1000);
+    putKVAndFlush(table2, row2, dummyData2, ts, tableName2);
+    FileStatus[] secondFiles2 = 
TEST_UTIL.getTestFileSystem().listStatus(mobDirPath2);
+    // now there are 2 mob files
+    assertEquals("Before cleanup without delay 2", 2, secondFiles2.length);
+    String f1Second = secondFiles2[0].getPath().getName();
+    String f2Second = secondFiles2[1].getPath().getName();
+    String secondFile2 = f1Second.equals(firstFile2) ? f2Second : f1Second;
+    ts = EnvironmentEdgeManager.currentTime() - 4 * secondsOfDay() * 1000; // 
4 days before
+    putKVAndFlush(table2, row3, dummyData2, ts, tableName2);
+    ts = EnvironmentEdgeManager.currentTime() - 4 * secondsOfDay() * 1000; // 
4 days before
+    putKVAndFlush(table2, row3, dummyData2, ts, tableName2);
+    FileStatus[] thirdFiles2 = 
TEST_UTIL.getTestFileSystem().listStatus(mobDirPath2);
+    // now there are 4 mob files
+    assertEquals("Before cleanup without delay 3", 4, thirdFiles2.length);
+
+    modifyColumnExpiryDays(2); // ttl = 2, make the first row expired
+
+    // run the cleaner chore
+    mobFileCleanerChore.chore();
+
+    FileStatus[] filesAfterClean = 
TEST_UTIL.getTestFileSystem().listStatus(mobDirPath);
+    String lastFile = filesAfterClean[0].getPath().getName();
+    // there are 4 mob files in total, but only 3 need to be cleaned
+    assertEquals("After cleanup without delay 1", 1, filesAfterClean.length);
+    assertEquals("After cleanup without delay 2", secondFile, lastFile);
+
+    filesAfterClean = TEST_UTIL.getTestFileSystem().listStatus(mobDirPath2);
+    lastFile = filesAfterClean[0].getPath().getName();
+    // there are 4 mob files in total, but only 3 need to be cleaned
+    assertEquals("After cleanup without delay 1", 1, filesAfterClean.length);
+    assertEquals("After cleanup without delay 2", secondFile2, lastFile);
+  }
+
+  private static int secondsOfDay() {
+    return 24 * 3600;
+  }
+
+  private static byte[] makeDummyData(int size) {
+    byte[] dummyData = new byte[size];
+    Bytes.random(dummyData);
+    return dummyData;
+  }
+}

Reply via email to