vinayakphegde commented on code in PR #6518:
URL: https://github.com/apache/hbase/pull/6518#discussion_r1893504562


##########
hbase-backup/src/main/java/org/apache/hadoop/hbase/backup/replication/ContinuousBackupStagingManager.java:
##########
@@ -0,0 +1,504 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.backup.replication;
+
+import static 
org.apache.hadoop.hbase.master.cleaner.HFileCleaner.MASTER_HFILE_CLEANER_PLUGINS;
+
+import java.io.IOException;
+import java.io.UncheckedIOException;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.*;
+import java.util.concurrent.locks.ReentrantLock;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.*;
+import org.apache.hadoop.hbase.HConstants;
+import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.client.Connection;
+import org.apache.hadoop.hbase.client.ConnectionFactory;
+import org.apache.hadoop.hbase.util.CommonFSUtils;
+import org.apache.hadoop.hbase.wal.*;
+import org.apache.yetus.audience.InterfaceAudience;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Manages the staging and backup of Write-Ahead Logs (WALs) and bulk-loaded 
files as part of the
+ * continuous backup process in HBase. This class ensures that WALs are 
staged, flushed, and backed
+ * up safely to support backup and recovery workflows.
+ */
[email protected]
public class ContinuousBackupStagingManager {
  private static final Logger LOG =
    LoggerFactory.getLogger(ContinuousBackupStagingManager.class);

  /** Directory name (under the HBase root dir) where WALs are staged before backup. */
  public static final String WALS_BACKUP_STAGING_DIR = "wal-backup-staging";
  /** Initial delay, in seconds, before the first periodic staged-WAL flush. */
  public static final String CONF_STAGED_WAL_FLUSH_INITIAL_DELAY =
    "hbase.backup.staged.wal.flush.initial.delay.seconds";
  public static final int DEFAULT_STAGED_WAL_FLUSH_INITIAL_DELAY_SECONDS = 5 * 60; // 5 minutes
  /** Interval, in seconds, between periodic staged-WAL flushes. */
  public static final String CONF_STAGED_WAL_FLUSH_INTERVAL =
    "hbase.backup.staged.wal.flush.interval.seconds";
  public static final int DEFAULT_STAGED_WAL_FLUSH_INTERVAL_SECONDS = 5 * 60; // 5 minutes
  public static final int EXECUTOR_TERMINATION_TIMEOUT_SECONDS = 60; // TODO: configurable??
  // NOTE(review): "STATED" in the two constant names below looks like a typo for "STAGED";
  // the config key string itself already says "staged". Renaming the public constants
  // would break external references, so only flagging it here.
  public static final String CONF_STATED_FILES_BACKUP_THREADS =
    "hbase.backup.staged.files.backup.threads";
  public static final int DEFAULT_STATED_FILES_BACKUP_THREADS = 3;

