This is an automated email from the ASF dual-hosted git repository.

andor pushed a commit to branch HBASE-28957
in repository https://gitbox.apache.org/repos/asf/hbase.git

commit 132e12a6311b5d74c8b7c3e1e1753a866da59bd0
Author: vinayak hegde <vinayakph...@gmail.com>
AuthorDate: Fri Jun 27 23:53:52 2025 +0530

    HBASE-29406: Skip Copying Bulkloaded Files to Backup Location in Continuous Backup (#7119)
    
    Signed-off-by: Tak Lon (Stephen) Wu <tak...@apache.org>
    Reviewed by: Kevin Geiszler <kevin.j.geisz...@gmail.com>
---
 .../hadoop/hbase/backup/impl/BackupCommands.java   |  14 +--
 .../hbase/backup/impl/FullTableBackupClient.java   |   8 --
 .../replication/BackupFileSystemManager.java       |  11 +-
 .../backup/replication/BulkLoadProcessor.java      |  96 -----------------
 .../ContinuousBackupReplicationEndpoint.java       |  93 +---------------
 .../hbase/backup/TestBackupDeleteWithCleanup.java  |  19 +---
 .../hbase/backup/impl/TestBackupCommands.java      |   7 +-
 .../TestContinuousBackupReplicationEndpoint.java   | 117 ++-------------------
 8 files changed, 18 insertions(+), 347 deletions(-)
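For readers skimming the patch: after this change the per-day cleanup in BackupCommands touches only the WALs tree under the continuous-backup root; the former bulk-load sibling directory is gone entirely. Below is a minimal, self-contained sketch of that surviving cleanup shape — not the patch itself. The backup root path is hypothetical, and the "WALs" literal stands in for BackupFileSystemManager.WALS_DIR.

import java.io.IOException;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;

public class WalBackupCleanupSketch {
  public static void main(String[] args) throws IOException {
    Configuration conf = new Configuration();
    // Hypothetical backup root; BackupFileSystemManager.WALS_DIR is "WALs".
    Path walDir = new Path("/backup/continuous", "WALs");
    FileSystem fs = FileSystem.get(walDir.toUri(), conf);

    // Mirrors the surviving branch of the patched cleanup: only children of
    // the WALs directory are removed; there is no longer a sibling
    // bulk-load-files tree to purge.
    if (fs.exists(walDir)) {
      for (FileStatus item : fs.listStatus(walDir)) {
        fs.delete(item.getPath(), true); // recursive delete of each child
      }
      System.out.println("Deleted all contents under WAL directory: " + walDir);
    }
  }
}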

diff --git a/hbase-backup/src/main/java/org/apache/hadoop/hbase/backup/impl/BackupCommands.java b/hbase-backup/src/main/java/org/apache/hadoop/hbase/backup/impl/BackupCommands.java
index 2020b84bc1c..3ae97c487ef 100644
--- a/hbase-backup/src/main/java/org/apache/hadoop/hbase/backup/impl/BackupCommands.java
+++ b/hbase-backup/src/main/java/org/apache/hadoop/hbase/backup/impl/BackupCommands.java
@@ -1004,7 +1004,6 @@ public final class BackupCommands {
           new BackupFileSystemManager(CONTINUOUS_BACKUP_REPLICATION_PEER, conf, backupWalDir);
         FileSystem fs = manager.getBackupFs();
         Path walDir = manager.getWalsDir();
-        Path bulkloadDir = manager.getBulkLoadFilesDir();
 
         // Delete contents under WAL directory
         if (fs.exists(walDir)) {
@@ -1015,15 +1014,6 @@ public final class BackupCommands {
           System.out.println("Deleted all contents under WAL directory: " + 
walDir);
         }
 
-        // Delete contents under bulk load directory
-        if (fs.exists(bulkloadDir)) {
-          FileStatus[] bulkContents = fs.listStatus(bulkloadDir);
-          for (FileStatus item : bulkContents) {
-            fs.delete(item.getPath(), true); // recursive delete of each child
-          }
-          System.out.println("Deleted all contents under Bulk Load directory: 
" + bulkloadDir);
-        }
-
       } catch (IOException e) {
         System.out.println("WARNING: Failed to delete contents under backup 
directories: "
           + backupWalDir + ". Error: " + e.getMessage());
@@ -1032,7 +1022,7 @@ public final class BackupCommands {
     }
 
     /**
-     * Cleans up old WAL and bulk-loaded files based on the determined cutoff timestamp.
+     * Cleans up old WAL files based on the determined cutoff timestamp.
      */
    void deleteOldWALFiles(Configuration conf, String backupWalDir, long cutoffTime)
       throws IOException {
@@ -1043,7 +1033,6 @@ public final class BackupCommands {
        new BackupFileSystemManager(CONTINUOUS_BACKUP_REPLICATION_PEER, conf, backupWalDir);
       FileSystem fs = manager.getBackupFs();
       Path walDir = manager.getWalsDir();
-      Path bulkloadDir = manager.getBulkLoadFilesDir();
 
       SimpleDateFormat dateFormat = new SimpleDateFormat(DATE_FORMAT);
       dateFormat.setTimeZone(TimeZone.getTimeZone("UTC"));
@@ -1069,7 +1058,6 @@ public final class BackupCommands {
           if (dayStart + ONE_DAY_IN_MILLISECONDS - 1 < cutoffTime) {
             System.out.println("Deleting outdated WAL directory: " + dirPath);
             fs.delete(dirPath, true);
-            fs.delete(new Path(bulkloadDir, dirName), true);
           }
         } catch (ParseException e) {
           System.out.println("WARNING: Failed to parse directory name '" + 
dirName
diff --git a/hbase-backup/src/main/java/org/apache/hadoop/hbase/backup/impl/FullTableBackupClient.java b/hbase-backup/src/main/java/org/apache/hadoop/hbase/backup/impl/FullTableBackupClient.java
index 3f6ae3deb63..5b49496d626 100644
--- a/hbase-backup/src/main/java/org/apache/hadoop/hbase/backup/impl/FullTableBackupClient.java
+++ b/hbase-backup/src/main/java/org/apache/hadoop/hbase/backup/impl/FullTableBackupClient.java
@@ -190,14 +190,6 @@ public class FullTableBackupClient extends TableBackupClient {
     // set overall backup status: complete. Here we make sure to complete the backup.
     // After this checkpoint, even if entering cancel process, will let the backup finished
     backupInfo.setState(BackupState.COMPLETE);
-
-    if (!conf.getBoolean("hbase.replication.bulkload.enabled", false)) {
-      System.out.println("NOTE: Bulkload replication is not enabled. "
-        + "Bulk loaded files will not be backed up as part of continuous 
backup. "
-        + "To ensure bulk loaded files are included in the backup, please 
enable bulkload replication "
-        + "(hbase.replication.bulkload.enabled=true) and configure other 
necessary settings "
-        + "to properly enable bulkload replication.");
-    }
   }
 
   private void handleNonContinuousBackup(Admin admin) throws IOException {
diff --git a/hbase-backup/src/main/java/org/apache/hadoop/hbase/backup/replication/BackupFileSystemManager.java b/hbase-backup/src/main/java/org/apache/hadoop/hbase/backup/replication/BackupFileSystemManager.java
index 225d3217276..9d1d818c207 100644
--- a/hbase-backup/src/main/java/org/apache/hadoop/hbase/backup/replication/BackupFileSystemManager.java
+++ b/hbase-backup/src/main/java/org/apache/hadoop/hbase/backup/replication/BackupFileSystemManager.java
@@ -26,20 +26,18 @@ import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 /**
- * Initializes and organizes backup directories for continuous Write-Ahead Logs (WALs) and
- * bulk-loaded files within the specified backup root directory.
+ * Initializes and organizes backup directories for continuous Write-Ahead Logs (WALs) files within
+ * the specified backup root directory.
  */
 @InterfaceAudience.Private
 public class BackupFileSystemManager {
   private static final Logger LOG = LoggerFactory.getLogger(BackupFileSystemManager.class);
 
   public static final String WALS_DIR = "WALs";
-  public static final String BULKLOAD_FILES_DIR = "bulk-load-files";
   private final String peerId;
   private final FileSystem backupFs;
   private final Path backupRootDir;
   private final Path walsDir;
-  private final Path bulkLoadFilesDir;
 
   public BackupFileSystemManager(String peerId, Configuration conf, String backupRootDirStr)
     throws IOException {
@@ -47,7 +45,6 @@ public class BackupFileSystemManager {
     this.backupRootDir = new Path(backupRootDirStr);
     this.backupFs = FileSystem.get(backupRootDir.toUri(), conf);
     this.walsDir = createDirectory(WALS_DIR);
-    this.bulkLoadFilesDir = createDirectory(BULKLOAD_FILES_DIR);
   }
 
   private Path createDirectory(String dirName) throws IOException {
@@ -61,10 +58,6 @@ public class BackupFileSystemManager {
     return walsDir;
   }
 
-  public Path getBulkLoadFilesDir() {
-    return bulkLoadFilesDir;
-  }
-
   public FileSystem getBackupFs() {
     return backupFs;
   }
diff --git a/hbase-backup/src/main/java/org/apache/hadoop/hbase/backup/replication/BulkLoadProcessor.java b/hbase-backup/src/main/java/org/apache/hadoop/hbase/backup/replication/BulkLoadProcessor.java
deleted file mode 100644
index 6e1271313bc..00000000000
--- a/hbase-backup/src/main/java/org/apache/hadoop/hbase/backup/replication/BulkLoadProcessor.java
+++ /dev/null
@@ -1,96 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.hadoop.hbase.backup.replication;
-
-import java.io.IOException;
-import java.util.ArrayList;
-import java.util.List;
-import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.hbase.Cell;
-import org.apache.hadoop.hbase.CellUtil;
-import org.apache.hadoop.hbase.TableName;
-import org.apache.hadoop.hbase.wal.WAL;
-import org.apache.hadoop.hbase.wal.WALEdit;
-import org.apache.yetus.audience.InterfaceAudience;
-
-import org.apache.hadoop.hbase.shaded.protobuf.generated.WALProtos;
-
-/**
- * Processes bulk load files from Write-Ahead Log (WAL) entries for HBase replication.
- * <p>
- * This utility class extracts and constructs the file paths of bulk-loaded files based on WAL
- * entries. It processes bulk load descriptors and their associated store descriptors to generate
- * the paths for each bulk-loaded file.
- * <p>
- * The class is designed for scenarios where replicable bulk load operations need to be parsed and
- * their file paths need to be determined programmatically.
- * </p>
- */
-@InterfaceAudience.Private
-public final class BulkLoadProcessor {
-  private BulkLoadProcessor() {
-  }
-
-  public static List<Path> processBulkLoadFiles(List<WAL.Entry> walEntries) throws IOException {
-    List<Path> bulkLoadFilePaths = new ArrayList<>();
-
-    for (WAL.Entry entry : walEntries) {
-      WALEdit edit = entry.getEdit();
-      for (Cell cell : edit.getCells()) {
-        if (CellUtil.matchingQualifier(cell, WALEdit.BULK_LOAD)) {
-          TableName tableName = entry.getKey().getTableName();
-          String namespace = tableName.getNamespaceAsString();
-          String table = tableName.getQualifierAsString();
-          bulkLoadFilePaths.addAll(processBulkLoadDescriptor(cell, namespace, table));
-        }
-      }
-    }
-    return bulkLoadFilePaths;
-  }
-
-  private static List<Path> processBulkLoadDescriptor(Cell cell, String namespace, String table)
-    throws IOException {
-    List<Path> bulkLoadFilePaths = new ArrayList<>();
-    WALProtos.BulkLoadDescriptor bld = WALEdit.getBulkLoadDescriptor(cell);
-
-    if (bld == null || !bld.getReplicate() || bld.getEncodedRegionName() == null) {
-      return bulkLoadFilePaths; // Skip if not replicable
-    }
-
-    String regionName = bld.getEncodedRegionName().toStringUtf8();
-    for (WALProtos.StoreDescriptor storeDescriptor : bld.getStoresList()) {
-      bulkLoadFilePaths
-        .addAll(processStoreDescriptor(storeDescriptor, namespace, table, regionName));
-    }
-
-    return bulkLoadFilePaths;
-  }
-
-  private static List<Path> processStoreDescriptor(WALProtos.StoreDescriptor storeDescriptor,
-    String namespace, String table, String regionName) {
-    List<Path> paths = new ArrayList<>();
-    String columnFamily = storeDescriptor.getFamilyName().toStringUtf8();
-
-    for (String storeFile : storeDescriptor.getStoreFileList()) {
-      paths.add(new Path(namespace,
-        new Path(table, new Path(regionName, new Path(columnFamily, storeFile)))));
-    }
-
-    return paths;
-  }
-}
diff --git a/hbase-backup/src/main/java/org/apache/hadoop/hbase/backup/replication/ContinuousBackupReplicationEndpoint.java b/hbase-backup/src/main/java/org/apache/hadoop/hbase/backup/replication/ContinuousBackupReplicationEndpoint.java
index eeacc8fbf34..bf3fbd531bf 100644
--- a/hbase-backup/src/main/java/org/apache/hadoop/hbase/backup/replication/ContinuousBackupReplicationEndpoint.java
+++ b/hbase-backup/src/main/java/org/apache/hadoop/hbase/backup/replication/ContinuousBackupReplicationEndpoint.java
@@ -33,10 +33,8 @@ import java.util.concurrent.locks.ReentrantLock;
 import java.util.stream.Collectors;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.FileUtil;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hbase.HBaseConfiguration;
-import org.apache.hadoop.hbase.HConstants;
 import org.apache.hadoop.hbase.backup.impl.BackupSystemTable;
 import org.apache.hadoop.hbase.client.Connection;
 import org.apache.hadoop.hbase.client.ConnectionFactory;
@@ -45,7 +43,6 @@ import org.apache.hadoop.hbase.regionserver.wal.WALUtil;
 import org.apache.hadoop.hbase.replication.BaseReplicationEndpoint;
 import org.apache.hadoop.hbase.replication.ReplicationResult;
 import org.apache.hadoop.hbase.replication.regionserver.ReplicationSourceInterface;
-import org.apache.hadoop.hbase.util.CommonFSUtils;
 import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
 import org.apache.hadoop.hbase.wal.FSHLogProvider;
 import org.apache.hadoop.hbase.wal.WAL;
@@ -56,8 +53,8 @@ import org.slf4j.LoggerFactory;
 /**
  * ContinuousBackupReplicationEndpoint is responsible for replicating WAL entries to a backup
  * storage. It organizes WAL entries by day and periodically flushes the data, ensuring that WAL
- * files do not exceed the configured size. The class includes mechanisms for handling the WAL
- * files, performing bulk load backups, and ensuring that the replication process is safe.
+ * files do not exceed the configured size. The class includes mechanisms for handling the WAL files
+ * and ensuring that the replication process is safe.
  */
 @InterfaceAudience.Private
 public class ContinuousBackupReplicationEndpoint extends BaseReplicationEndpoint {
@@ -292,20 +289,11 @@ public class ContinuousBackupReplicationEndpoint extends BaseReplicationEndpoint
 
     try {
       FSHLogProvider.Writer walWriter = walWriters.computeIfAbsent(day, this::createWalWriter);
-      List<Path> bulkLoadFiles = BulkLoadProcessor.processBulkLoadFiles(walEntries);
-
-      if (LOG.isTraceEnabled()) {
-        LOG.trace("{} Processed {} bulk load files for WAL entries", 
Utils.logPeerId(peerId),
-          bulkLoadFiles.size());
-        LOG.trace("{} Bulk load files: {}", Utils.logPeerId(peerId),
-          
bulkLoadFiles.stream().map(Path::toString).collect(Collectors.joining(", ")));
-      }
 
       for (WAL.Entry entry : walEntries) {
         walWriter.append(entry);
       }
       walWriter.sync(true);
-      uploadBulkLoadFiles(day, bulkLoadFiles);
     } catch (UncheckedIOException e) {
       String errorMsg = Utils.logPeerId(peerId) + " Failed to get or create WAL Writer for " + day;
       LOG.error("{} Backup failed for day {}. Error: {}", Utils.logPeerId(peerId), day,
@@ -375,41 +363,6 @@ public class ContinuousBackupReplicationEndpoint extends BaseReplicationEndpoint
     }
   }
 
-  private void uploadBulkLoadFiles(long dayInMillis, List<Path> bulkLoadFiles) throws IOException {
-    LOG.debug("{} Starting upload of {} bulk load files", Utils.logPeerId(peerId),
-      bulkLoadFiles.size());
-
-    if (LOG.isTraceEnabled()) {
-      LOG.trace("{} Bulk load files to upload: {}", Utils.logPeerId(peerId),
-        
bulkLoadFiles.stream().map(Path::toString).collect(Collectors.joining(", ")));
-    }
-    String dayDirectoryName = formatToDateString(dayInMillis);
-    Path bulkloadDir = new Path(backupFileSystemManager.getBulkLoadFilesDir(), dayDirectoryName);
-    backupFileSystemManager.getBackupFs().mkdirs(bulkloadDir);
-
-    for (Path file : bulkLoadFiles) {
-      Path sourcePath = getBulkLoadFileStagingPath(file);
-      Path destPath = new Path(bulkloadDir, file);
-
-      try {
-        LOG.debug("{} Copying bulk load file from {} to {}", 
Utils.logPeerId(peerId), sourcePath,
-          destPath);
-
-        FileUtil.copy(CommonFSUtils.getRootDirFileSystem(conf), sourcePath,
-          backupFileSystemManager.getBackupFs(), destPath, false, conf);
-
-        LOG.info("{} Bulk load file {} successfully backed up to {}", 
Utils.logPeerId(peerId), file,
-          destPath);
-      } catch (IOException e) {
-        LOG.error("{} Failed to back up bulk load file {}: {}", 
Utils.logPeerId(peerId), file,
-          e.getMessage(), e);
-        throw e;
-      }
-    }
-
-    LOG.debug("{} Completed upload of bulk load files", 
Utils.logPeerId(peerId));
-  }
-
   /**
    * Convert dayInMillis to "yyyy-MM-dd" format
    */
@@ -419,48 +372,6 @@ public class ContinuousBackupReplicationEndpoint extends BaseReplicationEndpoint
     return dateFormat.format(new Date(dayInMillis));
   }
 
-  private Path getBulkLoadFileStagingPath(Path relativePathFromNamespace) throws IOException {
-    FileSystem rootFs = CommonFSUtils.getRootDirFileSystem(conf);
-    Path rootDir = CommonFSUtils.getRootDir(conf);
-    Path baseNSDir = new Path(HConstants.BASE_NAMESPACE_DIR);
-    Path baseNamespaceDir = new Path(rootDir, baseNSDir);
-    Path hFileArchiveDir =
-      new Path(rootDir, new Path(HConstants.HFILE_ARCHIVE_DIRECTORY, baseNSDir));
-
-    LOG.debug("{} Searching for bulk load file: {} in paths: {}, {}", 
Utils.logPeerId(peerId),
-      relativePathFromNamespace, baseNamespaceDir, hFileArchiveDir);
-
-    Path result =
-      findExistingPath(rootFs, baseNamespaceDir, hFileArchiveDir, relativePathFromNamespace);
-
-    if (result == null) {
-      LOG.error("{} No bulk loaded file found in relative path: {}", 
Utils.logPeerId(peerId),
-        relativePathFromNamespace);
-      throw new IOException(
-        "No Bulk loaded file found in relative path: " + 
relativePathFromNamespace);
-    }
-
-    LOG.debug("{} Bulk load file found at {}", Utils.logPeerId(peerId), 
result);
-    return result;
-  }
-
-  private static Path findExistingPath(FileSystem rootFs, Path baseNamespaceDir,
-    Path hFileArchiveDir, Path filePath) throws IOException {
-    if (LOG.isTraceEnabled()) {
-      LOG.trace("Checking for bulk load file at: {} and {}", new 
Path(baseNamespaceDir, filePath),
-        new Path(hFileArchiveDir, filePath));
-    }
-
-    for (Path candidate : new Path[] { new Path(baseNamespaceDir, filePath),
-      new Path(hFileArchiveDir, filePath) }) {
-      if (rootFs.exists(candidate)) {
-        LOG.debug("Found bulk load file at: {}", candidate);
-        return candidate;
-      }
-    }
-    return null;
-  }
-
   private void shutdownFlushExecutor() {
     if (flushExecutor != null) {
       LOG.info("{} Initiating WAL flush executor shutdown.", 
Utils.logPeerId(peerId));
diff --git a/hbase-backup/src/test/java/org/apache/hadoop/hbase/backup/TestBackupDeleteWithCleanup.java b/hbase-backup/src/test/java/org/apache/hadoop/hbase/backup/TestBackupDeleteWithCleanup.java
index 07c9110072b..d22f4c9cda9 100644
--- a/hbase-backup/src/test/java/org/apache/hadoop/hbase/backup/TestBackupDeleteWithCleanup.java
+++ b/hbase-backup/src/test/java/org/apache/hadoop/hbase/backup/TestBackupDeleteWithCleanup.java
@@ -19,7 +19,6 @@ package org.apache.hadoop.hbase.backup;
 
 import static org.apache.hadoop.hbase.backup.BackupRestoreConstants.CONF_CONTINUOUS_BACKUP_WAL_DIR;
 import static org.apache.hadoop.hbase.backup.BackupRestoreConstants.CONTINUOUS_BACKUP_REPLICATION_PEER;
-import static org.apache.hadoop.hbase.backup.replication.BackupFileSystemManager.BULKLOAD_FILES_DIR;
 import static org.apache.hadoop.hbase.backup.replication.BackupFileSystemManager.WALS_DIR;
 import static org.apache.hadoop.hbase.backup.replication.ContinuousBackupReplicationEndpoint.DATE_FORMAT;
 import static org.apache.hadoop.hbase.backup.replication.ContinuousBackupReplicationEndpoint.ONE_DAY_IN_MILLISECONDS;
@@ -165,7 +164,7 @@ public class TestBackupDeleteWithCleanup extends TestBackupBase {
 
     // Step 6: Verify that the backup WAL directory is empty
     assertTrue("WAL backup directory should be empty after force delete",
-      areWalAndBulkloadDirsEmpty(conf1, backupWalDir.toString()));
+      isWalDirsEmpty(conf1, backupWalDir.toString()));
 
     // Step 7: Take new full backup with continuous backup enabled
     String backupIdContinuous = fullTableBackupWithContinuous(Lists.newArrayList(table1));
@@ -190,35 +189,28 @@ public class TestBackupDeleteWithCleanup extends TestBackupBase {
   public static void setupBackupFolders(FileSystem fs, Path backupWalDir, long currentTime)
     throws IOException {
     Path walsDir = new Path(backupWalDir, WALS_DIR);
-    Path bulkLoadDir = new Path(backupWalDir, BULKLOAD_FILES_DIR);
 
     fs.mkdirs(walsDir);
-    fs.mkdirs(bulkLoadDir);
 
     SimpleDateFormat dateFormat = new SimpleDateFormat(DATE_FORMAT);
 
     for (int i = 0; i < 5; i++) {
       String dateStr = dateFormat.format(new Date(currentTime - (i * ONE_DAY_IN_MILLISECONDS)));
       fs.mkdirs(new Path(walsDir, dateStr));
-      fs.mkdirs(new Path(bulkLoadDir, dateStr));
     }
   }
 
   private static void verifyBackupCleanup(FileSystem fs, Path backupWalDir, long currentTime)
     throws IOException {
     Path walsDir = new Path(backupWalDir, WALS_DIR);
-    Path bulkLoadDir = new Path(backupWalDir, BULKLOAD_FILES_DIR);
     SimpleDateFormat dateFormat = new SimpleDateFormat(DATE_FORMAT);
 
     // Expect folders older than 3 days to be deleted
     for (int i = 3; i < 5; i++) {
       String oldDateStr = dateFormat.format(new Date(currentTime - (i * ONE_DAY_IN_MILLISECONDS)));
       Path walPath = new Path(walsDir, oldDateStr);
-      Path bulkLoadPath = new Path(bulkLoadDir, oldDateStr);
       assertFalse("Old WAL directory (" + walPath + ") should be deleted, but 
it exists!",
         fs.exists(walPath));
-      assertFalse("Old BulkLoad directory (" + bulkLoadPath + ") should be 
deleted, but it exists!",
-        fs.exists(bulkLoadPath));
     }
 
     // Expect folders within the last 3 days to exist
@@ -226,13 +218,9 @@ public class TestBackupDeleteWithCleanup extends TestBackupBase {
       String recentDateStr =
         dateFormat.format(new Date(currentTime - (i * ONE_DAY_IN_MILLISECONDS)));
       Path walPath = new Path(walsDir, recentDateStr);
-      Path bulkLoadPath = new Path(bulkLoadDir, recentDateStr);
 
       assertTrue("Recent WAL directory (" + walPath + ") should exist, but it 
is missing!",
         fs.exists(walPath));
-      assertTrue(
-        "Recent BulkLoad directory (" + bulkLoadPath + ") should exist, but it 
is missing!",
-        fs.exists(bulkLoadPath));
     }
   }
 
@@ -276,16 +264,15 @@ public class TestBackupDeleteWithCleanup extends TestBackupBase {
       peer -> peer.getPeerId().equals(CONTINUOUS_BACKUP_REPLICATION_PEER) && peer.isEnabled());
   }
 
-  private static boolean areWalAndBulkloadDirsEmpty(Configuration conf, String backupWalDir)
+  private static boolean isWalDirsEmpty(Configuration conf, String backupWalDir)
     throws IOException {
     BackupFileSystemManager manager =
       new BackupFileSystemManager(CONTINUOUS_BACKUP_REPLICATION_PEER, conf, backupWalDir);
 
     FileSystem fs = manager.getBackupFs();
     Path walDir = manager.getWalsDir();
-    Path bulkloadDir = manager.getBulkLoadFilesDir();
 
-    return isDirectoryEmpty(fs, walDir) && isDirectoryEmpty(fs, bulkloadDir);
+    return isDirectoryEmpty(fs, walDir);
   }
 
   private static boolean isDirectoryEmpty(FileSystem fs, Path dirPath) throws IOException {
diff --git a/hbase-backup/src/test/java/org/apache/hadoop/hbase/backup/impl/TestBackupCommands.java b/hbase-backup/src/test/java/org/apache/hadoop/hbase/backup/impl/TestBackupCommands.java
index b2ebbd640bb..e00ebd6099f 100644
--- a/hbase-backup/src/test/java/org/apache/hadoop/hbase/backup/impl/TestBackupCommands.java
+++ b/hbase-backup/src/test/java/org/apache/hadoop/hbase/backup/impl/TestBackupCommands.java
@@ -20,7 +20,6 @@ package org.apache.hadoop.hbase.backup.impl;
 import static org.apache.hadoop.hbase.backup.BackupRestoreConstants.CONF_CONTINUOUS_BACKUP_WAL_DIR;
 import static org.apache.hadoop.hbase.backup.TestBackupDeleteWithCleanup.logDirectoryStructure;
 import static org.apache.hadoop.hbase.backup.TestBackupDeleteWithCleanup.setupBackupFolders;
-import static org.apache.hadoop.hbase.backup.replication.BackupFileSystemManager.BULKLOAD_FILES_DIR;
 import static org.apache.hadoop.hbase.backup.replication.BackupFileSystemManager.WALS_DIR;
 import static org.apache.hadoop.hbase.backup.replication.ContinuousBackupReplicationEndpoint.DATE_FORMAT;
 import static org.apache.hadoop.hbase.backup.replication.ContinuousBackupReplicationEndpoint.ONE_DAY_IN_MILLISECONDS;
@@ -135,7 +134,7 @@ public class TestBackupCommands extends TestBackupBase {
     fs.mkdirs(backupWalDir);
 
     long currentTime = EnvironmentEdgeManager.getDelegate().currentTime();
-    setupBackupFolders(fs, backupWalDir, currentTime); // Create 5 days of WAL/bulk folders
+    setupBackupFolders(fs, backupWalDir, currentTime); // Create 5 days of WALs folders
 
     logDirectoryStructure(fs, backupWalDir, "Before cleanup:");
 
@@ -155,7 +154,6 @@ public class TestBackupCommands extends TestBackupBase {
   private static void verifyCleanupOutcome(FileSystem fs, Path backupWalDir, long currentTime,
     long cutoffTime) throws IOException {
     Path walsDir = new Path(backupWalDir, WALS_DIR);
-    Path bulkLoadDir = new Path(backupWalDir, BULKLOAD_FILES_DIR);
     SimpleDateFormat dateFormat = new SimpleDateFormat(DATE_FORMAT);
     dateFormat.setTimeZone(TimeZone.getTimeZone("UTC"));
 
@@ -163,14 +161,11 @@ public class TestBackupCommands extends TestBackupBase {
       long dayTime = currentTime - (i * ONE_DAY_IN_MILLISECONDS);
       String dayDir = dateFormat.format(new Date(dayTime));
       Path walPath = new Path(walsDir, dayDir);
-      Path bulkPath = new Path(bulkLoadDir, dayDir);
 
       if (dayTime + ONE_DAY_IN_MILLISECONDS - 1 < cutoffTime) {
         assertFalse("Old WAL dir should be deleted: " + walPath, 
fs.exists(walPath));
-        assertFalse("Old BulkLoad dir should be deleted: " + bulkPath, 
fs.exists(bulkPath));
       } else {
         assertTrue("Recent WAL dir should exist: " + walPath, 
fs.exists(walPath));
-        assertTrue("Recent BulkLoad dir should exist: " + bulkPath, 
fs.exists(bulkPath));
       }
     }
   }
diff --git a/hbase-backup/src/test/java/org/apache/hadoop/hbase/backup/replication/TestContinuousBackupReplicationEndpoint.java b/hbase-backup/src/test/java/org/apache/hadoop/hbase/backup/replication/TestContinuousBackupReplicationEndpoint.java
index 253675f85d9..3919746d3b7 100644
--- a/hbase-backup/src/test/java/org/apache/hadoop/hbase/backup/replication/TestContinuousBackupReplicationEndpoint.java
+++ b/hbase-backup/src/test/java/org/apache/hadoop/hbase/backup/replication/TestContinuousBackupReplicationEndpoint.java
@@ -17,9 +17,7 @@
  */
 package org.apache.hadoop.hbase.backup.replication;
 
-import static org.apache.hadoop.hbase.HConstants.REPLICATION_BULKLOAD_ENABLE_KEY;
 import static org.apache.hadoop.hbase.HConstants.REPLICATION_CLUSTER_ID;
-import static org.apache.hadoop.hbase.backup.replication.BackupFileSystemManager.BULKLOAD_FILES_DIR;
 import static org.apache.hadoop.hbase.backup.replication.BackupFileSystemManager.WALS_DIR;
 import static org.apache.hadoop.hbase.backup.replication.ContinuousBackupReplicationEndpoint.CONF_BACKUP_MAX_WAL_SIZE;
 import static org.apache.hadoop.hbase.backup.replication.ContinuousBackupReplicationEndpoint.CONF_BACKUP_ROOT_DIR;
@@ -66,11 +64,8 @@ import org.apache.hadoop.hbase.mapreduce.WALPlayer;
 import org.apache.hadoop.hbase.replication.ReplicationPeerConfig;
 import org.apache.hadoop.hbase.testclassification.LargeTests;
 import org.apache.hadoop.hbase.testclassification.ReplicationTests;
-import org.apache.hadoop.hbase.tool.BulkLoadHFiles;
-import org.apache.hadoop.hbase.tool.BulkLoadHFilesTool;
 import org.apache.hadoop.hbase.util.Bytes;
 import org.apache.hadoop.hbase.util.EnvironmentEdgeManagerTestHelper;
-import org.apache.hadoop.hbase.util.HFileTestUtil;
 import org.apache.hadoop.hbase.util.ManualEnvironmentEdge;
 import org.apache.hadoop.util.ToolRunner;
 import org.junit.AfterClass;
@@ -96,14 +91,12 @@ public class TestContinuousBackupReplicationEndpoint {
 
   private final String replicationEndpoint = ContinuousBackupReplicationEndpoint.class.getName();
   private static final String CF_NAME = "cf";
-  private static final byte[] QUALIFIER = Bytes.toBytes("my-qualifier");
   static FileSystem fs = null;
   static Path root;
 
   @BeforeClass
   public static void setUpBeforeClass() throws Exception {
     // Set the configuration properties as required
-    conf.setBoolean(REPLICATION_BULKLOAD_ENABLE_KEY, true);
     conf.set(REPLICATION_CLUSTER_ID, "clusterId1");
 
     TEST_UTIL.startMiniZKCluster();
@@ -122,7 +115,7 @@ public class TestContinuousBackupReplicationEndpoint {
   }
 
   @Test
-  public void testWALAndBulkLoadFileBackup() throws IOException {
+  public void testWALBackup() throws IOException {
     String methodName = Thread.currentThread().getStackTrace()[1].getMethodName();
     TableName tableName = TableName.valueOf("table_" + methodName);
     String peerId = "peerId";
@@ -140,15 +133,10 @@ public class TestContinuousBackupReplicationEndpoint {
     loadRandomData(tableName, 100);
     assertEquals(100, getRowCount(tableName));
 
-    Path dir = TEST_UTIL.getDataTestDirOnTestFS("testBulkLoadByFamily");
-    generateHFiles(dir);
-    bulkLoadHFiles(tableName, dir);
-    assertEquals(1100, getRowCount(tableName));
-
     waitForReplication(15000);
     deleteReplicationPeer(peerId);
 
-    verifyBackup(backupRootDir.toString(), true, Map.of(tableName, 1100));
+    verifyBackup(backupRootDir.toString(), Map.of(tableName, 100));
 
     deleteTable(tableName);
   }
@@ -196,7 +184,7 @@ public class TestContinuousBackupReplicationEndpoint {
     waitForReplication(15000);
     deleteReplicationPeer(peerId);
 
-    verifyBackup(backupRootDir.toString(), false, Map.of(table1, 100, table2, 100, table3, 50));
+    verifyBackup(backupRootDir.toString(), Map.of(table1, 100, table2, 100, table3, 50));
 
     for (TableName table : List.of(table1, table2, table3)) {
       deleteTable(table);
@@ -254,7 +242,7 @@ public class TestContinuousBackupReplicationEndpoint {
     waitForReplication(20000);
     deleteReplicationPeer(peerId);
 
-    verifyBackup(backupRootDir.toString(), false, Map.of(tableName, getRowCount(tableName)));
+    verifyBackup(backupRootDir.toString(), Map.of(tableName, getRowCount(tableName)));
 
     deleteTable(tableName);
   }
@@ -301,7 +289,7 @@ public class TestContinuousBackupReplicationEndpoint {
     waitForReplication(15000);
     deleteReplicationPeer(peerId);
 
-    verifyBackup(backupRootDir.toString(), false, Map.of(tableName, 200));
+    verifyBackup(backupRootDir.toString(), Map.of(tableName, 200));
 
     // Verify that WALs are stored in two directories, one for each day
     Path walDir = new Path(backupRootDir, WALS_DIR);
@@ -370,42 +358,6 @@ public class TestContinuousBackupReplicationEndpoint {
     }
   }
 
-  private void bulkLoadHFiles(TableName tableName, Path inputDir) throws IOException {
-    TEST_UTIL.getConfiguration().setBoolean(BulkLoadHFilesTool.BULK_LOAD_HFILES_BY_FAMILY, true);
-
-    try (Table table = TEST_UTIL.getConnection().getTable(tableName)) {
-      BulkLoadHFiles loader = new BulkLoadHFilesTool(TEST_UTIL.getConfiguration());
-      loader.bulkLoad(table.getName(), inputDir);
-    } finally {
-      TEST_UTIL.getConfiguration().setBoolean(BulkLoadHFilesTool.BULK_LOAD_HFILES_BY_FAMILY, false);
-    }
-  }
-
-  private void bulkLoadHFiles(TableName tableName, Map<byte[], List<Path>> family2Files)
-    throws IOException {
-    TEST_UTIL.getConfiguration().setBoolean(BulkLoadHFilesTool.BULK_LOAD_HFILES_BY_FAMILY, true);
-
-    try (Table table = TEST_UTIL.getConnection().getTable(tableName)) {
-      BulkLoadHFiles loader = new BulkLoadHFilesTool(TEST_UTIL.getConfiguration());
-      loader.bulkLoad(table.getName(), family2Files);
-    } finally {
-      TEST_UTIL.getConfiguration().setBoolean(BulkLoadHFilesTool.BULK_LOAD_HFILES_BY_FAMILY, false);
-    }
-  }
-
-  private void generateHFiles(Path outputDir) throws IOException {
-    String hFileName = "MyHFile";
-    int numRows = 1000;
-    outputDir = outputDir.makeQualified(fs.getUri(), fs.getWorkingDirectory());
-
-    byte[] from = Bytes.toBytes(CF_NAME + "begin");
-    byte[] to = Bytes.toBytes(CF_NAME + "end");
-
-    Path familyDir = new Path(outputDir, CF_NAME);
-    HFileTestUtil.createHFile(TEST_UTIL.getConfiguration(), fs, new Path(familyDir, hFileName),
-      Bytes.toBytes(CF_NAME), QUALIFIER, from, to, numRows);
-  }
-
   private void waitForReplication(int durationInMillis) {
     LOG.info("Waiting for replication to complete for {} ms", 
durationInMillis);
     try {
@@ -418,17 +370,12 @@ public class TestContinuousBackupReplicationEndpoint {
 
   /**
    * Verifies the backup process by: 1. Checking whether any WAL (Write-Ahead Log) files were
-   * generated in the backup directory. 2. Checking whether any bulk-loaded files were generated in
-   * the backup directory. 3. Replaying the WAL and bulk-loaded files (if present) to restore data
-   * and check consistency by verifying that the restored data matches the expected row count for
-   * each table.
+   * generated in the backup directory. 2. Replaying the WAL files to restore data and check
+   * consistency by verifying that the restored data matches the expected row count for each table.
    */
-  private void verifyBackup(String backupRootDir, boolean hasBulkLoadFiles,
-    Map<TableName, Integer> tablesWithExpectedRows) throws IOException {
+  private void verifyBackup(String backupRootDir, Map<TableName, Integer> tablesWithExpectedRows)
+    throws IOException {
     verifyWALBackup(backupRootDir);
-    if (hasBulkLoadFiles) {
-      verifyBulkLoadBackup(backupRootDir);
-    }
 
     for (Map.Entry<TableName, Integer> entry : tablesWithExpectedRows.entrySet()) {
       TableName tableName = entry.getKey();
@@ -440,21 +387,6 @@ public class TestContinuousBackupReplicationEndpoint {
 
       replayWALs(new Path(backupRootDir, WALS_DIR).toString(), tableName);
 
-      // replay Bulk loaded HFiles if Present
-      try {
-        Path bulkloadDir = new Path(backupRootDir, BULKLOAD_FILES_DIR);
-        if (fs.exists(bulkloadDir)) {
-          FileStatus[] directories = fs.listStatus(bulkloadDir);
-          for (FileStatus dirStatus : directories) {
-            if (dirStatus.isDirectory()) {
-              replayBulkLoadHFilesIfPresent(dirStatus.getPath().toString(), tableName);
-            }
-          }
-        }
-      } catch (Exception e) {
-        fail("Failed to replay BulkLoad HFiles properly: " + e.getMessage());
-      }
-
       assertEquals(expectedRows, getRowCount(tableName));
     }
   }
@@ -480,15 +412,6 @@ public class TestContinuousBackupReplicationEndpoint {
     assertFalse("Expected some WAL files but found none!", walFiles.isEmpty());
   }
 
-  private void verifyBulkLoadBackup(String backupRootDir) throws IOException {
-    Path bulkLoadFilesDir = new Path(backupRootDir, BULKLOAD_FILES_DIR);
-    assertTrue("BulkLoad Files directory does not exist!", 
fs.exists(bulkLoadFilesDir));
-
-    FileStatus[] bulkLoadFiles = fs.listStatus(bulkLoadFilesDir);
-    assertNotNull("No Bulk load files found!", bulkLoadFiles);
-    assertTrue("Expected some Bulk load files but found none!", 
bulkLoadFiles.length > 0);
-  }
-
   private void replayWALs(String walDir, TableName tableName) {
     WALPlayer player = new WALPlayer();
     try {
@@ -499,28 +422,6 @@ public class TestContinuousBackupReplicationEndpoint {
     }
   }
 
-  private void replayBulkLoadHFilesIfPresent(String bulkLoadDir, TableName tableName) {
-    try {
-      Path tableBulkLoadDir = new Path(bulkLoadDir + "/default/" + tableName);
-      if (fs.exists(tableBulkLoadDir)) {
-        RemoteIterator<LocatedFileStatus> fileStatusIterator = fs.listFiles(tableBulkLoadDir, true);
-        List<Path> bulkLoadFiles = new ArrayList<>();
-
-        while (fileStatusIterator.hasNext()) {
-          LocatedFileStatus fileStatus = fileStatusIterator.next();
-          Path filePath = fileStatus.getPath();
-
-          if (!fileStatus.isDirectory()) {
-            bulkLoadFiles.add(filePath);
-          }
-        }
-        bulkLoadHFiles(tableName, Map.of(Bytes.toBytes(CF_NAME), bulkLoadFiles));
-      }
-    } catch (Exception e) {
-      fail("Failed to replay BulkLoad HFiles properly: " + e.getMessage());
-    }
-  }
-
   private int getRowCount(TableName tableName) throws IOException {
     try (Table table = TEST_UTIL.getConnection().getTable(tableName)) {
       return HBaseTestingUtil.countRows(table);

