This is an automated email from the ASF dual-hosted git repository.
chandra pushed a commit to branch branch-2
in repository https://gitbox.apache.org/repos/asf/hbase.git
The following commit(s) were added to refs/heads/branch-2 by this push:
new 163a1eb9a43 HBASE-27126 Support multi-threads cleaner for MOB files (#7430)(#5833)
163a1eb9a43 is described below
commit 163a1eb9a43fb8a0511b79af0bf50a7c41813211
Author: Chandra Sekhar K <[email protected]>
AuthorDate: Sat Dec 13 11:39:54 2025 +0530
HBASE-27126 Support multi-threads cleaner for MOB files (#7430)(#5833)
Signed-off-by: Peng Lu <[email protected]>
(cherry picked from commit f800a13a3575be3d70537f40c3c21171ce4b2603)
---
.../org/apache/hadoop/hbase/master/HMaster.java | 6 +
.../org/apache/hadoop/hbase/mob/MobConstants.java | 6 +
.../hadoop/hbase/mob/MobFileCleanerChore.java | 137 +++++++++--
.../hbase/mob/TestExpiredMobFileCleanerChore.java | 255 +++++++++++++++++++++
4 files changed, 384 insertions(+), 20 deletions(-)
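For context, the patch introduces two new MobConstants properties: hbase.master.mob.cleaner.threads (number of cleaner worker threads, default 1) and hbase.master.mob.cleaner.chore.timeout (per-table wait in seconds, default 300). A minimal sketch of tuning them before the master starts, assuming a plain HBase Configuration; the values shown are illustrative only:

    Configuration conf = HBaseConfiguration.create();
    // Hypothetical tuning: clean MOB files for up to 4 tables in parallel.
    conf.setInt(MobConstants.MOB_CLEANER_THREAD_COUNT, 4);
    // Hypothetical tuning: wait up to 10 minutes per table before logging a timeout.
    conf.setInt(MobConstants.MOB_FILE_CLEANER_CHORE_TIME_OUT, 10 * 60);

The thread count is also re-read on dynamic configuration change, since MobFileCleanerChore now implements ConfigurationObserver (see the diff below).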
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/HMaster.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/HMaster.java
index 9e6e929eb10..9a2bac9870c 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/HMaster.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/HMaster.java
@@ -4430,4 +4430,10 @@ public class HMaster extends HRegionServer implements MasterServices {
}
});
}
+
+ @RestrictedApi(explanation = "Should only be called in tests", link = "",
+ allowedOnPath = ".*/src/test/.*")
+ public MobFileCleanerChore getMobFileCleanerChore() {
+ return mobFileCleanerChore;
+ }
}
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/mob/MobConstants.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/mob/MobConstants.java
index 51f287ce66e..052058a42ad 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/mob/MobConstants.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/mob/MobConstants.java
@@ -149,6 +149,12 @@ public final class MobConstants {
public static final String MOB_COMPACTION_THREADS_MAX = "hbase.mob.compaction.threads.max";
public static final int DEFAULT_MOB_COMPACTION_THREADS_MAX = 1;
+ public static final String MOB_CLEANER_THREAD_COUNT = "hbase.master.mob.cleaner.threads";
+ public static final int DEFAULT_MOB_CLEANER_THREAD_COUNT = 1;
+ public static final String MOB_FILE_CLEANER_CHORE_TIME_OUT =
+ "hbase.master.mob.cleaner.chore.timeout";
+ public static final int DEFAULT_MOB_FILE_CLEANER_CHORE_TIME_OUT = 5 * 60; // 5 minutes
+
private MobConstants() {
}
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/mob/MobFileCleanerChore.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/mob/MobFileCleanerChore.java
index 9ce20e7c650..595078230b3 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/mob/MobFileCleanerChore.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/mob/MobFileCleanerChore.java
@@ -17,30 +17,49 @@
*/
package org.apache.hadoop.hbase.mob;
+import static org.apache.hadoop.hbase.mob.MobConstants.DEFAULT_MOB_FILE_CLEANER_CHORE_TIME_OUT;
+import static org.apache.hadoop.hbase.mob.MobConstants.MOB_FILE_CLEANER_CHORE_TIME_OUT;
+
+import com.google.errorprone.annotations.RestrictedApi;
import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
import java.util.Map;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.Future;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.ThreadFactory;
+import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.ScheduledChore;
import org.apache.hadoop.hbase.TableDescriptors;
import org.apache.hadoop.hbase.client.Admin;
import org.apache.hadoop.hbase.client.ColumnFamilyDescriptor;
import org.apache.hadoop.hbase.client.TableDescriptor;
+import org.apache.hadoop.hbase.conf.ConfigurationObserver;
import org.apache.hadoop.hbase.master.HMaster;
import org.apache.yetus.audience.InterfaceAudience;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import org.apache.hbase.thirdparty.com.google.common.util.concurrent.ThreadFactoryBuilder;
+
/**
* The class MobFileCleanerChore for running cleaner regularly to remove the expired and obsolete
* (files which have no active references to) mob files.
*/
@InterfaceAudience.Private
-public class MobFileCleanerChore extends ScheduledChore {
+public class MobFileCleanerChore extends ScheduledChore implements ConfigurationObserver {
private static final Logger LOG = LoggerFactory.getLogger(MobFileCleanerChore.class);
+
private final HMaster master;
- private ExpiredMobFileCleaner cleaner;
+ private final ExpiredMobFileCleaner cleaner;
+ private final ThreadPoolExecutor executor;
+ private final int cleanerFutureTimeout;
+ private int threadCount;
static {
Configuration.addDeprecation(MobConstants.DEPRECATED_MOB_CLEANER_PERIOD,
@@ -57,7 +76,21 @@ public class MobFileCleanerChore extends ScheduledChore {
this.master = master;
cleaner = new ExpiredMobFileCleaner();
cleaner.setConf(master.getConfiguration());
+ threadCount = master.getConfiguration().getInt(MobConstants.MOB_CLEANER_THREAD_COUNT,
+ MobConstants.DEFAULT_MOB_CLEANER_THREAD_COUNT);
+ if (threadCount < 1) {
+ threadCount = 1;
+ }
+
+ ThreadFactory threadFactory =
+ new ThreadFactoryBuilder().setDaemon(true).setNameFormat("mobfile-cleaner-pool-%d").build();
+
+ executor = new ThreadPoolExecutor(threadCount, threadCount, 60, TimeUnit.SECONDS,
+ new LinkedBlockingQueue<Runnable>(), threadFactory);
+
checkObsoleteConfigurations();
+ cleanerFutureTimeout = master.getConfiguration().getInt(MOB_FILE_CLEANER_CHORE_TIME_OUT,
+ DEFAULT_MOB_FILE_CLEANER_CHORE_TIME_OUT);
}
private void checkObsoleteConfigurations() {
@@ -88,29 +121,93 @@ public class MobFileCleanerChore extends ScheduledChore {
LOG.error("MobFileCleanerChore failed", e);
return;
}
+ List<Future<?>> futureList = new ArrayList<>(map.size());
for (TableDescriptor htd : map.values()) {
- for (ColumnFamilyDescriptor hcd : htd.getColumnFamilies()) {
- if (hcd.isMobEnabled() && hcd.getMinVersions() == 0) {
- try {
- cleaner.cleanExpiredMobFiles(htd, hcd);
- } catch (IOException e) {
- LOG.error("Failed to clean the expired mob files table={}
family={}",
- htd.getTableName().getNameAsString(), hcd.getNameAsString(), e);
- }
- }
- }
+ Future<?> future = executor.submit(() -> handleOneTable(htd));
+ futureList.add(future);
+ }
+
+ for (Future<?> future : futureList) {
try {
- // Now clean obsolete files for a table
- LOG.info("Cleaning obsolete MOB files from table={}",
htd.getTableName());
- try (final Admin admin = master.getConnection().getAdmin()) {
- MobFileCleanupUtil.cleanupObsoleteMobFiles(master.getConfiguration(), htd.getTableName(),
- admin);
+ future.get(cleanerFutureTimeout, TimeUnit.SECONDS);
+ } catch (InterruptedException e) {
+ LOG.warn("MobFileCleanerChore interrupted while waiting for futures",
e);
+ Thread.currentThread().interrupt();
+ cancelAllFutures(futureList);
+ break;
+ } catch (ExecutionException e) {
+ LOG.error("Exception during execution of MobFileCleanerChore task", e);
+ } catch (TimeoutException e) {
+ LOG.error("MobFileCleanerChore timed out waiting for a task to
complete", e);
+ }
+ }
+ }
+
+ private void cancelAllFutures(List<Future<?>> futureList) {
+ long pendingTaskCounter = 0;
+ for (Future<?> f : futureList) {
+ if (!f.isDone()) {
+ f.cancel(true); // interrupt running tasks
+ pendingTaskCounter++;
+ }
+ }
+ LOG.info("Cancelled {} pending mob file cleaner tasks",
pendingTaskCounter);
+ }
+
+ private void handleOneTable(TableDescriptor htd) {
+ for (ColumnFamilyDescriptor hcd : htd.getColumnFamilies()) {
+ if (hcd.isMobEnabled() && hcd.getMinVersions() == 0) {
+ try {
+ cleaner.cleanExpiredMobFiles(htd, hcd);
+ } catch (IOException e) {
+ LOG.error("Failed to clean the expired mob files table={} family={}",
+ htd.getTableName().getNameAsString(), hcd.getNameAsString(), e);
}
- LOG.info("Cleaning obsolete MOB files finished for table={}",
htd.getTableName());
- } catch (IOException e) {
- LOG.error("Failed to clean the obsolete mob files for table={}",
htd.getTableName(), e);
}
}
+ try {
+ // Now clean obsolete files for a table
+ LOG.info("Cleaning obsolete MOB files from table={}",
htd.getTableName());
+ try (final Admin admin = master.getConnection().getAdmin()) {
+ MobFileCleanupUtil.cleanupObsoleteMobFiles(master.getConfiguration(), htd.getTableName(),
+ admin);
+ }
+ LOG.info("Cleaning obsolete MOB files finished for table={}",
htd.getTableName());
+ } catch (IOException e) {
+ LOG.error("Failed to clean the obsolete mob files for table={}",
htd.getTableName(), e);
+ }
+ }
+
+ @Override
+ public void onConfigurationChange(Configuration conf) {
+ int newThreadCount = conf.getInt(MobConstants.MOB_CLEANER_THREAD_COUNT,
+ MobConstants.DEFAULT_MOB_CLEANER_THREAD_COUNT);
+ if (newThreadCount < 1) {
+ return; // invalid value, skip the config change
+ }
+
+ if (newThreadCount != threadCount) {
+ resizeThreadPool(newThreadCount, newThreadCount);
+ threadCount = newThreadCount;
+ }
}
+ private void resizeThreadPool(int newCoreSize, int newMaxSize) {
+ int currentCoreSize = executor.getCorePoolSize();
+ if (newCoreSize > currentCoreSize) {
+ // Increasing the pool size: Set max first, then core
+ executor.setMaximumPoolSize(newMaxSize);
+ executor.setCorePoolSize(newCoreSize);
+ } else {
+ // Decreasing the pool size: Set core first, then max
+ executor.setCorePoolSize(newCoreSize);
+ executor.setMaximumPoolSize(newMaxSize);
+ }
+ }
+
+ @RestrictedApi(explanation = "Should only be called in tests", link = "",
+ allowedOnPath = ".*/src/test/.*")
+ public ThreadPoolExecutor getExecutor() {
+ return executor;
+ }
}
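A side note on resizeThreadPool above: the ordering of the two setter calls matters because ThreadPoolExecutor insists that corePoolSize never exceeds maximumPoolSize; setMaximumPoolSize rejects a value below the current core size, and on recent JDKs setCorePoolSize likewise rejects a value above the current maximum. A small standalone sketch of the same grow/shrink sequence (names are illustrative, not part of the patch):

    import java.util.concurrent.LinkedBlockingQueue;
    import java.util.concurrent.ThreadPoolExecutor;
    import java.util.concurrent.TimeUnit;

    public class PoolResizeSketch {
      public static void main(String[] args) {
        ThreadPoolExecutor pool =
          new ThreadPoolExecutor(1, 1, 60, TimeUnit.SECONDS, new LinkedBlockingQueue<>());
        // Grow 1 -> 4: raise the maximum first so core <= max holds throughout.
        pool.setMaximumPoolSize(4);
        pool.setCorePoolSize(4);
        // Shrink 4 -> 2: lower the core first, then the maximum, for the same reason.
        pool.setCorePoolSize(2);
        pool.setMaximumPoolSize(2);
        pool.shutdown();
      }
    }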
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/mob/TestExpiredMobFileCleanerChore.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/mob/TestExpiredMobFileCleanerChore.java
new file mode 100644
index 00000000000..3d9cced8935
--- /dev/null
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/mob/TestExpiredMobFileCleanerChore.java
@@ -0,0 +1,255 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.mob;
+
+import static org.apache.hadoop.hbase.mob.MobConstants.MOB_CLEANER_BATCH_SIZE_UPPER_BOUND;
+import static org.apache.hadoop.hbase.mob.MobConstants.MOB_CLEANER_THREAD_COUNT;
+import static org.junit.Assert.assertEquals;
+
+import java.io.IOException;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hbase.HBaseClassTestRule;
+import org.apache.hadoop.hbase.HBaseTestingUtility;
+import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.client.Admin;
+import org.apache.hadoop.hbase.client.BufferedMutator;
+import org.apache.hadoop.hbase.client.ColumnFamilyDescriptor;
+import org.apache.hadoop.hbase.client.ColumnFamilyDescriptorBuilder;
+import org.apache.hadoop.hbase.client.ConnectionFactory;
+import org.apache.hadoop.hbase.client.Put;
+import org.apache.hadoop.hbase.client.TableDescriptorBuilder;
+import org.apache.hadoop.hbase.testclassification.MasterTests;
+import org.apache.hadoop.hbase.testclassification.MediumTests;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
+import org.junit.After;
+import org.junit.AfterClass;
+import org.junit.Assert;
+import org.junit.BeforeClass;
+import org.junit.ClassRule;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+
+@Category({ MediumTests.class, MasterTests.class })
+public class TestExpiredMobFileCleanerChore {
+
+ @ClassRule
+ public static final HBaseClassTestRule CLASS_RULE =
+ HBaseClassTestRule.forClass(TestExpiredMobFileCleanerChore.class);
+ private final static HBaseTestingUtility TEST_UTIL = new HBaseTestingUtility();
+ private final static TableName tableName = TableName.valueOf("TestExpiredMobFileCleaner");
+ private final static TableName tableName2 = TableName.valueOf("TestExpiredMobFileCleaner2");
+ private final static String family = "family";
+ private final static byte[] row1 = Bytes.toBytes("row1");
+ private final static byte[] row2 = Bytes.toBytes("row2");
+ private final static byte[] row3 = Bytes.toBytes("row3");
+ private final static byte[] qf = Bytes.toBytes("qf");
+
+ private static BufferedMutator table;
+ private static Admin admin;
+ private static BufferedMutator table2;
+ private static MobFileCleanerChore mobFileCleanerChore;
+
+ @BeforeClass
+ public static void setUp() throws Exception {
+ TEST_UTIL.getConfiguration().setInt("hfile.format.version", 3);
+ TEST_UTIL.getConfiguration().setInt(MOB_CLEANER_BATCH_SIZE_UPPER_BOUND, 2);
+ TEST_UTIL.startMiniCluster(1);
+ mobFileCleanerChore = TEST_UTIL.getMiniHBaseCluster().getMaster().getMobFileCleanerChore();
+ }
+
+ @After
+ public void cleanUp() throws IOException {
+ admin.disableTable(tableName);
+ admin.deleteTable(tableName);
+ admin.disableTable(tableName2);
+ admin.deleteTable(tableName2);
+ admin.close();
+ }
+
+ @AfterClass
+ public static void tearDown() throws Exception {
+ TEST_UTIL.shutdownMiniCluster();
+ TEST_UTIL.getTestFileSystem().delete(TEST_UTIL.getDataTestDir(), true);
+ }
+
+ @Test
+ public void testCleanerSingleThread() throws Exception {
+ TEST_UTIL.getConfiguration().setInt(MOB_CLEANER_THREAD_COUNT, 1);
+ mobFileCleanerChore.onConfigurationChange(TEST_UTIL.getConfiguration());
+ int corePoolSize = mobFileCleanerChore.getExecutor().getCorePoolSize();
+ Assert.assertEquals(1, corePoolSize);
+ testCleanerInternal();
+ }
+
+ @Test
+ public void testCleanerMultiThread() throws Exception {
+ TEST_UTIL.getConfiguration().setInt(MOB_CLEANER_THREAD_COUNT, 2);
+ mobFileCleanerChore.onConfigurationChange(TEST_UTIL.getConfiguration());
+ int corePoolSize = mobFileCleanerChore.getExecutor().getCorePoolSize();
+ Assert.assertEquals(2, corePoolSize);
+ testCleanerInternal();
+ }
+
+ private static void init() throws Exception {
+ TableDescriptorBuilder tableDescriptorBuilder = TableDescriptorBuilder.newBuilder(tableName);
+ ColumnFamilyDescriptor columnFamilyDescriptor =
+ ColumnFamilyDescriptorBuilder.newBuilder(Bytes.toBytes(family)).setMobEnabled(true)
+ .setMobThreshold(3L).setMaxVersions(4).build();
+ tableDescriptorBuilder.setColumnFamily(columnFamilyDescriptor);
+
+ admin = TEST_UTIL.getAdmin();
+ admin.createTable(tableDescriptorBuilder.build());
+
+ table = ConnectionFactory.createConnection(TEST_UTIL.getConfiguration())
+ .getBufferedMutator(tableName);
+
+ TableDescriptorBuilder tableDescriptorBuilder2 = TableDescriptorBuilder.newBuilder(tableName2);
+ ColumnFamilyDescriptor columnFamilyDescriptor2 =
+ ColumnFamilyDescriptorBuilder.newBuilder(Bytes.toBytes(family)).setMobEnabled(true)
+ .setMobThreshold(3L).setMaxVersions(4).build();
+ tableDescriptorBuilder2.setColumnFamily(columnFamilyDescriptor2);
+ admin.createTable(tableDescriptorBuilder2.build());
+
+ table2 = ConnectionFactory.createConnection(TEST_UTIL.getConfiguration())
+ .getBufferedMutator(tableName2);
+ }
+
+ private static void modifyColumnExpiryDays(int expireDays) throws Exception {
+
+ // change ttl as expire days to make some row expired
+ int timeToLive = expireDays * secondsOfDay();
+ ColumnFamilyDescriptorBuilder columnFamilyDescriptorBuilder = ColumnFamilyDescriptorBuilder
+ .newBuilder(Bytes.toBytes(family)).setMobEnabled(true).setMobThreshold(3L);
+ columnFamilyDescriptorBuilder.setTimeToLive(timeToLive);
+
+ admin.modifyColumnFamily(tableName, columnFamilyDescriptorBuilder.build());
+
+ ColumnFamilyDescriptorBuilder columnFamilyDescriptorBuilder2 = ColumnFamilyDescriptorBuilder
+ .newBuilder(Bytes.toBytes(family)).setMobEnabled(true).setMobThreshold(3L);
+ columnFamilyDescriptorBuilder2.setTimeToLive(timeToLive);
+
+ admin.modifyColumnFamily(tableName2, columnFamilyDescriptorBuilder2.build());
+ }
+
+ private static void putKVAndFlush(BufferedMutator table, byte[] row, byte[] value, long ts,
+ TableName tableName) throws Exception {
+
+ Put put = new Put(row, ts);
+ put.addColumn(Bytes.toBytes(family), qf, value);
+ table.mutate(put);
+
+ table.flush();
+ admin.flush(tableName);
+ }
+
+ /**
+ * Creates a 3 day old hfile and a 1 day old hfile then sets expiry to 2 days. Verifies that the
+ * 3 day old hfile is removed but the 1 day one is still present after the expiry based cleaner is
+ * run.
+ */
+ private static void testCleanerInternal() throws Exception {
+ init();
+
+ Path mobDirPath = MobUtils.getMobFamilyPath(TEST_UTIL.getConfiguration(), tableName, family);
+
+ byte[] dummyData = makeDummyData(600);
+ long ts = EnvironmentEdgeManager.currentTime() - 3 * secondsOfDay() * 1000; // 3 days before
+ putKVAndFlush(table, row1, dummyData, ts, tableName);
+ FileStatus[] firstFiles = TEST_UTIL.getTestFileSystem().listStatus(mobDirPath);
+ // the first mob file
+ assertEquals("Before cleanup without delay 1", 1, firstFiles.length);
+ String firstFile = firstFiles[0].getPath().getName();
+
+ // 1.5 day before
+ ts = (long) (EnvironmentEdgeManager.currentTime() - 1.5 * secondsOfDay() * 1000);
+ putKVAndFlush(table, row2, dummyData, ts, tableName);
+ FileStatus[] secondFiles = TEST_UTIL.getTestFileSystem().listStatus(mobDirPath);
+ // now there are 2 mob files
+ assertEquals("Before cleanup without delay 2", 2, secondFiles.length);
+ String f1 = secondFiles[0].getPath().getName();
+ String f2 = secondFiles[1].getPath().getName();
+ String secondFile = f1.equals(firstFile) ? f2 : f1;
+
+ ts = EnvironmentEdgeManager.currentTime() - 4 * secondsOfDay() * 1000; // 4 days before
+ putKVAndFlush(table, row3, dummyData, ts, tableName);
+ ts = EnvironmentEdgeManager.currentTime() - 4 * secondsOfDay() * 1000; // 4 days before
+ putKVAndFlush(table, row3, dummyData, ts, tableName);
+ FileStatus[] thirdFiles = TEST_UTIL.getTestFileSystem().listStatus(mobDirPath);
+ // now there are 4 mob files
+ assertEquals("Before cleanup without delay 3", 4, thirdFiles.length);
+
+ // modifyColumnExpiryDays(2); // ttl = 2, make the first row expired
+
+ // for table 2
+ Path mobDirPath2 = MobUtils.getMobFamilyPath(TEST_UTIL.getConfiguration(), tableName2, family);
+
+ byte[] dummyData2 = makeDummyData(600);
+
+ putKVAndFlush(table2, row1, dummyData2, ts, tableName2);
+ FileStatus[] firstFiles2 = TEST_UTIL.getTestFileSystem().listStatus(mobDirPath2);
+ // the first mob file
+ assertEquals("Before cleanup without delay 1", 1, firstFiles2.length);
+ String firstFile2 = firstFiles2[0].getPath().getName();
+
+ // 1.5 day before
+ ts = (long) (EnvironmentEdgeManager.currentTime() - 1.5 * secondsOfDay() * 1000);
+ putKVAndFlush(table2, row2, dummyData2, ts, tableName2);
+ FileStatus[] secondFiles2 = TEST_UTIL.getTestFileSystem().listStatus(mobDirPath2);
+ // now there are 2 mob files
+ assertEquals("Before cleanup without delay 2", 2, secondFiles2.length);
+ String f1Second = secondFiles2[0].getPath().getName();
+ String f2Second = secondFiles2[1].getPath().getName();
+ String secondFile2 = f1Second.equals(firstFile2) ? f2Second : f1Second;
+ ts = EnvironmentEdgeManager.currentTime() - 4 * secondsOfDay() * 1000; // 4 days before
+ putKVAndFlush(table2, row3, dummyData2, ts, tableName2);
+ ts = EnvironmentEdgeManager.currentTime() - 4 * secondsOfDay() * 1000; // 4 days before
+ putKVAndFlush(table2, row3, dummyData2, ts, tableName2);
+ FileStatus[] thirdFiles2 = TEST_UTIL.getTestFileSystem().listStatus(mobDirPath2);
+ // now there are 4 mob files
+ assertEquals("Before cleanup without delay 3", 4, thirdFiles2.length);
+
+ modifyColumnExpiryDays(2); // ttl = 2, make the first row expired
+
+ // run the cleaner chore
+ mobFileCleanerChore.chore();
+
+ FileStatus[] filesAfterClean = TEST_UTIL.getTestFileSystem().listStatus(mobDirPath);
+ String lastFile = filesAfterClean[0].getPath().getName();
+ // there are 4 mob files in total, but only 3 need to be cleaned
+ assertEquals("After cleanup without delay 1", 1, filesAfterClean.length);
+ assertEquals("After cleanup without delay 2", secondFile, lastFile);
+
+ filesAfterClean = TEST_UTIL.getTestFileSystem().listStatus(mobDirPath2);
+ lastFile = filesAfterClean[0].getPath().getName();
+ // there are 4 mob files in total, but only 3 need to be cleaned
+ assertEquals("After cleanup without delay 1", 1, filesAfterClean.length);
+ assertEquals("After cleanup without delay 2", secondFile2, lastFile);
+ }
+
+ private static int secondsOfDay() {
+ return 24 * 3600;
+ }
+
+ private static byte[] makeDummyData(int size) {
+ byte[] dummyData = new byte[size];
+ Bytes.random(dummyData);
+ return dummyData;
+ }
+}
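Back in MobFileCleanerChore.chore(), each per-table task is awaited with a bounded future.get, and anything still pending is cancelled if the chore thread is interrupted. A condensed standalone sketch of that submit/wait/cancel pattern (class name and pool sizing are illustrative, not part of the patch):

    import java.util.ArrayList;
    import java.util.List;
    import java.util.concurrent.ExecutionException;
    import java.util.concurrent.ExecutorService;
    import java.util.concurrent.Executors;
    import java.util.concurrent.Future;
    import java.util.concurrent.TimeUnit;
    import java.util.concurrent.TimeoutException;

    public class BoundedWaitSketch {
      public static void main(String[] args) {
        ExecutorService pool = Executors.newFixedThreadPool(2);
        List<Future<?>> futures = new ArrayList<>();
        for (int i = 0; i < 4; i++) {
          futures.add(pool.submit(() -> {
            // per-table cleanup work would run here
          }));
        }
        for (Future<?> f : futures) {
          try {
            f.get(300, TimeUnit.SECONDS); // bounded wait, like cleanerFutureTimeout
          } catch (InterruptedException e) {
            Thread.currentThread().interrupt(); // restore the interrupt flag
            futures.forEach(x -> x.cancel(true)); // cancel whatever has not finished
            break;
          } catch (ExecutionException | TimeoutException e) {
            // log and keep waiting on the remaining futures
          }
        }
        pool.shutdown();
      }
    }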