This is an automated email from the ASF dual-hosted git repository.

andor pushed a commit to branch HBASE-28957
in repository https://gitbox.apache.org/repos/asf/hbase.git
commit 132e12a6311b5d74c8b7c3e1e1753a866da59bd0
Author: vinayak hegde <vinayakph...@gmail.com>
AuthorDate: Fri Jun 27 23:53:52 2025 +0530

    HBASE-29406: Skip Copying Bulkloaded Files to Backup Location in Continuous Backup (#7119)

    Signed-off-by: Tak Lon (Stephen) Wu <tak...@apache.org>
    Reviewed by: Kevin Geiszler <kevin.j.geisz...@gmail.com>
---
 .../hadoop/hbase/backup/impl/BackupCommands.java  |  14 +--
 .../hbase/backup/impl/FullTableBackupClient.java  |   8 --
 .../replication/BackupFileSystemManager.java      |  11 +-
 .../backup/replication/BulkLoadProcessor.java     |  96 -----------------
 .../ContinuousBackupReplicationEndpoint.java      |  93 +---------------
 .../hbase/backup/TestBackupDeleteWithCleanup.java |  19 +---
 .../hbase/backup/impl/TestBackupCommands.java     |   7 +-
 .../TestContinuousBackupReplicationEndpoint.java  | 117 ++-------------------
 8 files changed, 18 insertions(+), 347 deletions(-)

diff --git a/hbase-backup/src/main/java/org/apache/hadoop/hbase/backup/impl/BackupCommands.java b/hbase-backup/src/main/java/org/apache/hadoop/hbase/backup/impl/BackupCommands.java
index 2020b84bc1c..3ae97c487ef 100644
--- a/hbase-backup/src/main/java/org/apache/hadoop/hbase/backup/impl/BackupCommands.java
+++ b/hbase-backup/src/main/java/org/apache/hadoop/hbase/backup/impl/BackupCommands.java
@@ -1004,7 +1004,6 @@ public final class BackupCommands {
           new BackupFileSystemManager(CONTINUOUS_BACKUP_REPLICATION_PEER, conf, backupWalDir);
         FileSystem fs = manager.getBackupFs();
         Path walDir = manager.getWalsDir();
-        Path bulkloadDir = manager.getBulkLoadFilesDir();

         // Delete contents under WAL directory
         if (fs.exists(walDir)) {
@@ -1015,15 +1014,6 @@ public final class BackupCommands {
           System.out.println("Deleted all contents under WAL directory: " + walDir);
         }

-        // Delete contents under bulk load directory
-        if (fs.exists(bulkloadDir)) {
-          FileStatus[] bulkContents = fs.listStatus(bulkloadDir);
-          for (FileStatus item : bulkContents) {
-            fs.delete(item.getPath(), true); // recursive delete of each child
-          }
-          System.out.println("Deleted all contents under Bulk Load directory: " + bulkloadDir);
-        }
-
       } catch (IOException e) {
         System.out.println("WARNING: Failed to delete contents under backup directories: "
           + backupWalDir + ". Error: " + e.getMessage());
@@ -1032,7 +1022,7 @@ public final class BackupCommands {
     }

     /**
-     * Cleans up old WAL and bulk-loaded files based on the determined cutoff timestamp.
+     * Cleans up old WAL files based on the determined cutoff timestamp.
     */
    void deleteOldWALFiles(Configuration conf, String backupWalDir, long cutoffTime)
      throws IOException {
@@ -1043,7 +1033,6 @@ public final class BackupCommands {
          new BackupFileSystemManager(CONTINUOUS_BACKUP_REPLICATION_PEER, conf, backupWalDir);
       FileSystem fs = manager.getBackupFs();
       Path walDir = manager.getWalsDir();
-      Path bulkloadDir = manager.getBulkLoadFilesDir();

       SimpleDateFormat dateFormat = new SimpleDateFormat(DATE_FORMAT);
       dateFormat.setTimeZone(TimeZone.getTimeZone("UTC"));
@@ -1069,7 +1058,6 @@ public final class BackupCommands {
           if (dayStart + ONE_DAY_IN_MILLISECONDS - 1 < cutoffTime) {
             System.out.println("Deleting outdated WAL directory: " + dirPath);
             fs.delete(dirPath, true);
-            fs.delete(new Path(bulkloadDir, dirName), true);
           }
         } catch (ParseException e) {
           System.out.println("WARNING: Failed to parse directory name '" + dirName
diff --git a/hbase-backup/src/main/java/org/apache/hadoop/hbase/backup/impl/FullTableBackupClient.java b/hbase-backup/src/main/java/org/apache/hadoop/hbase/backup/impl/FullTableBackupClient.java
index 3f6ae3deb63..5b49496d626 100644
--- a/hbase-backup/src/main/java/org/apache/hadoop/hbase/backup/impl/FullTableBackupClient.java
+++ b/hbase-backup/src/main/java/org/apache/hadoop/hbase/backup/impl/FullTableBackupClient.java
@@ -190,14 +190,6 @@ public class FullTableBackupClient extends TableBackupClient {
     // set overall backup status: complete. Here we make sure to complete the backup.
     // After this checkpoint, even if entering cancel process, will let the backup finished
     backupInfo.setState(BackupState.COMPLETE);
-
-    if (!conf.getBoolean("hbase.replication.bulkload.enabled", false)) {
-      System.out.println("NOTE: Bulkload replication is not enabled. "
-        + "Bulk loaded files will not be backed up as part of continuous backup. "
-        + "To ensure bulk loaded files are included in the backup, please enable bulkload replication "
-        + "(hbase.replication.bulkload.enabled=true) and configure other necessary settings "
-        + "to properly enable bulkload replication.");
-    }
   }

   private void handleNonContinuousBackup(Admin admin) throws IOException {
diff --git a/hbase-backup/src/main/java/org/apache/hadoop/hbase/backup/replication/BackupFileSystemManager.java b/hbase-backup/src/main/java/org/apache/hadoop/hbase/backup/replication/BackupFileSystemManager.java
index 225d3217276..9d1d818c207 100644
--- a/hbase-backup/src/main/java/org/apache/hadoop/hbase/backup/replication/BackupFileSystemManager.java
+++ b/hbase-backup/src/main/java/org/apache/hadoop/hbase/backup/replication/BackupFileSystemManager.java
@@ -26,20 +26,18 @@ import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;

 /**
- * Initializes and organizes backup directories for continuous Write-Ahead Logs (WALs) and
- * bulk-loaded files within the specified backup root directory.
+ * Initializes and organizes backup directories for continuous Write-Ahead Logs (WALs) files within
+ * the specified backup root directory.
  */
 @InterfaceAudience.Private
 public class BackupFileSystemManager {
   private static final Logger LOG = LoggerFactory.getLogger(BackupFileSystemManager.class);
   public static final String WALS_DIR = "WALs";
-  public static final String BULKLOAD_FILES_DIR = "bulk-load-files";

   private final String peerId;
   private final FileSystem backupFs;
   private final Path backupRootDir;
   private final Path walsDir;
-  private final Path bulkLoadFilesDir;

   public BackupFileSystemManager(String peerId, Configuration conf, String backupRootDirStr)
     throws IOException {
@@ -47,7 +45,6 @@ public class BackupFileSystemManager {
     this.backupRootDir = new Path(backupRootDirStr);
     this.backupFs = FileSystem.get(backupRootDir.toUri(), conf);
     this.walsDir = createDirectory(WALS_DIR);
-    this.bulkLoadFilesDir = createDirectory(BULKLOAD_FILES_DIR);
   }

   private Path createDirectory(String dirName) throws IOException {
@@ -61,10 +58,6 @@ public class BackupFileSystemManager {
     return walsDir;
   }

-  public Path getBulkLoadFilesDir() {
-    return bulkLoadFilesDir;
-  }
-
   public FileSystem getBackupFs() {
     return backupFs;
   }
diff --git a/hbase-backup/src/main/java/org/apache/hadoop/hbase/backup/replication/BulkLoadProcessor.java b/hbase-backup/src/main/java/org/apache/hadoop/hbase/backup/replication/BulkLoadProcessor.java
deleted file mode 100644
index 6e1271313bc..00000000000
--- a/hbase-backup/src/main/java/org/apache/hadoop/hbase/backup/replication/BulkLoadProcessor.java
+++ /dev/null
@@ -1,96 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.hadoop.hbase.backup.replication;
-
-import java.io.IOException;
-import java.util.ArrayList;
-import java.util.List;
-import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.hbase.Cell;
-import org.apache.hadoop.hbase.CellUtil;
-import org.apache.hadoop.hbase.TableName;
-import org.apache.hadoop.hbase.wal.WAL;
-import org.apache.hadoop.hbase.wal.WALEdit;
-import org.apache.yetus.audience.InterfaceAudience;
-
-import org.apache.hadoop.hbase.shaded.protobuf.generated.WALProtos;
-
-/**
- * Processes bulk load files from Write-Ahead Log (WAL) entries for HBase replication.
- * <p>
- * This utility class extracts and constructs the file paths of bulk-loaded files based on WAL
- * entries. It processes bulk load descriptors and their associated store descriptors to generate
- * the paths for each bulk-loaded file.
- * <p>
- * The class is designed for scenarios where replicable bulk load operations need to be parsed and
- * their file paths need to be determined programmatically.
- * </p>
- */
-@InterfaceAudience.Private
-public final class BulkLoadProcessor {
-  private BulkLoadProcessor() {
-  }
-
-  public static List<Path> processBulkLoadFiles(List<WAL.Entry> walEntries) throws IOException {
-    List<Path> bulkLoadFilePaths = new ArrayList<>();
-
-    for (WAL.Entry entry : walEntries) {
-      WALEdit edit = entry.getEdit();
-      for (Cell cell : edit.getCells()) {
-        if (CellUtil.matchingQualifier(cell, WALEdit.BULK_LOAD)) {
-          TableName tableName = entry.getKey().getTableName();
-          String namespace = tableName.getNamespaceAsString();
-          String table = tableName.getQualifierAsString();
-          bulkLoadFilePaths.addAll(processBulkLoadDescriptor(cell, namespace, table));
-        }
-      }
-    }
-    return bulkLoadFilePaths;
-  }
-
-  private static List<Path> processBulkLoadDescriptor(Cell cell, String namespace, String table)
-    throws IOException {
-    List<Path> bulkLoadFilePaths = new ArrayList<>();
-    WALProtos.BulkLoadDescriptor bld = WALEdit.getBulkLoadDescriptor(cell);
-
-    if (bld == null || !bld.getReplicate() || bld.getEncodedRegionName() == null) {
-      return bulkLoadFilePaths; // Skip if not replicable
-    }
-
-    String regionName = bld.getEncodedRegionName().toStringUtf8();
-    for (WALProtos.StoreDescriptor storeDescriptor : bld.getStoresList()) {
-      bulkLoadFilePaths
-        .addAll(processStoreDescriptor(storeDescriptor, namespace, table, regionName));
-    }
-
-    return bulkLoadFilePaths;
-  }
-
-  private static List<Path> processStoreDescriptor(WALProtos.StoreDescriptor storeDescriptor,
-    String namespace, String table, String regionName) {
-    List<Path> paths = new ArrayList<>();
-    String columnFamily = storeDescriptor.getFamilyName().toStringUtf8();
-
-    for (String storeFile : storeDescriptor.getStoreFileList()) {
-      paths.add(new Path(namespace,
-        new Path(table, new Path(regionName, new Path(columnFamily, storeFile)))));
-    }
-
-    return paths;
-  }
-}
diff --git a/hbase-backup/src/main/java/org/apache/hadoop/hbase/backup/replication/ContinuousBackupReplicationEndpoint.java b/hbase-backup/src/main/java/org/apache/hadoop/hbase/backup/replication/ContinuousBackupReplicationEndpoint.java
index eeacc8fbf34..bf3fbd531bf 100644
--- a/hbase-backup/src/main/java/org/apache/hadoop/hbase/backup/replication/ContinuousBackupReplicationEndpoint.java
+++ b/hbase-backup/src/main/java/org/apache/hadoop/hbase/backup/replication/ContinuousBackupReplicationEndpoint.java
@@ -33,10 +33,8 @@ import java.util.concurrent.locks.ReentrantLock;
 import java.util.stream.Collectors;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.FileUtil;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hbase.HBaseConfiguration;
-import org.apache.hadoop.hbase.HConstants;
 import org.apache.hadoop.hbase.backup.impl.BackupSystemTable;
 import org.apache.hadoop.hbase.client.Connection;
 import org.apache.hadoop.hbase.client.ConnectionFactory;
@@ -45,7 +43,6 @@ import org.apache.hadoop.hbase.regionserver.wal.WALUtil;
 import org.apache.hadoop.hbase.replication.BaseReplicationEndpoint;
 import org.apache.hadoop.hbase.replication.ReplicationResult;
 import org.apache.hadoop.hbase.replication.regionserver.ReplicationSourceInterface;
-import org.apache.hadoop.hbase.util.CommonFSUtils;
 import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
 import org.apache.hadoop.hbase.wal.FSHLogProvider;
 import org.apache.hadoop.hbase.wal.WAL;
@@ -56,8 +53,8 @@ import org.slf4j.LoggerFactory;
 /**
  * ContinuousBackupReplicationEndpoint is responsible for replicating WAL entries to a backup
  * storage. It organizes WAL entries by day and periodically flushes the data, ensuring that WAL
- * files do not exceed the configured size. The class includes mechanisms for handling the WAL
- * files, performing bulk load backups, and ensuring that the replication process is safe.
+ * files do not exceed the configured size. The class includes mechanisms for handling the WAL files
+ * and ensuring that the replication process is safe.
  */
 @InterfaceAudience.Private
 public class ContinuousBackupReplicationEndpoint extends BaseReplicationEndpoint {
@@ -292,20 +289,11 @@ public class ContinuousBackupReplicationEndpoint extends BaseReplicationEndpoint
     try {
       FSHLogProvider.Writer walWriter = walWriters.computeIfAbsent(day, this::createWalWriter);

-      List<Path> bulkLoadFiles = BulkLoadProcessor.processBulkLoadFiles(walEntries);
-
-      if (LOG.isTraceEnabled()) {
-        LOG.trace("{} Processed {} bulk load files for WAL entries", Utils.logPeerId(peerId),
-          bulkLoadFiles.size());
-        LOG.trace("{} Bulk load files: {}", Utils.logPeerId(peerId),
-          bulkLoadFiles.stream().map(Path::toString).collect(Collectors.joining(", ")));
-      }

       for (WAL.Entry entry : walEntries) {
         walWriter.append(entry);
       }
       walWriter.sync(true);
-      uploadBulkLoadFiles(day, bulkLoadFiles);
     } catch (UncheckedIOException e) {
       String errorMsg = Utils.logPeerId(peerId) + " Failed to get or create WAL Writer for " + day;
       LOG.error("{} Backup failed for day {}. Error: {}", Utils.logPeerId(peerId), day,
@@ -375,41 +363,6 @@ public class ContinuousBackupReplicationEndpoint extends BaseReplicationEndpoint
     }
   }

-  private void uploadBulkLoadFiles(long dayInMillis, List<Path> bulkLoadFiles) throws IOException {
-    LOG.debug("{} Starting upload of {} bulk load files", Utils.logPeerId(peerId),
-      bulkLoadFiles.size());
-
-    if (LOG.isTraceEnabled()) {
-      LOG.trace("{} Bulk load files to upload: {}", Utils.logPeerId(peerId),
-        bulkLoadFiles.stream().map(Path::toString).collect(Collectors.joining(", ")));
-    }
-    String dayDirectoryName = formatToDateString(dayInMillis);
-    Path bulkloadDir = new Path(backupFileSystemManager.getBulkLoadFilesDir(), dayDirectoryName);
-    backupFileSystemManager.getBackupFs().mkdirs(bulkloadDir);
-
-    for (Path file : bulkLoadFiles) {
-      Path sourcePath = getBulkLoadFileStagingPath(file);
-      Path destPath = new Path(bulkloadDir, file);
-
-      try {
-        LOG.debug("{} Copying bulk load file from {} to {}", Utils.logPeerId(peerId), sourcePath,
-          destPath);
-
-        FileUtil.copy(CommonFSUtils.getRootDirFileSystem(conf), sourcePath,
-          backupFileSystemManager.getBackupFs(), destPath, false, conf);
-
-        LOG.info("{} Bulk load file {} successfully backed up to {}", Utils.logPeerId(peerId), file,
-          destPath);
-      } catch (IOException e) {
-        LOG.error("{} Failed to back up bulk load file {}: {}", Utils.logPeerId(peerId), file,
-          e.getMessage(), e);
-        throw e;
-      }
-    }
-
-    LOG.debug("{} Completed upload of bulk load files", Utils.logPeerId(peerId));
-  }
-
   /**
    * Convert dayInMillis to "yyyy-MM-dd" format
   */
@@ -419,48 +372,6 @@ public class ContinuousBackupReplicationEndpoint extends BaseReplicationEndpoint
     return dateFormat.format(new Date(dayInMillis));
   }

-  private Path getBulkLoadFileStagingPath(Path relativePathFromNamespace) throws IOException {
-    FileSystem rootFs = CommonFSUtils.getRootDirFileSystem(conf);
-    Path rootDir = CommonFSUtils.getRootDir(conf);
-    Path baseNSDir = new Path(HConstants.BASE_NAMESPACE_DIR);
-    Path baseNamespaceDir = new Path(rootDir, baseNSDir);
-    Path hFileArchiveDir =
-      new Path(rootDir, new Path(HConstants.HFILE_ARCHIVE_DIRECTORY, baseNSDir));
-
-    LOG.debug("{} Searching for bulk load file: {} in paths: {}, {}", Utils.logPeerId(peerId),
-      relativePathFromNamespace, baseNamespaceDir, hFileArchiveDir);
-
-    Path result =
-      findExistingPath(rootFs, baseNamespaceDir, hFileArchiveDir, relativePathFromNamespace);
-
-    if (result == null) {
-      LOG.error("{} No bulk loaded file found in relative path: {}", Utils.logPeerId(peerId),
-        relativePathFromNamespace);
-      throw new IOException(
-        "No Bulk loaded file found in relative path: " + relativePathFromNamespace);
-    }
-
-    LOG.debug("{} Bulk load file found at {}", Utils.logPeerId(peerId), result);
-    return result;
-  }
-
-  private static Path findExistingPath(FileSystem rootFs, Path baseNamespaceDir,
-    Path hFileArchiveDir, Path filePath) throws IOException {
-    if (LOG.isTraceEnabled()) {
-      LOG.trace("Checking for bulk load file at: {} and {}", new Path(baseNamespaceDir, filePath),
-        new Path(hFileArchiveDir, filePath));
-    }
-
-    for (Path candidate : new Path[] { new Path(baseNamespaceDir, filePath),
-      new Path(hFileArchiveDir, filePath) }) {
-      if (rootFs.exists(candidate)) {
-        LOG.debug("Found bulk load file at: {}", candidate);
-        return candidate;
-      }
-    }
-    return null;
-  }
-
   private void shutdownFlushExecutor() {
     if (flushExecutor != null) {
       LOG.info("{} Initiating WAL flush executor shutdown.", Utils.logPeerId(peerId));
diff --git a/hbase-backup/src/test/java/org/apache/hadoop/hbase/backup/TestBackupDeleteWithCleanup.java b/hbase-backup/src/test/java/org/apache/hadoop/hbase/backup/TestBackupDeleteWithCleanup.java
index 07c9110072b..d22f4c9cda9 100644
--- a/hbase-backup/src/test/java/org/apache/hadoop/hbase/backup/TestBackupDeleteWithCleanup.java
+++ b/hbase-backup/src/test/java/org/apache/hadoop/hbase/backup/TestBackupDeleteWithCleanup.java
@@ -19,7 +19,6 @@ package org.apache.hadoop.hbase.backup;

 import static org.apache.hadoop.hbase.backup.BackupRestoreConstants.CONF_CONTINUOUS_BACKUP_WAL_DIR;
 import static org.apache.hadoop.hbase.backup.BackupRestoreConstants.CONTINUOUS_BACKUP_REPLICATION_PEER;
-import static org.apache.hadoop.hbase.backup.replication.BackupFileSystemManager.BULKLOAD_FILES_DIR;
 import static org.apache.hadoop.hbase.backup.replication.BackupFileSystemManager.WALS_DIR;
 import static org.apache.hadoop.hbase.backup.replication.ContinuousBackupReplicationEndpoint.DATE_FORMAT;
 import static org.apache.hadoop.hbase.backup.replication.ContinuousBackupReplicationEndpoint.ONE_DAY_IN_MILLISECONDS;
@@ -165,7 +164,7 @@ public class TestBackupDeleteWithCleanup extends TestBackupBase {

     // Step 6: Verify that the backup WAL directory is empty
     assertTrue("WAL backup directory should be empty after force delete",
-      areWalAndBulkloadDirsEmpty(conf1, backupWalDir.toString()));
+      isWalDirsEmpty(conf1, backupWalDir.toString()));

     // Step 7: Take new full backup with continuous backup enabled
     String backupIdContinuous = fullTableBackupWithContinuous(Lists.newArrayList(table1));
@@ -190,35 +189,28 @@ public class TestBackupDeleteWithCleanup extends TestBackupBase {
   public static void setupBackupFolders(FileSystem fs, Path backupWalDir, long currentTime)
     throws IOException {
     Path walsDir = new Path(backupWalDir, WALS_DIR);
-    Path bulkLoadDir = new Path(backupWalDir, BULKLOAD_FILES_DIR);

     fs.mkdirs(walsDir);
-    fs.mkdirs(bulkLoadDir);

     SimpleDateFormat dateFormat = new SimpleDateFormat(DATE_FORMAT);
     for (int i = 0; i < 5; i++) {
       String dateStr = dateFormat.format(new Date(currentTime - (i * ONE_DAY_IN_MILLISECONDS)));
       fs.mkdirs(new Path(walsDir, dateStr));
-      fs.mkdirs(new Path(bulkLoadDir, dateStr));
     }
   }

   private static void verifyBackupCleanup(FileSystem fs, Path backupWalDir, long currentTime)
     throws IOException {
     Path walsDir = new Path(backupWalDir, WALS_DIR);
-    Path bulkLoadDir = new Path(backupWalDir, BULKLOAD_FILES_DIR);

     SimpleDateFormat dateFormat = new SimpleDateFormat(DATE_FORMAT);

     // Expect folders older than 3 days to be deleted
     for (int i = 3; i < 5; i++) {
       String oldDateStr = dateFormat.format(new Date(currentTime - (i * ONE_DAY_IN_MILLISECONDS)));
       Path walPath = new Path(walsDir, oldDateStr);
-      Path bulkLoadPath = new Path(bulkLoadDir, oldDateStr);

       assertFalse("Old WAL directory (" + walPath + ") should be deleted, but it exists!",
         fs.exists(walPath));
-      assertFalse("Old BulkLoad directory (" + bulkLoadPath + ") should be deleted, but it exists!",
-        fs.exists(bulkLoadPath));
     }

     // Expect folders within the last 3 days to exist
@@ -226,13 +218,9 @@ public class TestBackupDeleteWithCleanup extends TestBackupBase {
       String recentDateStr =
         dateFormat.format(new Date(currentTime - (i * ONE_DAY_IN_MILLISECONDS)));
       Path walPath = new Path(walsDir, recentDateStr);
-      Path bulkLoadPath = new Path(bulkLoadDir, recentDateStr);

       assertTrue("Recent WAL directory (" + walPath + ") should exist, but it is missing!",
         fs.exists(walPath));
-      assertTrue(
-        "Recent BulkLoad directory (" + bulkLoadPath + ") should exist, but it is missing!",
-        fs.exists(bulkLoadPath));
     }
   }

@@ -276,16 +264,15 @@ public class TestBackupDeleteWithCleanup extends TestBackupBase {
       peer -> peer.getPeerId().equals(CONTINUOUS_BACKUP_REPLICATION_PEER) && peer.isEnabled());
   }

-  private static boolean areWalAndBulkloadDirsEmpty(Configuration conf, String backupWalDir)
+  private static boolean isWalDirsEmpty(Configuration conf, String backupWalDir)
     throws IOException {
     BackupFileSystemManager manager =
       new BackupFileSystemManager(CONTINUOUS_BACKUP_REPLICATION_PEER, conf, backupWalDir);
     FileSystem fs = manager.getBackupFs();
     Path walDir = manager.getWalsDir();
-    Path bulkloadDir = manager.getBulkLoadFilesDir();

-    return isDirectoryEmpty(fs, walDir) && isDirectoryEmpty(fs, bulkloadDir);
+    return isDirectoryEmpty(fs, walDir);
   }

   private static boolean isDirectoryEmpty(FileSystem fs, Path dirPath) throws IOException {
diff --git a/hbase-backup/src/test/java/org/apache/hadoop/hbase/backup/impl/TestBackupCommands.java b/hbase-backup/src/test/java/org/apache/hadoop/hbase/backup/impl/TestBackupCommands.java
index b2ebbd640bb..e00ebd6099f 100644
--- a/hbase-backup/src/test/java/org/apache/hadoop/hbase/backup/impl/TestBackupCommands.java
+++ b/hbase-backup/src/test/java/org/apache/hadoop/hbase/backup/impl/TestBackupCommands.java
@@ -20,7 +20,6 @@ package org.apache.hadoop.hbase.backup.impl;
 import static org.apache.hadoop.hbase.backup.BackupRestoreConstants.CONF_CONTINUOUS_BACKUP_WAL_DIR;
 import static org.apache.hadoop.hbase.backup.TestBackupDeleteWithCleanup.logDirectoryStructure;
 import static org.apache.hadoop.hbase.backup.TestBackupDeleteWithCleanup.setupBackupFolders;
-import static org.apache.hadoop.hbase.backup.replication.BackupFileSystemManager.BULKLOAD_FILES_DIR;
 import static org.apache.hadoop.hbase.backup.replication.BackupFileSystemManager.WALS_DIR;
 import static org.apache.hadoop.hbase.backup.replication.ContinuousBackupReplicationEndpoint.DATE_FORMAT;
 import static org.apache.hadoop.hbase.backup.replication.ContinuousBackupReplicationEndpoint.ONE_DAY_IN_MILLISECONDS;
@@ -135,7 +134,7 @@ public class TestBackupCommands extends TestBackupBase {
     fs.mkdirs(backupWalDir);

     long currentTime = EnvironmentEdgeManager.getDelegate().currentTime();
-    setupBackupFolders(fs, backupWalDir, currentTime); // Create 5 days of WAL/bulk folders
+    setupBackupFolders(fs, backupWalDir, currentTime); // Create 5 days of WALs folders

     logDirectoryStructure(fs, backupWalDir, "Before cleanup:");

@@ -155,7 +154,6 @@ public class TestBackupCommands extends TestBackupBase {
   private static void verifyCleanupOutcome(FileSystem fs, Path backupWalDir, long currentTime,
     long cutoffTime) throws IOException {
     Path walsDir = new Path(backupWalDir, WALS_DIR);
-    Path bulkLoadDir = new Path(backupWalDir, BULKLOAD_FILES_DIR);

     SimpleDateFormat dateFormat = new SimpleDateFormat(DATE_FORMAT);
     dateFormat.setTimeZone(TimeZone.getTimeZone("UTC"));
@@ -163,14 +161,11 @@ public class TestBackupCommands extends TestBackupBase {
       long dayTime = currentTime - (i * ONE_DAY_IN_MILLISECONDS);
       String dayDir = dateFormat.format(new Date(dayTime));
       Path walPath = new Path(walsDir, dayDir);
-      Path bulkPath = new Path(bulkLoadDir, dayDir);

       if (dayTime + ONE_DAY_IN_MILLISECONDS - 1 < cutoffTime) {
         assertFalse("Old WAL dir should be deleted: " + walPath, fs.exists(walPath));
-        assertFalse("Old BulkLoad dir should be deleted: " + bulkPath, fs.exists(bulkPath));
       } else {
         assertTrue("Recent WAL dir should exist: " + walPath, fs.exists(walPath));
-        assertTrue("Recent BulkLoad dir should exist: " + bulkPath, fs.exists(bulkPath));
       }
     }
   }
diff --git a/hbase-backup/src/test/java/org/apache/hadoop/hbase/backup/replication/TestContinuousBackupReplicationEndpoint.java b/hbase-backup/src/test/java/org/apache/hadoop/hbase/backup/replication/TestContinuousBackupReplicationEndpoint.java
index 253675f85d9..3919746d3b7 100644
--- a/hbase-backup/src/test/java/org/apache/hadoop/hbase/backup/replication/TestContinuousBackupReplicationEndpoint.java
+++ b/hbase-backup/src/test/java/org/apache/hadoop/hbase/backup/replication/TestContinuousBackupReplicationEndpoint.java
@@ -17,9 +17,7 @@
  */
 package org.apache.hadoop.hbase.backup.replication;

-import static org.apache.hadoop.hbase.HConstants.REPLICATION_BULKLOAD_ENABLE_KEY;
 import static org.apache.hadoop.hbase.HConstants.REPLICATION_CLUSTER_ID;
-import static org.apache.hadoop.hbase.backup.replication.BackupFileSystemManager.BULKLOAD_FILES_DIR;
 import static org.apache.hadoop.hbase.backup.replication.BackupFileSystemManager.WALS_DIR;
 import static org.apache.hadoop.hbase.backup.replication.ContinuousBackupReplicationEndpoint.CONF_BACKUP_MAX_WAL_SIZE;
 import static org.apache.hadoop.hbase.backup.replication.ContinuousBackupReplicationEndpoint.CONF_BACKUP_ROOT_DIR;
@@ -66,11 +64,8 @@ import org.apache.hadoop.hbase.mapreduce.WALPlayer;
 import org.apache.hadoop.hbase.replication.ReplicationPeerConfig;
 import org.apache.hadoop.hbase.testclassification.LargeTests;
 import org.apache.hadoop.hbase.testclassification.ReplicationTests;
-import org.apache.hadoop.hbase.tool.BulkLoadHFiles;
-import org.apache.hadoop.hbase.tool.BulkLoadHFilesTool;
 import org.apache.hadoop.hbase.util.Bytes;
 import org.apache.hadoop.hbase.util.EnvironmentEdgeManagerTestHelper;
-import org.apache.hadoop.hbase.util.HFileTestUtil;
 import org.apache.hadoop.hbase.util.ManualEnvironmentEdge;
 import org.apache.hadoop.util.ToolRunner;
 import org.junit.AfterClass;
@@ -96,14 +91,12 @@ public class TestContinuousBackupReplicationEndpoint {
   private final String replicationEndpoint = ContinuousBackupReplicationEndpoint.class.getName();
   private static final String CF_NAME = "cf";
-  private static final byte[] QUALIFIER = Bytes.toBytes("my-qualifier");

   static FileSystem fs = null;
   static Path root;

   @BeforeClass
   public static void setUpBeforeClass() throws Exception {
     // Set the configuration properties as required
-    conf.setBoolean(REPLICATION_BULKLOAD_ENABLE_KEY, true);
     conf.set(REPLICATION_CLUSTER_ID, "clusterId1");

     TEST_UTIL.startMiniZKCluster();
@@ -122,7 +115,7 @@ public class TestContinuousBackupReplicationEndpoint {
   }

   @Test
-  public void testWALAndBulkLoadFileBackup() throws IOException {
+  public void testWALBackup() throws IOException {
     String methodName = Thread.currentThread().getStackTrace()[1].getMethodName();
     TableName tableName = TableName.valueOf("table_" + methodName);
     String peerId = "peerId";
@@ -140,15 +133,10 @@ public class TestContinuousBackupReplicationEndpoint {
     loadRandomData(tableName, 100);
     assertEquals(100, getRowCount(tableName));

-    Path dir = TEST_UTIL.getDataTestDirOnTestFS("testBulkLoadByFamily");
-    generateHFiles(dir);
-    bulkLoadHFiles(tableName, dir);
-    assertEquals(1100, getRowCount(tableName));
-
     waitForReplication(15000);
     deleteReplicationPeer(peerId);

-    verifyBackup(backupRootDir.toString(), true, Map.of(tableName, 1100));
+    verifyBackup(backupRootDir.toString(), Map.of(tableName, 100));
     deleteTable(tableName);
   }

@@ -196,7 +184,7 @@ public class TestContinuousBackupReplicationEndpoint {
     waitForReplication(15000);
     deleteReplicationPeer(peerId);

-    verifyBackup(backupRootDir.toString(), false, Map.of(table1, 100, table2, 100, table3, 50));
+    verifyBackup(backupRootDir.toString(), Map.of(table1, 100, table2, 100, table3, 50));

     for (TableName table : List.of(table1, table2, table3)) {
       deleteTable(table);
@@ -254,7 +242,7 @@ public class TestContinuousBackupReplicationEndpoint {
     waitForReplication(20000);
     deleteReplicationPeer(peerId);

-    verifyBackup(backupRootDir.toString(), false, Map.of(tableName, getRowCount(tableName)));
+    verifyBackup(backupRootDir.toString(), Map.of(tableName, getRowCount(tableName)));
     deleteTable(tableName);
   }

@@ -301,7 +289,7 @@ public class TestContinuousBackupReplicationEndpoint {
     waitForReplication(15000);
     deleteReplicationPeer(peerId);

-    verifyBackup(backupRootDir.toString(), false, Map.of(tableName, 200));
+    verifyBackup(backupRootDir.toString(), Map.of(tableName, 200));

     // Verify that WALs are stored in two directories, one for each day
     Path walDir = new Path(backupRootDir, WALS_DIR);
@@ -370,42 +358,6 @@ public class TestContinuousBackupReplicationEndpoint {
     }
   }

-  private void bulkLoadHFiles(TableName tableName, Path inputDir) throws IOException {
-    TEST_UTIL.getConfiguration().setBoolean(BulkLoadHFilesTool.BULK_LOAD_HFILES_BY_FAMILY, true);
-
-    try (Table table = TEST_UTIL.getConnection().getTable(tableName)) {
-      BulkLoadHFiles loader = new BulkLoadHFilesTool(TEST_UTIL.getConfiguration());
-      loader.bulkLoad(table.getName(), inputDir);
-    } finally {
-      TEST_UTIL.getConfiguration().setBoolean(BulkLoadHFilesTool.BULK_LOAD_HFILES_BY_FAMILY, false);
-    }
-  }
-
-  private void bulkLoadHFiles(TableName tableName, Map<byte[], List<Path>> family2Files)
-    throws IOException {
-    TEST_UTIL.getConfiguration().setBoolean(BulkLoadHFilesTool.BULK_LOAD_HFILES_BY_FAMILY, true);
-
-    try (Table table = TEST_UTIL.getConnection().getTable(tableName)) {
-      BulkLoadHFiles loader = new BulkLoadHFilesTool(TEST_UTIL.getConfiguration());
-      loader.bulkLoad(table.getName(), family2Files);
-    } finally {
-      TEST_UTIL.getConfiguration().setBoolean(BulkLoadHFilesTool.BULK_LOAD_HFILES_BY_FAMILY, false);
-    }
-  }
-
-  private void generateHFiles(Path outputDir) throws IOException {
-    String hFileName = "MyHFile";
-    int numRows = 1000;
-    outputDir = outputDir.makeQualified(fs.getUri(), fs.getWorkingDirectory());
-
-    byte[] from = Bytes.toBytes(CF_NAME + "begin");
-    byte[] to = Bytes.toBytes(CF_NAME + "end");
-
-    Path familyDir = new Path(outputDir, CF_NAME);
-    HFileTestUtil.createHFile(TEST_UTIL.getConfiguration(), fs, new Path(familyDir, hFileName),
-      Bytes.toBytes(CF_NAME), QUALIFIER, from, to, numRows);
-  }
-
   private void waitForReplication(int durationInMillis) {
     LOG.info("Waiting for replication to complete for {} ms", durationInMillis);
     try {
@@ -418,17 +370,12 @@ public class TestContinuousBackupReplicationEndpoint {

   /**
    * Verifies the backup process by: 1. Checking whether any WAL (Write-Ahead Log) files were
-   * generated in the backup directory. 2. Checking whether any bulk-loaded files were generated in
-   * the backup directory. 3. Replaying the WAL and bulk-loaded files (if present) to restore data
-   * and check consistency by verifying that the restored data matches the expected row count for
-   * each table.
+   * generated in the backup directory. 2. Replaying the WAL files to restore data and check
+   * consistency by verifying that the restored data matches the expected row count for each table.
    */
-  private void verifyBackup(String backupRootDir, boolean hasBulkLoadFiles,
-    Map<TableName, Integer> tablesWithExpectedRows) throws IOException {
+  private void verifyBackup(String backupRootDir, Map<TableName, Integer> tablesWithExpectedRows)
+    throws IOException {
     verifyWALBackup(backupRootDir);
-    if (hasBulkLoadFiles) {
-      verifyBulkLoadBackup(backupRootDir);
-    }

     for (Map.Entry<TableName, Integer> entry : tablesWithExpectedRows.entrySet()) {
       TableName tableName = entry.getKey();
@@ -440,21 +387,6 @@ public class TestContinuousBackupReplicationEndpoint {

       replayWALs(new Path(backupRootDir, WALS_DIR).toString(), tableName);

-      // replay Bulk loaded HFiles if Present
-      try {
-        Path bulkloadDir = new Path(backupRootDir, BULKLOAD_FILES_DIR);
-        if (fs.exists(bulkloadDir)) {
-          FileStatus[] directories = fs.listStatus(bulkloadDir);
-          for (FileStatus dirStatus : directories) {
-            if (dirStatus.isDirectory()) {
-              replayBulkLoadHFilesIfPresent(dirStatus.getPath().toString(), tableName);
-            }
-          }
-        }
-      } catch (Exception e) {
-        fail("Failed to replay BulkLoad HFiles properly: " + e.getMessage());
-      }
-
       assertEquals(expectedRows, getRowCount(tableName));
     }
   }
@@ -480,15 +412,6 @@ public class TestContinuousBackupReplicationEndpoint {
     assertFalse("Expected some WAL files but found none!", walFiles.isEmpty());
   }

-  private void verifyBulkLoadBackup(String backupRootDir) throws IOException {
-    Path bulkLoadFilesDir = new Path(backupRootDir, BULKLOAD_FILES_DIR);
-    assertTrue("BulkLoad Files directory does not exist!", fs.exists(bulkLoadFilesDir));
-
-    FileStatus[] bulkLoadFiles = fs.listStatus(bulkLoadFilesDir);
-    assertNotNull("No Bulk load files found!", bulkLoadFiles);
-    assertTrue("Expected some Bulk load files but found none!", bulkLoadFiles.length > 0);
-  }
-
   private void replayWALs(String walDir, TableName tableName) {
     WALPlayer player = new WALPlayer();
     try {
@@ -499,28 +422,6 @@ public class TestContinuousBackupReplicationEndpoint {
     }
   }

-  private void replayBulkLoadHFilesIfPresent(String bulkLoadDir, TableName tableName) {
-    try {
-      Path tableBulkLoadDir = new Path(bulkLoadDir + "/default/" + tableName);
-      if (fs.exists(tableBulkLoadDir)) {
-        RemoteIterator<LocatedFileStatus> fileStatusIterator = fs.listFiles(tableBulkLoadDir, true);
-        List<Path> bulkLoadFiles = new ArrayList<>();
-
-        while (fileStatusIterator.hasNext()) {
-          LocatedFileStatus fileStatus = fileStatusIterator.next();
-          Path filePath = fileStatus.getPath();
-
-          if (!fileStatus.isDirectory()) {
-            bulkLoadFiles.add(filePath);
-          }
-        }
-        bulkLoadHFiles(tableName, Map.of(Bytes.toBytes(CF_NAME), bulkLoadFiles));
-      }
-    } catch (Exception e) {
-      fail("Failed to replay BulkLoad HFiles properly: " + e.getMessage());
-    }
-  }
-
   private int getRowCount(TableName tableName) throws IOException {
     try (Table table = TEST_UTIL.getConnection().getTable(tableName)) {
       return HBaseTestingUtil.countRows(table);