  private final Configuration conf;
  // File system and per-peer directory under which WAL data is staged.
  private final FileSystem walStagingFs;
  private final Path walStagingDir;
  // Active staging writer per table WAL directory; an entry is replaced with a
  // fresh writer each time its data is flushed (see flushWalData()).
  private final ConcurrentHashMap<Path, ContinuousBackupWalWriter> walWriterMap =
    new ConcurrentHashMap<>();
  private final ContinuousBackupManager continuousBackupManager;
  // Single-threaded scheduler driving the periodic flush (see startWalFlushExecutor()).
  private ScheduledExecutorService flushExecutor;
  // Pool executing staged-file backup work (see startBackupExecutor()).
  private ExecutorService backupExecutor;
  // Presumably prevents the same staged file from being backed up twice
  // concurrently — the reading/writing code is not in this chunk; TODO confirm.
  private final Set<Path> filesCurrentlyBeingBackedUp = ConcurrentHashMap.newKeySet();
  // Serializes staging (stageEntries()) against the periodic flush (flushWalFiles()).
  private final ReentrantLock lock = new ReentrantLock();
  // Registers staged bulk-load files so the HFileCleaner does not delete them
  // before they are backed up.
  private final StagedBulkloadFileRegistry stagedBulkloadFileRegistry;
+
+  /**
+   * Constructs a ContinuousBackupStagingManager with the specified 
configuration and backup
+   * manager.
+   * @param conf                    the HBase configuration
+   * @param continuousBackupManager the backup manager for continuous backup
+   * @throws IOException if there is an error initializing the WAL staging 
directory or related
+   *                     resources
+   */
+  public ContinuousBackupStagingManager(Configuration conf,
+    ContinuousBackupManager continuousBackupManager) throws IOException {
+    this.conf = conf;
+    this.continuousBackupManager = continuousBackupManager;
+    // TODO: configurable??
+    this.walStagingFs = CommonFSUtils.getRootDirFileSystem(conf);
+    this.walStagingDir = new Path(CommonFSUtils.getRootDir(conf),
+      new Path(WALS_BACKUP_STAGING_DIR, continuousBackupManager.getPeerId()));
+
+    ensureHFileCleanerPluginConfigured();
+    initWalStagingDir();
+    startWalFlushExecutor();
+    startBackupExecutor();
+
+    Connection conn = ConnectionFactory.createConnection(conf);
+    this.stagedBulkloadFileRegistry =
+      new StagedBulkloadFileRegistry(conn, 
continuousBackupManager.getPeerId());
+  }
+
+  private void ensureHFileCleanerPluginConfigured() throws IOException {
+    String plugins = conf.get(MASTER_HFILE_CLEANER_PLUGINS);
+    String cleanerClass = 
ContinuousBackupStagedHFileCleaner.class.getCanonicalName();
+    if (plugins == null || !plugins.contains(cleanerClass)) {
+      String errorMsg = Utils.logPeerId(continuousBackupManager.getPeerId())
+        + " Continuous Backup Bulk-loaded HFile's Cleaner plugin is invalid or 
missing: "
+        + cleanerClass;
+      LOG.error(errorMsg);
+      throw new IOException(errorMsg);
+    }
+  }
+
+  private void initWalStagingDir() throws IOException {
+    if (walStagingFs.exists(walStagingDir)) {
+      LOG.debug("{} WALs staging directory already exists: {}",
+        Utils.logPeerId(continuousBackupManager.getPeerId()), walStagingDir);
+    } else {
+      walStagingFs.mkdirs(walStagingDir);
+      LOG.debug("{} WALs staging directory created: {}",
+        Utils.logPeerId(continuousBackupManager.getPeerId()), walStagingDir);
+    }
+  }
+
+  private void startWalFlushExecutor() {
+    int initialDelay = conf.getInt(CONF_STAGED_WAL_FLUSH_INITIAL_DELAY,
+      DEFAULT_STAGED_WAL_FLUSH_INITIAL_DELAY_SECONDS);
+    int flushInterval =
+      conf.getInt(CONF_STAGED_WAL_FLUSH_INTERVAL, 
DEFAULT_STAGED_WAL_FLUSH_INTERVAL_SECONDS);
+
+    flushExecutor = Executors.newSingleThreadScheduledExecutor();
+    flushExecutor.scheduleAtFixedRate(this::flushAndBackupSafely, 
initialDelay, flushInterval,
+      TimeUnit.SECONDS);
+  }
+
+  private void flushAndBackupSafely() {
+    try {
+      LOG.info("{} Periodic WAL flush triggered...",
+        Utils.logPeerId(continuousBackupManager.getPeerId()));
+      flushWalFiles();
+      backupWalFiles();
+    } catch (IOException e) {
+      LOG.error("{} Error during periodic WAL flush: {}",
+        Utils.logPeerId(continuousBackupManager.getPeerId()), e.getMessage(), 
e);
+    }
+  }
+
+  private void flushWalFiles() {
+    lock.lock();
+    try {
+      for (Map.Entry<Path, ContinuousBackupWalWriter> entry : 
walWriterMap.entrySet()) {
+        flushWalData(entry.getKey(), entry.getValue());
+      }
+    } finally {
+      lock.unlock();
+    }
+  }
+
+  private void flushWalData(Path walDir, ContinuousBackupWalWriter writer) {
+    if (writer.hasAnyEntry()) {
+      LOG.debug("{} Flushing WAL data for {}", 
Utils.logPeerId(continuousBackupManager.getPeerId()),
+        walDir);
+      closeWriter(writer);
+      walWriterMap.put(walDir, createNewContinuousBackupWalWriter(walDir));
+    } else {
+      LOG.debug("{} No WAL data to flush for {}",
+        Utils.logPeerId(continuousBackupManager.getPeerId()), walDir);
+    }
+  }
+
  /**
   * Closes the given WAL writer, logging (rather than propagating) any failure so a
   * single bad writer cannot abort the whole flush cycle.
   * @param writer the writer to close
   */
  private void closeWriter(ContinuousBackupWalWriter writer) {
    try {
      writer.close();
    } catch (IOException e) {
      LOG.error("{} Error occurred while closing WAL writer: ",
        Utils.logPeerId(continuousBackupManager.getPeerId()), e);
    }
  }
+
  /**
   * Creates a fresh staging writer for the given WAL directory.
   * @param walDir the table's WAL directory key
   * @return a new writer backed by the staging file system
   * @throws UncheckedIOException if the writer cannot be created — wrapped because
   *         this is called from contexts that cannot throw a checked IOException
   */
  private ContinuousBackupWalWriter createNewContinuousBackupWalWriter(Path walDir) {
    try {
      return new ContinuousBackupWalWriter(walStagingFs, walStagingDir, walDir, conf);
    } catch (IOException e) {
      throw new UncheckedIOException("Failed to create WAL Writer for " + walDir, e);
    }
  }
+
+  private void startBackupExecutor() {
+    int threads =
+      conf.getInt(CONF_STATED_FILES_BACKUP_THREADS, 
DEFAULT_STATED_FILES_BACKUP_THREADS);
+    backupExecutor = Executors.newFixedThreadPool(threads);
+  }
+
+  /**
+   * Stages WAL entries and bulk-load files for a specific table. Ensures that 
WAL entries are
+   * written and bulk-loaded files are registered to prevent deletion by the 
HFileCleaner thread.
+   * @param tableName     the name of the table
+   * @param walEntries    the list of WAL entries to stage
+   * @param bulkLoadFiles the list of bulk-load files to stage
+   * @throws IOException if there is an error staging the entries or files
+   */
+  public void stageEntries(TableName tableName, List<WAL.Entry> walEntries,
+    List<Path> bulkLoadFiles) throws IOException {
+    lock.lock();
+    try {
+      String namespace = tableName.getNamespaceAsString();
+      String table = tableName.getQualifierAsString();
+
+      Path walDir = WALUtils.getWalDir(namespace, table);
+      ContinuousBackupWalWriter continuousBackupWalWriter = 
getContinuousBackupWalWriter(walDir);
+
+      continuousBackupWalWriter.write(walEntries, bulkLoadFiles);
+
+      // prevent bulk-loaded files from deleting HFileCleaner thread
+      stagedBulkloadFileRegistry.addStagedFiles(bulkLoadFiles);

Review Comment:
   No, that scenario doesn't occur. The replication framework prevents the 
deletion of files using `ReplicationHFileCleaner`. Files are only deleted once 
the WALs and bulkload files are successfully replicated, as indicated by a 
success return from the replicate() method.
   So, if the replicate() method completes successfully, it won't delete the file.
   
   However, in our case, the bulkloaded files are still in the staging area and 
haven’t been copied yet. Therefore, we had to implement an additional cleaner 
for this purpose.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to