chandrasekhar-188k commented on code in PR #5833:
URL: https://github.com/apache/hbase/pull/5833#discussion_r2418402456
##########
hbase-server/src/main/java/org/apache/hadoop/hbase/mob/MobFileCleanerChore.java:
##########
@@ -52,7 +71,21 @@ public MobFileCleanerChore(HMaster master) {
this.master = master;
cleaner = new ExpiredMobFileCleaner();
cleaner.setConf(master.getConfiguration());
+ threadCount = master.getConfiguration().getInt(MobConstants.MOB_CLEANER_THREAD_COUNT,
+ MobConstants.DEFAULT_MOB_CLEANER_THREAD_COUNT);
+ if (threadCount <= 1) {
Review Comment:
updated
##########
hbase-server/src/main/java/org/apache/hadoop/hbase/mob/MobFileCleanerChore.java:
##########
@@ -83,29 +116,91 @@ protected void chore() {
LOG.error("MobFileCleanerChore failed", e);
return;
}
+ List<Future<?>> futureList = new ArrayList<>(map.size());
for (TableDescriptor htd : map.values()) {
- for (ColumnFamilyDescriptor hcd : htd.getColumnFamilies()) {
- if (hcd.isMobEnabled() && hcd.getMinVersions() == 0) {
- try {
- cleaner.cleanExpiredMobFiles(htd, hcd);
- } catch (IOException e) {
- LOG.error("Failed to clean the expired mob files table={} family={}",
- htd.getTableName().getNameAsString(), hcd.getNameAsString(), e);
- }
- }
- }
+ Future<?> future = executor.submit(() -> handleOneTable(htd));
+ futureList.add(future);
+ }
+
+ for (Future<?> future : futureList) {
try {
- // Now clean obsolete files for a table
- LOG.info("Cleaning obsolete MOB files from table={}", htd.getTableName());
- try (final Admin admin = master.getConnection().getAdmin()) {
- MobFileCleanupUtil.cleanupObsoleteMobFiles(master.getConfiguration(), htd.getTableName(),
- admin);
+ future.get(cleanerFutureTimeout, TimeUnit.SECONDS);
+ } catch (InterruptedException e) {
+ LOG.warn("MobFileCleanerChore interrupted while waiting for futures", e);
+ Thread.currentThread().interrupt();
+ cancelAllFutures(futureList);
+ break;
+ } catch (ExecutionException e) {
+ LOG.error("Exception during execution of MobFileCleanerChore task", e);
+ } catch (TimeoutException e) {
+ LOG.error("MobFileCleanerChore timed out waiting for a task to complete", e);
+ }
+ }
+ }
+
+ private void cancelAllFutures(List<Future<?>> futureList) {
+ for (Future<?> f : futureList) {
+ if (!f.isDone()) {
+ f.cancel(true); // interrupt running tasks
+ }
+ }
+ LOG.info("Cancelled all pending mob file cleaner tasks");
Review Comment:
updated the log message
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]