This is an automated email from the ASF dual-hosted git repository.

andor pushed a commit to branch HBASE-28957
in repository https://gitbox.apache.org/repos/asf/hbase.git
commit 006529af18aee5dfa179a19c00040df9a6c19096 Author: vinayak hegde <vinayakph...@gmail.com> AuthorDate: Tue Feb 18 23:15:34 2025 +0530 HBASE-28996: Implement Custom ReplicationEndpoint to Enable WAL Backup to External Storage (#6633) * HBASE-28996: Implement Custom ReplicationEndpoint to Enable WAL Backup to External Storage * fix spotless error --- .../replication/BackupFileSystemManager.java | 71 +++ .../backup/replication/BulkLoadProcessor.java | 96 ++++ .../ContinuousBackupReplicationEndpoint.java | 440 ++++++++++++++++++ .../replication/ObjectStoreProtobufWalWriter.java | 73 +++ .../hadoop/hbase/backup/replication/Utils.java | 48 +- .../TestContinuousBackupReplicationEndpoint.java | 513 +++++++++++++++++++++ .../hbase/replication/ReplicationEndpoint.java | 18 +- .../hbase/replication/ReplicationResult.java} | 53 +-- .../VerifyWALEntriesReplicationEndpoint.java | 4 +- .../HBaseInterClusterReplicationEndpoint.java | 13 +- .../regionserver/ReplicationSource.java | 35 +- .../regionserver/ReplicationSourceInterface.java | 9 +- .../regionserver/ReplicationSourceShipper.java | 23 +- .../visibility/VisibilityReplicationEndpoint.java | 3 +- .../replication/DummyReplicationEndpoint.java | 4 +- .../replication/SerialReplicationTestBase.java | 4 +- .../replication/TestHBaseReplicationEndpoint.java | 8 +- .../TestNonHBaseReplicationEndpoint.java | 4 +- .../hbase/replication/TestReplicationBase.java | 2 +- .../hbase/replication/TestReplicationEndpoint.java | 22 +- .../TestVerifyCellsReplicationEndpoint.java | 2 +- ...InterClusterReplicationEndpointFilterEdits.java | 4 +- .../TestRaceWhenCreatingReplicationSource.java | 5 +- .../regionserver/TestReplicationSourceManager.java | 5 +- .../replication/regionserver/TestReplicator.java | 3 +- .../TestVisibilityLabelsReplication.java | 5 +- 26 files changed, 1320 insertions(+), 147 deletions(-) diff --git a/hbase-backup/src/main/java/org/apache/hadoop/hbase/backup/replication/BackupFileSystemManager.java b/hbase-backup/src/main/java/org/apache/hadoop/hbase/backup/replication/BackupFileSystemManager.java new file mode 100644 index 00000000000..225d3217276 --- /dev/null +++ b/hbase-backup/src/main/java/org/apache/hadoop/hbase/backup/replication/BackupFileSystemManager.java @@ -0,0 +1,71 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.hadoop.hbase.backup.replication; + +import java.io.IOException; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Path; +import org.apache.yetus.audience.InterfaceAudience; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * Initializes and organizes backup directories for continuous Write-Ahead Logs (WALs) and + * bulk-loaded files within the specified backup root directory. + */ +@InterfaceAudience.Private +public class BackupFileSystemManager { + private static final Logger LOG = LoggerFactory.getLogger(BackupFileSystemManager.class); + + public static final String WALS_DIR = "WALs"; + public static final String BULKLOAD_FILES_DIR = "bulk-load-files"; + private final String peerId; + private final FileSystem backupFs; + private final Path backupRootDir; + private final Path walsDir; + private final Path bulkLoadFilesDir; + + public BackupFileSystemManager(String peerId, Configuration conf, String backupRootDirStr) + throws IOException { + this.peerId = peerId; + this.backupRootDir = new Path(backupRootDirStr); + this.backupFs = FileSystem.get(backupRootDir.toUri(), conf); + this.walsDir = createDirectory(WALS_DIR); + this.bulkLoadFilesDir = createDirectory(BULKLOAD_FILES_DIR); + } + + private Path createDirectory(String dirName) throws IOException { + Path dirPath = new Path(backupRootDir, dirName); + backupFs.mkdirs(dirPath); + LOG.info("{} Initialized directory: {}", Utils.logPeerId(peerId), dirPath); + return dirPath; + } + + public Path getWalsDir() { + return walsDir; + } + + public Path getBulkLoadFilesDir() { + return bulkLoadFilesDir; + } + + public FileSystem getBackupFs() { + return backupFs; + } +} diff --git a/hbase-backup/src/main/java/org/apache/hadoop/hbase/backup/replication/BulkLoadProcessor.java b/hbase-backup/src/main/java/org/apache/hadoop/hbase/backup/replication/BulkLoadProcessor.java new file mode 100644 index 00000000000..6e1271313bc --- /dev/null +++ b/hbase-backup/src/main/java/org/apache/hadoop/hbase/backup/replication/BulkLoadProcessor.java @@ -0,0 +1,96 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hbase.backup.replication; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.List; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.hbase.Cell; +import org.apache.hadoop.hbase.CellUtil; +import org.apache.hadoop.hbase.TableName; +import org.apache.hadoop.hbase.wal.WAL; +import org.apache.hadoop.hbase.wal.WALEdit; +import org.apache.yetus.audience.InterfaceAudience; + +import org.apache.hadoop.hbase.shaded.protobuf.generated.WALProtos; + +/** + * Processes bulk load files from Write-Ahead Log (WAL) entries for HBase replication. 
+ * <p> + * This utility class extracts and constructs the file paths of bulk-loaded files based on WAL + * entries. It processes bulk load descriptors and their associated store descriptors to generate + * the paths for each bulk-loaded file. + * <p> + * The class is designed for scenarios where replicable bulk load operations need to be parsed and + * their file paths need to be determined programmatically. + * </p> + */ +@InterfaceAudience.Private +public final class BulkLoadProcessor { + private BulkLoadProcessor() { + } + + public static List<Path> processBulkLoadFiles(List<WAL.Entry> walEntries) throws IOException { + List<Path> bulkLoadFilePaths = new ArrayList<>(); + + for (WAL.Entry entry : walEntries) { + WALEdit edit = entry.getEdit(); + for (Cell cell : edit.getCells()) { + if (CellUtil.matchingQualifier(cell, WALEdit.BULK_LOAD)) { + TableName tableName = entry.getKey().getTableName(); + String namespace = tableName.getNamespaceAsString(); + String table = tableName.getQualifierAsString(); + bulkLoadFilePaths.addAll(processBulkLoadDescriptor(cell, namespace, table)); + } + } + } + return bulkLoadFilePaths; + } + + private static List<Path> processBulkLoadDescriptor(Cell cell, String namespace, String table) + throws IOException { + List<Path> bulkLoadFilePaths = new ArrayList<>(); + WALProtos.BulkLoadDescriptor bld = WALEdit.getBulkLoadDescriptor(cell); + + if (bld == null || !bld.getReplicate() || bld.getEncodedRegionName() == null) { + return bulkLoadFilePaths; // Skip if not replicable + } + + String regionName = bld.getEncodedRegionName().toStringUtf8(); + for (WALProtos.StoreDescriptor storeDescriptor : bld.getStoresList()) { + bulkLoadFilePaths + .addAll(processStoreDescriptor(storeDescriptor, namespace, table, regionName)); + } + + return bulkLoadFilePaths; + } + + private static List<Path> processStoreDescriptor(WALProtos.StoreDescriptor storeDescriptor, + String namespace, String table, String regionName) { + List<Path> paths = new ArrayList<>(); + String columnFamily = storeDescriptor.getFamilyName().toStringUtf8(); + + for (String storeFile : storeDescriptor.getStoreFileList()) { + paths.add(new Path(namespace, + new Path(table, new Path(regionName, new Path(columnFamily, storeFile))))); + } + + return paths; + } +} diff --git a/hbase-backup/src/main/java/org/apache/hadoop/hbase/backup/replication/ContinuousBackupReplicationEndpoint.java b/hbase-backup/src/main/java/org/apache/hadoop/hbase/backup/replication/ContinuousBackupReplicationEndpoint.java new file mode 100644 index 00000000000..c973af8102e --- /dev/null +++ b/hbase-backup/src/main/java/org/apache/hadoop/hbase/backup/replication/ContinuousBackupReplicationEndpoint.java @@ -0,0 +1,440 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.hadoop.hbase.backup.replication; + +import java.io.IOException; +import java.io.UncheckedIOException; +import java.text.SimpleDateFormat; +import java.util.Date; +import java.util.List; +import java.util.Map; +import java.util.UUID; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.Executors; +import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.locks.ReentrantLock; +import java.util.stream.Collectors; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.FileUtil; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.hbase.HBaseConfiguration; +import org.apache.hadoop.hbase.HConstants; +import org.apache.hadoop.hbase.io.asyncfs.monitor.StreamSlowMonitor; +import org.apache.hadoop.hbase.regionserver.wal.WALUtil; +import org.apache.hadoop.hbase.replication.BaseReplicationEndpoint; +import org.apache.hadoop.hbase.replication.ReplicationResult; +import org.apache.hadoop.hbase.replication.regionserver.ReplicationSourceInterface; +import org.apache.hadoop.hbase.util.CommonFSUtils; +import org.apache.hadoop.hbase.util.EnvironmentEdgeManager; +import org.apache.hadoop.hbase.wal.FSHLogProvider; +import org.apache.hadoop.hbase.wal.WAL; +import org.apache.yetus.audience.InterfaceAudience; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * ContinuousBackupReplicationEndpoint is responsible for replicating WAL entries to a backup + * storage. It organizes WAL entries by day and periodically flushes the data, ensuring that WAL + * files do not exceed the configured size. The class includes mechanisms for handling the WAL + * files, performing bulk load backups, and ensuring that the replication process is safe. + */ +@InterfaceAudience.Private +public class ContinuousBackupReplicationEndpoint extends BaseReplicationEndpoint { + private static final Logger LOG = + LoggerFactory.getLogger(ContinuousBackupReplicationEndpoint.class); + public static final String CONF_PEER_UUID = "hbase.backup.wal.replication.peerUUID"; + public static final String CONF_BACKUP_ROOT_DIR = "hbase.backup.root.dir"; + public static final String CONF_BACKUP_MAX_WAL_SIZE = "hbase.backup.max.wal.size"; + public static final long DEFAULT_MAX_WAL_SIZE = 128 * 1024 * 1024; + + public static final String CONF_STAGED_WAL_FLUSH_INITIAL_DELAY = + "hbase.backup.staged.wal.flush.initial.delay.seconds"; + public static final int DEFAULT_STAGED_WAL_FLUSH_INITIAL_DELAY_SECONDS = 5 * 60; // 5 minutes + public static final String CONF_STAGED_WAL_FLUSH_INTERVAL = + "hbase.backup.staged.wal.flush.interval.seconds"; + public static final int DEFAULT_STAGED_WAL_FLUSH_INTERVAL_SECONDS = 5 * 60; // 5 minutes + public static final int EXECUTOR_TERMINATION_TIMEOUT_SECONDS = 60; // TODO: configurable?? 
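
[Editor's illustrative sketch, not part of this commit: the constants above are per-peer configuration keys read by the endpoint in init(). The snippet below shows, under stated assumptions, how such a peer could be registered through the Admin API, mirroring the addReplicationPeer() helper in the test class further down in this patch. The peer id, table name, and backup root URI are placeholders.]

    import java.io.IOException;
    import java.util.ArrayList;
    import java.util.List;
    import java.util.Map;
    import java.util.UUID;
    import org.apache.hadoop.hbase.TableName;
    import org.apache.hadoop.hbase.backup.replication.ContinuousBackupReplicationEndpoint;
    import org.apache.hadoop.hbase.client.Admin;
    import org.apache.hadoop.hbase.replication.ReplicationPeerConfig;

    public class AddContinuousBackupPeerSketch {
      // Registers a replication peer that ships WAL entries (and bulk-loaded files)
      // to the configured backup root via ContinuousBackupReplicationEndpoint.
      static void addPeer(Admin admin) throws IOException {
        Map<String, String> endpointConf = Map.of(
          // Required: UUID returned by the endpoint's getPeerUUID().
          ContinuousBackupReplicationEndpoint.CONF_PEER_UUID, UUID.randomUUID().toString(),
          // Required: root under which the WALs/ and bulk-load-files/ dirs are created (placeholder URI).
          ContinuousBackupReplicationEndpoint.CONF_BACKUP_ROOT_DIR, "s3a://backup-bucket/hbase",
          // Optional: roll staged backup WAL files once they reach 128 MB (the default).
          ContinuousBackupReplicationEndpoint.CONF_BACKUP_MAX_WAL_SIZE, "134217728");

        Map<TableName, List<String>> tableCfs = Map.of(
          TableName.valueOf("my_table"), new ArrayList<>()); // placeholder table, all column families

        ReplicationPeerConfig peerConfig = ReplicationPeerConfig.newBuilder()
          .setReplicationEndpointImpl(ContinuousBackupReplicationEndpoint.class.getName())
          .setReplicateAllUserTables(false) // back up only the tables listed below
          .setTableCFsMap(tableCfs)
          .putAllConfiguration(endpointConf)
          .build();

        admin.addReplicationPeer("continuous_backup", peerConfig); // placeholder peer id
      }
    }
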
+ + private final Map<Long, FSHLogProvider.Writer> walWriters = new ConcurrentHashMap<>(); + private final ReentrantLock lock = new ReentrantLock(); + + private ReplicationSourceInterface replicationSource; + private Configuration conf; + private BackupFileSystemManager backupFileSystemManager; + private UUID peerUUID; + private String peerId; + private ScheduledExecutorService flushExecutor; + + public static final long ONE_DAY_IN_MILLISECONDS = TimeUnit.DAYS.toMillis(1); + public static final String WAL_FILE_PREFIX = "wal_file."; + public static final String DATE_FORMAT = "yyyy-MM-dd"; + + @Override + public void init(Context context) throws IOException { + super.init(context); + this.replicationSource = context.getReplicationSource(); + this.peerId = context.getPeerId(); + this.conf = HBaseConfiguration.create(context.getConfiguration()); + + initializePeerUUID(); + initializeBackupFileSystemManager(); + startWalFlushExecutor(); + LOG.info("{} Initialization complete", Utils.logPeerId(peerId)); + } + + private void initializePeerUUID() throws IOException { + String peerUUIDStr = conf.get(CONF_PEER_UUID); + if (peerUUIDStr == null || peerUUIDStr.isEmpty()) { + throw new IOException("Peer UUID is not specified. Please configure " + CONF_PEER_UUID); + } + try { + this.peerUUID = UUID.fromString(peerUUIDStr); + LOG.info("{} Peer UUID initialized to {}", Utils.logPeerId(peerId), peerUUID); + } catch (IllegalArgumentException e) { + throw new IOException("Invalid Peer UUID format: " + peerUUIDStr, e); + } + } + + private void initializeBackupFileSystemManager() throws IOException { + String backupRootDir = conf.get(CONF_BACKUP_ROOT_DIR); + if (backupRootDir == null || backupRootDir.isEmpty()) { + throw new IOException( + "Backup root directory is not specified. 
Configure " + CONF_BACKUP_ROOT_DIR); + } + + try { + this.backupFileSystemManager = new BackupFileSystemManager(peerId, conf, backupRootDir); + LOG.info("{} BackupFileSystemManager initialized successfully for {}", + Utils.logPeerId(peerId), backupRootDir); + } catch (IOException e) { + throw new IOException("Failed to initialize BackupFileSystemManager", e); + } + } + + private void startWalFlushExecutor() { + int initialDelay = conf.getInt(CONF_STAGED_WAL_FLUSH_INITIAL_DELAY, + DEFAULT_STAGED_WAL_FLUSH_INITIAL_DELAY_SECONDS); + int flushInterval = + conf.getInt(CONF_STAGED_WAL_FLUSH_INTERVAL, DEFAULT_STAGED_WAL_FLUSH_INTERVAL_SECONDS); + + flushExecutor = Executors.newSingleThreadScheduledExecutor(); + flushExecutor.scheduleAtFixedRate(this::flushAndBackupSafely, initialDelay, flushInterval, + TimeUnit.SECONDS); + LOG.info("{} Scheduled WAL flush executor started with initial delay {}s and interval {}s", + Utils.logPeerId(peerId), initialDelay, flushInterval); + } + + private void flushAndBackupSafely() { + lock.lock(); + try { + LOG.info("{} Periodic WAL flush triggered", Utils.logPeerId(peerId)); + flushWriters(); + replicationSource.persistOffsets(); + LOG.info("{} Periodic WAL flush and offset persistence completed successfully", + Utils.logPeerId(peerId)); + } catch (IOException e) { + LOG.error("{} Error during WAL flush: {}", Utils.logPeerId(peerId), e.getMessage(), e); + } finally { + lock.unlock(); + } + } + + private void flushWriters() throws IOException { + LOG.info("{} Flushing {} WAL writers", Utils.logPeerId(peerId), walWriters.size()); + for (Map.Entry<Long, FSHLogProvider.Writer> entry : walWriters.entrySet()) { + FSHLogProvider.Writer writer = entry.getValue(); + if (writer != null) { + LOG.debug("{} Closing WAL writer for day: {}", Utils.logPeerId(peerId), entry.getKey()); + try { + writer.close(); + LOG.debug("{} Successfully closed WAL writer for day: {}", Utils.logPeerId(peerId), + entry.getKey()); + } catch (IOException e) { + LOG.error("{} Failed to close WAL writer for day: {}. 
Error: {}", Utils.logPeerId(peerId), + entry.getKey(), e.getMessage(), e); + throw e; + } + } + } + walWriters.clear(); + LOG.info("{} WAL writers flushed and cleared", Utils.logPeerId(peerId)); + } + + @Override + public UUID getPeerUUID() { + return peerUUID; + } + + @Override + public void start() { + LOG.info("{} Starting ContinuousBackupReplicationEndpoint", Utils.logPeerId(peerId)); + startAsync(); + } + + @Override + protected void doStart() { + LOG.info("{} ContinuousBackupReplicationEndpoint started successfully.", + Utils.logPeerId(peerId)); + notifyStarted(); + } + + @Override + public ReplicationResult replicate(ReplicateContext replicateContext) { + final List<WAL.Entry> entries = replicateContext.getEntries(); + if (entries.isEmpty()) { + LOG.debug("{} No WAL entries to replicate", Utils.logPeerId(peerId)); + return ReplicationResult.SUBMITTED; + } + + LOG.debug("{} Received {} WAL entries for replication", Utils.logPeerId(peerId), + entries.size()); + + Map<Long, List<WAL.Entry>> groupedEntries = groupEntriesByDay(entries); + LOG.debug("{} Grouped WAL entries by day: {}", Utils.logPeerId(peerId), + groupedEntries.keySet()); + + lock.lock(); + try { + for (Map.Entry<Long, List<WAL.Entry>> entry : groupedEntries.entrySet()) { + LOG.debug("{} Backing up {} WAL entries for day {}", Utils.logPeerId(peerId), + entry.getValue().size(), entry.getKey()); + backupWalEntries(entry.getKey(), entry.getValue()); + } + + if (isAnyWriterFull()) { + LOG.debug("{} Some WAL writers reached max size, triggering flush", + Utils.logPeerId(peerId)); + flushWriters(); + LOG.debug("{} Replication committed after WAL flush", Utils.logPeerId(peerId)); + return ReplicationResult.COMMITTED; + } + + LOG.debug("{} Replication submitted successfully", Utils.logPeerId(peerId)); + return ReplicationResult.SUBMITTED; + } catch (IOException e) { + LOG.error("{} Replication failed. 
Error details: {}", Utils.logPeerId(peerId), e.getMessage(), + e); + return ReplicationResult.FAILED; + } finally { + lock.unlock(); + } + } + + private Map<Long, List<WAL.Entry>> groupEntriesByDay(List<WAL.Entry> entries) { + return entries.stream().collect( + Collectors.groupingBy(entry -> (entry.getKey().getWriteTime() / ONE_DAY_IN_MILLISECONDS) + * ONE_DAY_IN_MILLISECONDS)); + } + + private boolean isAnyWriterFull() { + return walWriters.values().stream().anyMatch(this::isWriterFull); + } + + private boolean isWriterFull(FSHLogProvider.Writer writer) { + long maxWalSize = conf.getLong(CONF_BACKUP_MAX_WAL_SIZE, DEFAULT_MAX_WAL_SIZE); + return writer.getLength() >= maxWalSize; + } + + private void backupWalEntries(long day, List<WAL.Entry> walEntries) throws IOException { + LOG.debug("{} Starting backup of {} WAL entries for day {}", Utils.logPeerId(peerId), + walEntries.size(), day); + + try { + FSHLogProvider.Writer walWriter = walWriters.computeIfAbsent(day, this::createWalWriter); + List<Path> bulkLoadFiles = BulkLoadProcessor.processBulkLoadFiles(walEntries); + + if (LOG.isTraceEnabled()) { + LOG.trace("{} Processed {} bulk load files for WAL entries", Utils.logPeerId(peerId), + bulkLoadFiles.size()); + LOG.trace("{} Bulk load files: {}", Utils.logPeerId(peerId), + bulkLoadFiles.stream().map(Path::toString).collect(Collectors.joining(", "))); + } + + for (WAL.Entry entry : walEntries) { + walWriter.append(entry); + } + walWriter.sync(true); + uploadBulkLoadFiles(bulkLoadFiles); + } catch (UncheckedIOException e) { + String errorMsg = Utils.logPeerId(peerId) + " Failed to get or create WAL Writer for " + day; + LOG.error("{} Backup failed for day {}. Error: {}", Utils.logPeerId(peerId), day, + e.getMessage(), e); + throw new IOException(errorMsg, e); + } + } + + private FSHLogProvider.Writer createWalWriter(long dayInMillis) { + // Convert dayInMillis to "yyyy-MM-dd" format + SimpleDateFormat dateFormat = new SimpleDateFormat(DATE_FORMAT); + String dayDirectoryName = dateFormat.format(new Date(dayInMillis)); + + FileSystem fs = backupFileSystemManager.getBackupFs(); + Path walsDir = backupFileSystemManager.getWalsDir(); + + try { + // Create a directory for the day + Path dayDir = new Path(walsDir, dayDirectoryName); + fs.mkdirs(dayDir); + + // Generate a unique WAL file name + long currentTime = EnvironmentEdgeManager.getDelegate().currentTime(); + String walFileName = WAL_FILE_PREFIX + currentTime + "." 
+ UUID.randomUUID(); + Path walFilePath = new Path(dayDir, walFileName); + + // Initialize the WAL writer + FSHLogProvider.Writer writer = + ObjectStoreProtobufWalWriter.class.getDeclaredConstructor().newInstance(); + writer.init(fs, walFilePath, conf, true, WALUtil.getWALBlockSize(conf, fs, walFilePath), + StreamSlowMonitor.create(conf, walFileName)); + + LOG.info("{} WAL writer created: {}", Utils.logPeerId(peerId), walFilePath); + return writer; + } catch (Exception e) { + throw new UncheckedIOException( + Utils.logPeerId(peerId) + " Failed to initialize WAL Writer for day: " + dayDirectoryName, + new IOException(e)); + } + } + + @Override + public void stop() { + LOG.info("{} Stopping ContinuousBackupReplicationEndpoint...", Utils.logPeerId(peerId)); + stopAsync(); + } + + @Override + protected void doStop() { + close(); + LOG.info("{} ContinuousBackupReplicationEndpoint stopped successfully.", + Utils.logPeerId(peerId)); + notifyStopped(); + } + + private void close() { + LOG.info("{} Closing WAL replication component...", Utils.logPeerId(peerId)); + shutdownFlushExecutor(); + lock.lock(); + try { + flushWriters(); + replicationSource.persistOffsets(); + } catch (IOException e) { + LOG.error("{} Failed to Flush Open Wal Writers: {}", Utils.logPeerId(peerId), e.getMessage(), + e); + } finally { + lock.unlock(); + LOG.info("{} WAL replication component closed.", Utils.logPeerId(peerId)); + } + } + + private void uploadBulkLoadFiles(List<Path> bulkLoadFiles) throws IOException { + LOG.debug("{} Starting upload of {} bulk load files", Utils.logPeerId(peerId), + bulkLoadFiles.size()); + + if (LOG.isTraceEnabled()) { + LOG.trace("{} Bulk load files to upload: {}", Utils.logPeerId(peerId), + bulkLoadFiles.stream().map(Path::toString).collect(Collectors.joining(", "))); + } + for (Path file : bulkLoadFiles) { + Path sourcePath = getBulkLoadFileStagingPath(file); + Path destPath = new Path(backupFileSystemManager.getBulkLoadFilesDir(), file); + + try { + LOG.debug("{} Copying bulk load file from {} to {}", Utils.logPeerId(peerId), sourcePath, + destPath); + + FileUtil.copy(CommonFSUtils.getRootDirFileSystem(conf), sourcePath, + backupFileSystemManager.getBackupFs(), destPath, false, conf); + + LOG.info("{} Bulk load file {} successfully backed up to {}", Utils.logPeerId(peerId), file, + destPath); + } catch (IOException e) { + LOG.error("{} Failed to back up bulk load file {}: {}", Utils.logPeerId(peerId), file, + e.getMessage(), e); + throw e; + } + } + + LOG.debug("{} Completed upload of bulk load files", Utils.logPeerId(peerId)); + } + + private Path getBulkLoadFileStagingPath(Path relativePathFromNamespace) throws IOException { + FileSystem rootFs = CommonFSUtils.getRootDirFileSystem(conf); + Path rootDir = CommonFSUtils.getRootDir(conf); + Path baseNSDir = new Path(HConstants.BASE_NAMESPACE_DIR); + Path baseNamespaceDir = new Path(rootDir, baseNSDir); + Path hFileArchiveDir = + new Path(rootDir, new Path(HConstants.HFILE_ARCHIVE_DIRECTORY, baseNSDir)); + + LOG.debug("{} Searching for bulk load file: {} in paths: {}, {}", Utils.logPeerId(peerId), + relativePathFromNamespace, baseNamespaceDir, hFileArchiveDir); + + Path result = + findExistingPath(rootFs, baseNamespaceDir, hFileArchiveDir, relativePathFromNamespace); + + if (result == null) { + LOG.error("{} No bulk loaded file found in relative path: {}", Utils.logPeerId(peerId), + relativePathFromNamespace); + throw new IOException( + "No Bulk loaded file found in relative path: " + relativePathFromNamespace); + } + + LOG.debug("{} Bulk 
load file found at {}", Utils.logPeerId(peerId), result); + return result; + } + + private static Path findExistingPath(FileSystem rootFs, Path baseNamespaceDir, + Path hFileArchiveDir, Path filePath) throws IOException { + if (LOG.isTraceEnabled()) { + LOG.trace("Checking for bulk load file at: {} and {}", new Path(baseNamespaceDir, filePath), + new Path(hFileArchiveDir, filePath)); + } + + for (Path candidate : new Path[] { new Path(baseNamespaceDir, filePath), + new Path(hFileArchiveDir, filePath) }) { + if (rootFs.exists(candidate)) { + LOG.debug("Found bulk load file at: {}", candidate); + return candidate; + } + } + return null; + } + + private void shutdownFlushExecutor() { + if (flushExecutor != null) { + LOG.info("{} Initiating WAL flush executor shutdown.", Utils.logPeerId(peerId)); + + flushExecutor.shutdown(); + try { + if ( + !flushExecutor.awaitTermination(EXECUTOR_TERMINATION_TIMEOUT_SECONDS, TimeUnit.SECONDS) + ) { + LOG.warn("{} Flush executor did not terminate within timeout, forcing shutdown.", + Utils.logPeerId(peerId)); + flushExecutor.shutdownNow(); + } + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + flushExecutor.shutdownNow(); + LOG.warn("{} Flush executor shutdown was interrupted.", Utils.logPeerId(peerId), e); + } + LOG.info("{} WAL flush thread stopped.", Utils.logPeerId(peerId)); + } + } +} diff --git a/hbase-backup/src/main/java/org/apache/hadoop/hbase/backup/replication/ObjectStoreProtobufWalWriter.java b/hbase-backup/src/main/java/org/apache/hadoop/hbase/backup/replication/ObjectStoreProtobufWalWriter.java new file mode 100644 index 00000000000..27f4fbdc027 --- /dev/null +++ b/hbase-backup/src/main/java/org/apache/hadoop/hbase/backup/replication/ObjectStoreProtobufWalWriter.java @@ -0,0 +1,73 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hbase.backup.replication; + +import java.io.IOException; +import java.util.concurrent.atomic.AtomicLong; +import org.apache.hadoop.fs.FSDataOutputStream; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.hbase.io.asyncfs.monitor.StreamSlowMonitor; +import org.apache.hadoop.hbase.regionserver.wal.ProtobufLogWriter; +import org.apache.hadoop.hbase.util.AtomicUtils; +import org.apache.hadoop.hbase.util.CommonFSUtils; +import org.apache.yetus.audience.InterfaceAudience; + +/** + * A custom implementation of {@link ProtobufLogWriter} that provides support for writing + * protobuf-based WAL (Write-Ahead Log) entries to object store-backed files. 
+ * <p> + * This class overrides the {@link ProtobufLogWriter#sync(boolean)} and + * {@link ProtobufLogWriter#initOutput(FileSystem, Path, boolean, int, short, long, StreamSlowMonitor, boolean)} + * methods to ensure compatibility with object stores, while ignoring specific capability checks + * such as HFLUSH and HSYNC. These checks are often not supported by some object stores, and + * bypassing them ensures smooth operation in such environments. + * </p> + */ +@InterfaceAudience.Private +public class ObjectStoreProtobufWalWriter extends ProtobufLogWriter { + private final AtomicLong syncedLength = new AtomicLong(0); + + @Override + public void sync(boolean forceSync) throws IOException { + FSDataOutputStream fsDataOutputstream = this.output; + if (fsDataOutputstream == null) { + return; // Presume closed + } + // Special case for Hadoop S3: Unlike traditional file systems, where flush() ensures data is + // durably written, in Hadoop S3, flush() only writes data to the internal buffer and does not + // immediately persist it to S3. The actual upload to S3 happens asynchronously, typically when + // a block is full or when close() is called, which finalizes the upload process. + fsDataOutputstream.flush(); + AtomicUtils.updateMax(this.syncedLength, fsDataOutputstream.getPos()); + } + + @Override + protected void initOutput(FileSystem fs, Path path, boolean overwritable, int bufferSize, + short replication, long blockSize, StreamSlowMonitor monitor, boolean noLocalWrite) + throws IOException { + try { + super.initOutput(fs, path, overwritable, bufferSize, replication, blockSize, monitor, + noLocalWrite); + } catch (CommonFSUtils.StreamLacksCapabilityException e) { + // Ignore capability check for HFLUSH and HSYNC capabilities + // Some object stores may not support these capabilities, so we bypass the exception handling + // to ensure compatibility with such stores. + } + } +} diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/DummyReplicationEndpoint.java b/hbase-backup/src/main/java/org/apache/hadoop/hbase/backup/replication/Utils.java similarity index 53% copy from hbase-server/src/test/java/org/apache/hadoop/hbase/replication/DummyReplicationEndpoint.java copy to hbase-backup/src/main/java/org/apache/hadoop/hbase/backup/replication/Utils.java index e6a39e7fede..69365674acc 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/DummyReplicationEndpoint.java +++ b/hbase-backup/src/main/java/org/apache/hadoop/hbase/backup/replication/Utils.java @@ -15,54 +15,16 @@ * See the License for the specific language governing permissions and * limitations under the License. */ -package org.apache.hadoop.hbase.replication; +package org.apache.hadoop.hbase.backup.replication; -import java.util.UUID; import org.apache.yetus.audience.InterfaceAudience; -/** - * A dummy replication endpoint that does nothing, for test use only. 
- */ @InterfaceAudience.Private -public class DummyReplicationEndpoint extends BaseReplicationEndpoint { - - @Override - public boolean canReplicateToSameCluster() { - return true; - } - - @Override - public UUID getPeerUUID() { - return ctx.getClusterId(); - } - - @Override - public WALEntryFilter getWALEntryfilter() { - return null; - } - - @Override - public boolean replicate(ReplicateContext replicateContext) { - return true; - } - - @Override - public void start() { - startAsync(); - } - - @Override - public void stop() { - stopAsync(); - } - - @Override - protected void doStart() { - notifyStarted(); +public final class Utils { + private Utils() { } - @Override - protected void doStop() { - notifyStopped(); + public static String logPeerId(String peerId) { + return "[Source for peer " + peerId + "]:"; } } diff --git a/hbase-backup/src/test/java/org/apache/hadoop/hbase/backup/replication/TestContinuousBackupReplicationEndpoint.java b/hbase-backup/src/test/java/org/apache/hadoop/hbase/backup/replication/TestContinuousBackupReplicationEndpoint.java new file mode 100644 index 00000000000..cd1f758f760 --- /dev/null +++ b/hbase-backup/src/test/java/org/apache/hadoop/hbase/backup/replication/TestContinuousBackupReplicationEndpoint.java @@ -0,0 +1,513 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.hadoop.hbase.backup.replication; + +import static org.apache.hadoop.hbase.HConstants.REPLICATION_BULKLOAD_ENABLE_KEY; +import static org.apache.hadoop.hbase.HConstants.REPLICATION_CLUSTER_ID; +import static org.apache.hadoop.hbase.backup.replication.BackupFileSystemManager.BULKLOAD_FILES_DIR; +import static org.apache.hadoop.hbase.backup.replication.BackupFileSystemManager.WALS_DIR; +import static org.apache.hadoop.hbase.backup.replication.ContinuousBackupReplicationEndpoint.CONF_BACKUP_MAX_WAL_SIZE; +import static org.apache.hadoop.hbase.backup.replication.ContinuousBackupReplicationEndpoint.CONF_BACKUP_ROOT_DIR; +import static org.apache.hadoop.hbase.backup.replication.ContinuousBackupReplicationEndpoint.CONF_PEER_UUID; +import static org.apache.hadoop.hbase.backup.replication.ContinuousBackupReplicationEndpoint.CONF_STAGED_WAL_FLUSH_INITIAL_DELAY; +import static org.apache.hadoop.hbase.backup.replication.ContinuousBackupReplicationEndpoint.CONF_STAGED_WAL_FLUSH_INTERVAL; +import static org.apache.hadoop.hbase.backup.replication.ContinuousBackupReplicationEndpoint.DATE_FORMAT; +import static org.apache.hadoop.hbase.backup.replication.ContinuousBackupReplicationEndpoint.ONE_DAY_IN_MILLISECONDS; +import static org.apache.hadoop.hbase.backup.replication.ContinuousBackupReplicationEndpoint.WAL_FILE_PREFIX; +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertNotNull; +import static org.junit.Assert.assertTrue; +import static org.junit.Assert.fail; + +import java.io.IOException; +import java.text.SimpleDateFormat; +import java.util.ArrayList; +import java.util.Date; +import java.util.HashMap; +import java.util.HashSet; +import java.util.List; +import java.util.Map; +import java.util.Set; +import java.util.UUID; +import java.util.concurrent.atomic.AtomicBoolean; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.FileStatus; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.LocatedFileStatus; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.fs.RemoteIterator; +import org.apache.hadoop.hbase.HBaseClassTestRule; +import org.apache.hadoop.hbase.HBaseTestingUtil; +import org.apache.hadoop.hbase.TableName; +import org.apache.hadoop.hbase.client.Admin; +import org.apache.hadoop.hbase.client.ColumnFamilyDescriptor; +import org.apache.hadoop.hbase.client.ColumnFamilyDescriptorBuilder; +import org.apache.hadoop.hbase.client.Table; +import org.apache.hadoop.hbase.client.TableDescriptor; +import org.apache.hadoop.hbase.client.TableDescriptorBuilder; +import org.apache.hadoop.hbase.mapreduce.WALPlayer; +import org.apache.hadoop.hbase.replication.ReplicationPeerConfig; +import org.apache.hadoop.hbase.testclassification.LargeTests; +import org.apache.hadoop.hbase.testclassification.ReplicationTests; +import org.apache.hadoop.hbase.tool.BulkLoadHFiles; +import org.apache.hadoop.hbase.tool.BulkLoadHFilesTool; +import org.apache.hadoop.hbase.util.Bytes; +import org.apache.hadoop.hbase.util.EnvironmentEdgeManagerTestHelper; +import org.apache.hadoop.hbase.util.HFileTestUtil; +import org.apache.hadoop.hbase.util.ManualEnvironmentEdge; +import org.apache.hadoop.util.ToolRunner; +import org.junit.AfterClass; +import org.junit.BeforeClass; +import org.junit.ClassRule; +import org.junit.Test; +import org.junit.experimental.categories.Category; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +@Category({ ReplicationTests.class, LargeTests.class }) 
+public class TestContinuousBackupReplicationEndpoint { + @ClassRule + public static final HBaseClassTestRule CLASS_RULE = + HBaseClassTestRule.forClass(TestContinuousBackupReplicationEndpoint.class); + + private static final Logger LOG = + LoggerFactory.getLogger(TestContinuousBackupReplicationEndpoint.class); + + private final static HBaseTestingUtil TEST_UTIL = new HBaseTestingUtil(); + private static final Configuration conf = TEST_UTIL.getConfiguration(); + private static Admin admin; + + private final String replicationEndpoint = ContinuousBackupReplicationEndpoint.class.getName(); + private static final String CF_NAME = "cf"; + private static final byte[] QUALIFIER = Bytes.toBytes("my-qualifier"); + static FileSystem fs = null; + static Path root; + + @BeforeClass + public static void setUpBeforeClass() throws Exception { + // Set the configuration properties as required + conf.setBoolean(REPLICATION_BULKLOAD_ENABLE_KEY, true); + conf.set(REPLICATION_CLUSTER_ID, "clusterId1"); + + TEST_UTIL.startMiniZKCluster(); + TEST_UTIL.startMiniCluster(3); + fs = FileSystem.get(conf); + root = TEST_UTIL.getDataTestDirOnTestFS(); + admin = TEST_UTIL.getAdmin(); + } + + @AfterClass + public static void tearDownAfterClass() throws Exception { + if (fs != null) { + fs.close(); + } + TEST_UTIL.shutdownMiniCluster(); + } + + @Test + public void testWALAndBulkLoadFileBackup() throws IOException { + String methodName = Thread.currentThread().getStackTrace()[1].getMethodName(); + TableName tableName = TableName.valueOf("table_" + methodName); + String peerId = "peerId"; + + createTable(tableName); + + Path backupRootDir = new Path(root, methodName); + fs.mkdirs(backupRootDir); + + Map<TableName, List<String>> tableMap = new HashMap<>(); + tableMap.put(tableName, new ArrayList<>()); + + addReplicationPeer(peerId, backupRootDir, tableMap); + + loadRandomData(tableName, 100); + assertEquals(100, getRowCount(tableName)); + + Path dir = TEST_UTIL.getDataTestDirOnTestFS("testBulkLoadByFamily"); + generateHFiles(dir); + bulkLoadHFiles(tableName, dir); + assertEquals(1100, getRowCount(tableName)); + + waitForReplication(15000); + deleteReplicationPeer(peerId); + + verifyBackup(backupRootDir.toString(), true, Map.of(tableName, 1100)); + + deleteTable(tableName); + } + + @Test + public void testMultiTableWALBackup() throws IOException { + String methodName = Thread.currentThread().getStackTrace()[1].getMethodName(); + TableName table1 = TableName.valueOf("table_" + methodName + "1"); + TableName table2 = TableName.valueOf("table_" + methodName + "2"); + TableName table3 = TableName.valueOf("table_" + methodName + "3"); + String peerId = "peerMulti"; + + for (TableName table : List.of(table1, table2, table3)) { + createTable(table); + } + + Path backupRootDir = new Path(root, methodName); + fs.mkdirs(backupRootDir); + + Map<TableName, List<String>> initialTableMap = new HashMap<>(); + initialTableMap.put(table1, new ArrayList<>()); + initialTableMap.put(table2, new ArrayList<>()); + + addReplicationPeer(peerId, backupRootDir, initialTableMap); + + for (TableName table : List.of(table1, table2, table3)) { + loadRandomData(table, 50); + assertEquals(50, getRowCount(table)); + } + + waitForReplication(15000); + + // Update the Replication Peer to Include table3 + admin.updateReplicationPeerConfig(peerId, + ReplicationPeerConfig.newBuilder(admin.getReplicationPeerConfig(peerId)) + .setTableCFsMap( + Map.of(table1, new ArrayList<>(), table2, new ArrayList<>(), table3, new ArrayList<>())) + .build()); + + for (TableName 
table : List.of(table1, table2, table3)) { + loadRandomData(table, 50); + assertEquals(100, getRowCount(table)); + } + + waitForReplication(15000); + deleteReplicationPeer(peerId); + + verifyBackup(backupRootDir.toString(), false, Map.of(table1, 100, table2, 100, table3, 50)); + + for (TableName table : List.of(table1, table2, table3)) { + deleteTable(table); + } + } + + @Test + public void testWALBackupWithPeerRestart() throws IOException, InterruptedException { + String methodName = Thread.currentThread().getStackTrace()[1].getMethodName(); + TableName tableName = TableName.valueOf("table_" + methodName); + String peerId = "peerId"; + + createTable(tableName); + + Path backupRootDir = new Path(root, methodName); + fs.mkdirs(backupRootDir); + + Map<TableName, List<String>> tableMap = new HashMap<>(); + tableMap.put(tableName, new ArrayList<>()); + + addReplicationPeer(peerId, backupRootDir, tableMap); + + AtomicBoolean stopLoading = new AtomicBoolean(false); + + // Start a separate thread to load data continuously + Thread dataLoaderThread = new Thread(() -> { + try { + while (!stopLoading.get()) { + loadRandomData(tableName, 10); + Thread.sleep(1000); // Simulate delay + } + } catch (Exception e) { + LOG.error("Data loading thread encountered an error", e); + } + }); + + dataLoaderThread.start(); + + // Main thread enables and disables replication peer + try { + for (int i = 0; i < 5; i++) { + LOG.info("Disabling replication peer..."); + admin.disableReplicationPeer(peerId); + Thread.sleep(2000); + + LOG.info("Enabling replication peer..."); + admin.enableReplicationPeer(peerId); + Thread.sleep(2000); + } + } finally { + stopLoading.set(true); // Stop the data loader thread + dataLoaderThread.join(); + } + + waitForReplication(20000); + deleteReplicationPeer(peerId); + + verifyBackup(backupRootDir.toString(), false, Map.of(tableName, getRowCount(tableName))); + + deleteTable(tableName); + } + + @Test + public void testDayWiseWALBackup() throws IOException { + String methodName = Thread.currentThread().getStackTrace()[1].getMethodName(); + TableName tableName = TableName.valueOf("table_" + methodName); + String peerId = "peerId"; + + createTable(tableName); + + Path backupRootDir = new Path(root, methodName); + fs.mkdirs(backupRootDir); + + Map<TableName, List<String>> tableMap = new HashMap<>(); + tableMap.put(tableName, new ArrayList<>()); + + addReplicationPeer(peerId, backupRootDir, tableMap); + + // Mock system time using ManualEnvironmentEdge + ManualEnvironmentEdge manualEdge = new ManualEnvironmentEdge(); + EnvironmentEdgeManagerTestHelper.injectEdge(manualEdge); + + long currentTime = System.currentTimeMillis(); + long oneDayBackTime = currentTime - ONE_DAY_IN_MILLISECONDS; + + SimpleDateFormat dateFormat = new SimpleDateFormat(DATE_FORMAT); + String expectedPrevDayDir = dateFormat.format(new Date(oneDayBackTime)); + String expectedCurrentDayDir = dateFormat.format(new Date(currentTime)); + + manualEdge.setValue(oneDayBackTime); + loadRandomData(tableName, 100); + assertEquals(100, getRowCount(tableName)); + + manualEdge.setValue(currentTime); + loadRandomData(tableName, 100); + assertEquals(200, getRowCount(tableName)); + + // Reset time mocking + EnvironmentEdgeManagerTestHelper.reset(); + + waitForReplication(15000); + deleteReplicationPeer(peerId); + + verifyBackup(backupRootDir.toString(), false, Map.of(tableName, 200)); + + // Verify that WALs are stored in two directories, one for each day + Path walDir = new Path(backupRootDir, WALS_DIR); + Set<String> walDirectories = 
new HashSet<>(); + + FileStatus[] fileStatuses = fs.listStatus(walDir); + for (FileStatus fileStatus : fileStatuses) { + if (fileStatus.isDirectory()) { + String dirName = fileStatus.getPath().getName(); + walDirectories.add(dirName); + } + } + + assertEquals("WALs should be stored in exactly two directories", 2, walDirectories.size()); + assertTrue("Expected previous day's WAL directory missing", + walDirectories.contains(expectedPrevDayDir)); + assertTrue("Expected current day's WAL directory missing", + walDirectories.contains(expectedCurrentDayDir)); + + deleteTable(tableName); + } + + private void createTable(TableName tableName) throws IOException { + ColumnFamilyDescriptor columnFamilyDescriptor = + ColumnFamilyDescriptorBuilder.newBuilder(Bytes.toBytes(CF_NAME)).setScope(1).build(); + TableDescriptor tableDescriptor = + TableDescriptorBuilder.newBuilder(tableName).setColumnFamily(columnFamilyDescriptor).build(); + + if (!admin.tableExists(tableName)) { + admin.createTable(tableDescriptor); + } + } + + private void deleteTable(TableName tableName) throws IOException { + admin.disableTable(tableName); + admin.truncateTable(tableName, false); + admin.disableTable(tableName); + admin.deleteTable(tableName); + } + + private void addReplicationPeer(String peerId, Path backupRootDir, + Map<TableName, List<String>> tableMap) throws IOException { + Map<String, String> additionalArgs = new HashMap<>(); + additionalArgs.put(CONF_PEER_UUID, UUID.randomUUID().toString()); + additionalArgs.put(CONF_BACKUP_ROOT_DIR, backupRootDir.toString()); + additionalArgs.put(CONF_BACKUP_MAX_WAL_SIZE, "10240"); + additionalArgs.put(CONF_STAGED_WAL_FLUSH_INITIAL_DELAY, "10"); + additionalArgs.put(CONF_STAGED_WAL_FLUSH_INTERVAL, "10"); + + ReplicationPeerConfig peerConfig = ReplicationPeerConfig.newBuilder() + .setReplicationEndpointImpl(replicationEndpoint).setReplicateAllUserTables(false) + .setTableCFsMap(tableMap).putAllConfiguration(additionalArgs).build(); + + admin.addReplicationPeer(peerId, peerConfig); + } + + private void deleteReplicationPeer(String peerId) throws IOException { + admin.disableReplicationPeer(peerId); + admin.removeReplicationPeer(peerId); + } + + private void loadRandomData(TableName tableName, int totalRows) throws IOException { + int rowSize = 32; + try (Table table = TEST_UTIL.getConnection().getTable(tableName)) { + TEST_UTIL.loadRandomRows(table, Bytes.toBytes(CF_NAME), rowSize, totalRows); + } + } + + private void bulkLoadHFiles(TableName tableName, Path inputDir) throws IOException { + TEST_UTIL.getConfiguration().setBoolean(BulkLoadHFilesTool.BULK_LOAD_HFILES_BY_FAMILY, true); + + try (Table table = TEST_UTIL.getConnection().getTable(tableName)) { + BulkLoadHFiles loader = new BulkLoadHFilesTool(TEST_UTIL.getConfiguration()); + loader.bulkLoad(table.getName(), inputDir); + } finally { + TEST_UTIL.getConfiguration().setBoolean(BulkLoadHFilesTool.BULK_LOAD_HFILES_BY_FAMILY, false); + } + } + + private void bulkLoadHFiles(TableName tableName, Map<byte[], List<Path>> family2Files) + throws IOException { + TEST_UTIL.getConfiguration().setBoolean(BulkLoadHFilesTool.BULK_LOAD_HFILES_BY_FAMILY, true); + + try (Table table = TEST_UTIL.getConnection().getTable(tableName)) { + BulkLoadHFiles loader = new BulkLoadHFilesTool(TEST_UTIL.getConfiguration()); + loader.bulkLoad(table.getName(), family2Files); + } finally { + TEST_UTIL.getConfiguration().setBoolean(BulkLoadHFilesTool.BULK_LOAD_HFILES_BY_FAMILY, false); + } + } + + private void generateHFiles(Path outputDir) throws IOException { 
+ String hFileName = "MyHFile"; + int numRows = 1000; + outputDir = outputDir.makeQualified(fs.getUri(), fs.getWorkingDirectory()); + + byte[] from = Bytes.toBytes(CF_NAME + "begin"); + byte[] to = Bytes.toBytes(CF_NAME + "end"); + + Path familyDir = new Path(outputDir, CF_NAME); + HFileTestUtil.createHFile(TEST_UTIL.getConfiguration(), fs, new Path(familyDir, hFileName), + Bytes.toBytes(CF_NAME), QUALIFIER, from, to, numRows); + } + + private void waitForReplication(int durationInMillis) { + LOG.info("Waiting for replication to complete for {} ms", durationInMillis); + try { + Thread.sleep(durationInMillis); + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + throw new RuntimeException("Thread was interrupted while waiting", e); + } + } + + /** + * Verifies the backup process by: 1. Checking whether any WAL (Write-Ahead Log) files were + * generated in the backup directory. 2. Checking whether any bulk-loaded files were generated in + * the backup directory. 3. Replaying the WAL and bulk-loaded files (if present) to restore data + * and check consistency by verifying that the restored data matches the expected row count for + * each table. + */ + private void verifyBackup(String backupRootDir, boolean hasBulkLoadFiles, + Map<TableName, Integer> tablesWithExpectedRows) throws IOException { + verifyWALBackup(backupRootDir); + if (hasBulkLoadFiles) { + verifyBulkLoadBackup(backupRootDir); + } + + for (Map.Entry<TableName, Integer> entry : tablesWithExpectedRows.entrySet()) { + TableName tableName = entry.getKey(); + int expectedRows = entry.getValue(); + + admin.disableTable(tableName); + admin.truncateTable(tableName, false); + assertEquals(0, getRowCount(tableName)); + + replayWALs(new Path(backupRootDir, WALS_DIR).toString(), tableName); + replayBulkLoadHFilesIfPresent(new Path(backupRootDir, BULKLOAD_FILES_DIR).toString(), + tableName); + assertEquals(expectedRows, getRowCount(tableName)); + } + } + + private void verifyWALBackup(String backupRootDir) throws IOException { + Path walDir = new Path(backupRootDir, WALS_DIR); + assertTrue("WAL directory does not exist!", fs.exists(walDir)); + + RemoteIterator<LocatedFileStatus> fileStatusIterator = fs.listFiles(walDir, true); + List<Path> walFiles = new ArrayList<>(); + + while (fileStatusIterator.hasNext()) { + LocatedFileStatus fileStatus = fileStatusIterator.next(); + Path filePath = fileStatus.getPath(); + + // Check if the file starts with the expected WAL prefix + if (!fileStatus.isDirectory() && filePath.getName().startsWith(WAL_FILE_PREFIX)) { + walFiles.add(filePath); + } + } + + assertNotNull("No WAL files found!", walFiles); + assertFalse("Expected some WAL files but found none!", walFiles.isEmpty()); + } + + private void verifyBulkLoadBackup(String backupRootDir) throws IOException { + Path bulkLoadFilesDir = new Path(backupRootDir, BULKLOAD_FILES_DIR); + assertTrue("BulkLoad Files directory does not exist!", fs.exists(bulkLoadFilesDir)); + + FileStatus[] bulkLoadFiles = fs.listStatus(bulkLoadFilesDir); + assertNotNull("No Bulk load files found!", bulkLoadFiles); + assertTrue("Expected some Bulk load files but found none!", bulkLoadFiles.length > 0); + } + + private void replayWALs(String walDir, TableName tableName) { + WALPlayer player = new WALPlayer(); + try { + assertEquals(0, ToolRunner.run(TEST_UTIL.getConfiguration(), player, + new String[] { walDir, tableName.getQualifierAsString() })); + } catch (Exception e) { + fail("Failed to replay WALs properly: " + e.getMessage()); + } + } + + private void 
replayBulkLoadHFilesIfPresent(String bulkLoadDir, TableName tableName) { + try { + Path tableBulkLoadDir = new Path(bulkLoadDir + "/default/" + tableName); + if (fs.exists(tableBulkLoadDir)) { + RemoteIterator<LocatedFileStatus> fileStatusIterator = fs.listFiles(tableBulkLoadDir, true); + List<Path> bulkLoadFiles = new ArrayList<>(); + + while (fileStatusIterator.hasNext()) { + LocatedFileStatus fileStatus = fileStatusIterator.next(); + Path filePath = fileStatus.getPath(); + + if (!fileStatus.isDirectory()) { + bulkLoadFiles.add(filePath); + } + } + bulkLoadHFiles(tableName, Map.of(Bytes.toBytes(CF_NAME), bulkLoadFiles)); + } + } catch (Exception e) { + fail("Failed to replay BulkLoad HFiles properly: " + e.getMessage()); + } + } + + private int getRowCount(TableName tableName) throws IOException { + try (Table table = TEST_UTIL.getConnection().getTable(tableName)) { + return HBaseTestingUtil.countRows(table); + } + } +} diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/ReplicationEndpoint.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/ReplicationEndpoint.java index 5edd5b3e8c9..fc5c2bf6265 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/ReplicationEndpoint.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/ReplicationEndpoint.java @@ -29,6 +29,7 @@ import org.apache.hadoop.hbase.HBaseInterfaceAudience; import org.apache.hadoop.hbase.Server; import org.apache.hadoop.hbase.TableDescriptors; import org.apache.hadoop.hbase.replication.regionserver.MetricsSource; +import org.apache.hadoop.hbase.replication.regionserver.ReplicationSourceInterface; import org.apache.hadoop.hbase.wal.WAL.Entry; import org.apache.yetus.audience.InterfaceAudience; @@ -51,6 +52,7 @@ public interface ReplicationEndpoint extends ReplicationPeerConfigListener { @InterfaceAudience.LimitedPrivate(HBaseInterfaceAudience.REPLICATION) class Context { + private final ReplicationSourceInterface replicationSource; private final Server server; private final Configuration localConf; private final Configuration conf; @@ -63,10 +65,12 @@ public interface ReplicationEndpoint extends ReplicationPeerConfigListener { private final Abortable abortable; @InterfaceAudience.Private - public Context(final Server server, final Configuration localConf, final Configuration conf, - final FileSystem fs, final String peerId, final UUID clusterId, - final ReplicationPeer replicationPeer, final MetricsSource metrics, - final TableDescriptors tableDescriptors, final Abortable abortable) { + public Context(final ReplicationSourceInterface replicationSource, final Server server, + final Configuration localConf, final Configuration conf, final FileSystem fs, + final String peerId, final UUID clusterId, final ReplicationPeer replicationPeer, + final MetricsSource metrics, final TableDescriptors tableDescriptors, + final Abortable abortable) { + this.replicationSource = replicationSource; this.server = server; this.localConf = localConf; this.conf = conf; @@ -79,6 +83,10 @@ public interface ReplicationEndpoint extends ReplicationPeerConfigListener { this.abortable = abortable; } + public ReplicationSourceInterface getReplicationSource() { + return replicationSource; + } + public Server getServer() { return server; } @@ -208,7 +216,7 @@ public interface ReplicationEndpoint extends ReplicationPeerConfigListener { * the context are assumed to be persisted in the target cluster. 
* @param replicateContext a context where WAL entries and other parameters can be obtained. */ - boolean replicate(ReplicateContext replicateContext); + ReplicationResult replicate(ReplicateContext replicateContext); // The below methods are inspired by Guava Service. See // https://github.com/google/guava/wiki/ServiceExplained for overview of Guava Service. diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/DummyReplicationEndpoint.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/ReplicationResult.java similarity index 53% copy from hbase-server/src/test/java/org/apache/hadoop/hbase/replication/DummyReplicationEndpoint.java copy to hbase-server/src/main/java/org/apache/hadoop/hbase/replication/ReplicationResult.java index e6a39e7fede..03ed0ce6799 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/DummyReplicationEndpoint.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/ReplicationResult.java @@ -17,52 +17,17 @@ */ package org.apache.hadoop.hbase.replication; -import java.util.UUID; +import org.apache.hadoop.hbase.HBaseInterfaceAudience; import org.apache.yetus.audience.InterfaceAudience; -/** - * A dummy replication endpoint that does nothing, for test use only. - */ -@InterfaceAudience.Private -public class DummyReplicationEndpoint extends BaseReplicationEndpoint { - - @Override - public boolean canReplicateToSameCluster() { - return true; - } - - @Override - public UUID getPeerUUID() { - return ctx.getClusterId(); - } - - @Override - public WALEntryFilter getWALEntryfilter() { - return null; - } - - @Override - public boolean replicate(ReplicateContext replicateContext) { - return true; - } - - @Override - public void start() { - startAsync(); - } - - @Override - public void stop() { - stopAsync(); - } +@InterfaceAudience.LimitedPrivate(HBaseInterfaceAudience.REPLICATION) +public enum ReplicationResult { + /* Batch has been replicated and persisted successfully. */ + COMMITTED, - @Override - protected void doStart() { - notifyStarted(); - } + /* Batch has been submitted for replication, but not persisted yet. 
*/ + SUBMITTED, - @Override - protected void doStop() { - notifyStopped(); - } + /* Batch replicaton failed, should be re-tried */ + FAILED } diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/VerifyWALEntriesReplicationEndpoint.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/VerifyWALEntriesReplicationEndpoint.java index 229cec57e97..a9674407bd2 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/VerifyWALEntriesReplicationEndpoint.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/VerifyWALEntriesReplicationEndpoint.java @@ -59,10 +59,10 @@ public class VerifyWALEntriesReplicationEndpoint extends BaseReplicationEndpoint } @Override - public boolean replicate(ReplicateContext replicateContext) { + public ReplicationResult replicate(ReplicateContext replicateContext) { replicateContext.entries.stream().map(WAL.Entry::getEdit).flatMap(e -> e.getCells().stream()) .forEach(this::checkCell); - return true; + return ReplicationResult.COMMITTED; } @Override diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/HBaseInterClusterReplicationEndpoint.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/HBaseInterClusterReplicationEndpoint.java index 6bdc9773264..4f9a4909d78 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/HBaseInterClusterReplicationEndpoint.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/HBaseInterClusterReplicationEndpoint.java @@ -48,6 +48,7 @@ import org.apache.hadoop.hbase.protobuf.ReplicationProtobufUtil; import org.apache.hadoop.hbase.regionserver.NoSuchColumnFamilyException; import org.apache.hadoop.hbase.regionserver.wal.WALUtil; import org.apache.hadoop.hbase.replication.HBaseReplicationEndpoint; +import org.apache.hadoop.hbase.replication.ReplicationResult; import org.apache.hadoop.hbase.replication.ReplicationUtils; import org.apache.hadoop.hbase.util.Bytes; import org.apache.hadoop.hbase.util.CommonFSUtils; @@ -424,7 +425,7 @@ public class HBaseInterClusterReplicationEndpoint extends HBaseReplicationEndpoi * Do the shipping logic */ @Override - public boolean replicate(ReplicateContext replicateContext) { + public ReplicationResult replicate(ReplicateContext replicateContext) { int sleepMultiplier = 1; int initialTimeout = replicateContext.getTimeout(); @@ -444,7 +445,7 @@ public class HBaseInterClusterReplicationEndpoint extends HBaseReplicationEndpoi lastSinkFetchTime = EnvironmentEdgeManager.currentTime(); } sleepForRetries("No sinks available at peer", sleepMultiplier); - return false; + return ReplicationResult.FAILED; } List<List<Entry>> batches = createBatches(replicateContext.getEntries()); @@ -458,7 +459,7 @@ public class HBaseInterClusterReplicationEndpoint extends HBaseReplicationEndpoi try { // replicate the batches to sink side. 
parallelReplicate(replicateContext, batches); - return true; + return ReplicationResult.COMMITTED; } catch (IOException ioe) { if (ioe instanceof RemoteException) { if (dropOnDeletedTables && isTableNotFoundException(ioe)) { @@ -467,14 +468,14 @@ public class HBaseInterClusterReplicationEndpoint extends HBaseReplicationEndpoi batches = filterNotExistTableEdits(batches); if (batches.isEmpty()) { LOG.warn("After filter not exist table's edits, 0 edits to replicate, just return"); - return true; + return ReplicationResult.COMMITTED; } } else if (dropOnDeletedColumnFamilies && isNoSuchColumnFamilyException(ioe)) { batches = filterNotExistColumnFamilyEdits(batches); if (batches.isEmpty()) { LOG.warn("After filter not exist column family's edits, 0 edits to replicate, " + "just return"); - return true; + return ReplicationResult.COMMITTED; } } else { LOG.warn("{} Peer encountered RemoteException, rechecking all sinks: ", logPeerId(), @@ -506,7 +507,7 @@ public class HBaseInterClusterReplicationEndpoint extends HBaseReplicationEndpoi } } } - return false; // in case we exited before replicating + return ReplicationResult.FAILED; // in case we exited before replicating } protected boolean isPeerEnabled() { diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSource.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSource.java index 094fa4aaa78..d6d59a39b52 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSource.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSource.java @@ -58,6 +58,7 @@ import org.apache.hadoop.hbase.replication.ReplicationPeer; import org.apache.hadoop.hbase.replication.ReplicationQueueData; import org.apache.hadoop.hbase.replication.ReplicationQueueId; import org.apache.hadoop.hbase.replication.ReplicationQueueStorage; +import org.apache.hadoop.hbase.replication.ReplicationResult; import org.apache.hadoop.hbase.replication.SystemTableWALEntryFilter; import org.apache.hadoop.hbase.replication.WALEntryFilter; import org.apache.hadoop.hbase.util.Bytes; @@ -166,6 +167,8 @@ public class ReplicationSource implements ReplicationSourceInterface { */ private final List<WALEntryFilter> baseFilterOutWALEntries; + private final Map<String, WALEntryBatch> lastEntryBatch = new ConcurrentHashMap<>(); + ReplicationSource() { // Default, filters *in* all WALs but meta WALs & filters *out* all WALEntries of System Tables. 
this(p -> !AbstractFSWALProvider.isMetaFile(p), @@ -318,8 +321,8 @@ public class ReplicationSource implements ReplicationSourceInterface { if (server instanceof HRegionServer) { tableDescriptors = ((HRegionServer) server).getTableDescriptors(); } - replicationEndpoint - .init(new ReplicationEndpoint.Context(server, conf, replicationPeer.getConfiguration(), fs, + replicationEndpoint.init( + new ReplicationEndpoint.Context(this, server, conf, replicationPeer.getConfiguration(), fs, replicationPeer.getId(), clusterId, replicationPeer, metrics, tableDescriptors, server)); replicationEndpoint.start(); replicationEndpoint.awaitRunning(waitOnEndpointSeconds, TimeUnit.SECONDS); @@ -861,4 +864,32 @@ public class ReplicationSource implements ReplicationSourceInterface { public long getTotalReplicatedEdits() { return totalReplicatedEdits.get(); } + + @Override + public void logPositionAndCleanOldLogs(WALEntryBatch entryBatch, ReplicationResult replicated) { + String walName = entryBatch.getLastWalPath().getName(); + String walPrefix = AbstractFSWALProvider.getWALPrefixFromWALName(walName); + + synchronized (lastEntryBatch) { // Synchronize addition and processing + lastEntryBatch.put(walPrefix, entryBatch); + + if (replicated == ReplicationResult.COMMITTED) { + processAndClearEntries(); + } + } + } + + public void persistOffsets() { + synchronized (lastEntryBatch) { + processAndClearEntries(); + } + } + + private void processAndClearEntries() { + // Process all entries + lastEntryBatch + .forEach((prefix, batch) -> getSourceManager().logPositionAndCleanOldLogs(this, batch)); + // Clear all processed entries + lastEntryBatch.clear(); + } } diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceInterface.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceInterface.java index 69ad2887064..f482cc73e71 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceInterface.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceInterface.java @@ -34,6 +34,7 @@ import org.apache.hadoop.hbase.replication.ReplicationPeer; import org.apache.hadoop.hbase.replication.ReplicationQueueData; import org.apache.hadoop.hbase.replication.ReplicationQueueId; import org.apache.hadoop.hbase.replication.ReplicationQueueStorage; +import org.apache.hadoop.hbase.replication.ReplicationResult; import org.apache.hadoop.hbase.util.Pair; import org.apache.hadoop.hbase.wal.WAL.Entry; import org.apache.yetus.audience.InterfaceAudience; @@ -207,7 +208,11 @@ public interface ReplicationSourceInterface { * @param entryBatch the wal entry batch we just shipped * @return The instance of queueStorage used by this ReplicationSource. 
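With this change the source keeps only the most recent WALEntryBatch per WAL prefix in lastEntryBatch and persists offsets either when a COMMITTED result arrives or when persistOffsets() is called explicitly. A minimal sketch of the intended interplay for an asynchronous endpoint follows; how the endpoint obtains its ReplicationSourceInterface (for example via the extended Context) and the callback name are assumptions for illustration, not part of this patch.

import org.apache.hadoop.hbase.replication.regionserver.ReplicationSourceInterface;

// Illustrative only: an endpoint that answered SUBMITTED is expected to make the
// buffered WAL offsets durable later, once its own flush has completed.
final class AsyncFlushCallbackSketch {

  // Hypothetical: handed to the endpoint when it is initialized.
  private final ReplicationSourceInterface source;

  AsyncFlushCallbackSketch(ReplicationSourceInterface source) {
    this.source = source;
  }

  // Hypothetical callback fired by the endpoint's flush machinery.
  void onFlushDurable() {
    // Flushes every buffered WALEntryBatch (one per WAL prefix) through
    // logPositionAndCleanOldLogs and then clears the buffer.
    source.persistOffsets();
  }
}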
*/ - default void logPositionAndCleanOldLogs(WALEntryBatch entryBatch) { - getSourceManager().logPositionAndCleanOldLogs(this, entryBatch); + default void logPositionAndCleanOldLogs(WALEntryBatch entryBatch, ReplicationResult replicated) { + + } + + default public void persistOffsets() { + } } diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceShipper.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceShipper.java index 6d0730d76b6..ee819faa77b 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceShipper.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceShipper.java @@ -28,6 +28,7 @@ import org.apache.hadoop.hbase.Cell; import org.apache.hadoop.hbase.CellUtil; import org.apache.hadoop.hbase.HConstants; import org.apache.hadoop.hbase.replication.ReplicationEndpoint; +import org.apache.hadoop.hbase.replication.ReplicationResult; import org.apache.hadoop.hbase.util.EnvironmentEdgeManager; import org.apache.hadoop.hbase.util.Threads; import org.apache.hadoop.hbase.wal.WAL.Entry; @@ -155,7 +156,7 @@ public class ReplicationSourceShipper extends Thread { List<Entry> entries = entryBatch.getWalEntries(); int sleepMultiplier = 0; if (entries.isEmpty()) { - updateLogPosition(entryBatch); + updateLogPosition(entryBatch, ReplicationResult.COMMITTED); return; } int currentSize = (int) entryBatch.getHeapSize(); @@ -182,21 +183,23 @@ public class ReplicationSourceShipper extends Thread { long startTimeNs = System.nanoTime(); // send the edits to the endpoint. Will block until the edits are shipped and acknowledged - boolean replicated = source.getReplicationEndpoint().replicate(replicateContext); + ReplicationResult replicated = source.getReplicationEndpoint().replicate(replicateContext); long endTimeNs = System.nanoTime(); - if (!replicated) { + if (replicated == ReplicationResult.FAILED) { continue; } else { sleepMultiplier = Math.max(sleepMultiplier - 1, 0); } - // Clean up hfile references - for (Entry entry : entries) { - cleanUpHFileRefs(entry.getEdit()); - LOG.trace("shipped entry {}: ", entry); + if (replicated == ReplicationResult.COMMITTED) { + // Clean up hfile references + for (Entry entry : entries) { + cleanUpHFileRefs(entry.getEdit()); + LOG.trace("shipped entry {}: ", entry); + } } // Log and clean up WAL logs - updateLogPosition(entryBatch); + updateLogPosition(entryBatch, replicated); // offsets totalBufferUsed by deducting shipped batchSize (excludes bulk load size) // this sizeExcludeBulkLoad has to use same calculation that when calling @@ -253,7 +256,7 @@ public class ReplicationSourceShipper extends Thread { } } - private boolean updateLogPosition(WALEntryBatch batch) { + private boolean updateLogPosition(WALEntryBatch batch, ReplicationResult replicated) { boolean updated = false; // if end of file is true, then the logPositionAndCleanOldLogs method will remove the file // record on zk, so let's call it. The last wal position maybe zero if end of file is true and @@ -263,7 +266,7 @@ public class ReplicationSourceShipper extends Thread { batch.isEndOfFile() || !batch.getLastWalPath().equals(currentPath) || batch.getLastWalPosition() != currentPosition ) { - source.logPositionAndCleanOldLogs(batch); + source.logPositionAndCleanOldLogs(batch, replicated); updated = true; } // if end of file is true, then we can just skip to the next file in queue. 
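To summarize the shipper's handling of the three results: FAILED retries the batch, COMMITTED allows HFile-reference cleanup and immediate offset persistence, and SUBMITTED records the position but leaves persistence to a later COMMITTED batch or an explicit persistOffsets() call. A tiny illustrative helper (not part of this patch) capturing that contract:

import org.apache.hadoop.hbase.replication.ReplicationResult;

// Mirrors the branches in ReplicationSourceShipper and ReplicationSource above.
final class ReplicationResultContractSketch {

  static boolean retriesBatch(ReplicationResult r) {
    return r == ReplicationResult.FAILED;
  }

  static boolean cleansUpHFileRefs(ReplicationResult r) {
    return r == ReplicationResult.COMMITTED;
  }

  static boolean recordsWalPosition(ReplicationResult r) {
    // SUBMITTED and COMMITTED both record the position with the source
    return r != ReplicationResult.FAILED;
  }

  static boolean persistsOffsetsNow(ReplicationResult r) {
    // only COMMITTED flushes the buffered per-WAL-prefix offsets immediately
    return r == ReplicationResult.COMMITTED;
  }
}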
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/security/visibility/VisibilityReplicationEndpoint.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/security/visibility/VisibilityReplicationEndpoint.java index b97a08c01c3..a32ce78b0c7 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/security/visibility/VisibilityReplicationEndpoint.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/security/visibility/VisibilityReplicationEndpoint.java @@ -30,6 +30,7 @@ import org.apache.hadoop.hbase.Tag; import org.apache.hadoop.hbase.TagType; import org.apache.hadoop.hbase.replication.ReplicationEndpoint; import org.apache.hadoop.hbase.replication.ReplicationPeerConfig; +import org.apache.hadoop.hbase.replication.ReplicationResult; import org.apache.hadoop.hbase.replication.WALEntryFilter; import org.apache.hadoop.hbase.wal.WAL.Entry; import org.apache.hadoop.hbase.wal.WALEdit; @@ -63,7 +64,7 @@ public class VisibilityReplicationEndpoint implements ReplicationEndpoint { } @Override - public boolean replicate(ReplicateContext replicateContext) { + public ReplicationResult replicate(ReplicateContext replicateContext) { if (!delegator.canReplicateToSameCluster()) { // Only when the replication is inter cluster replication we need to // convert the visibility tags to diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/DummyReplicationEndpoint.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/DummyReplicationEndpoint.java index e6a39e7fede..f0e627316cd 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/DummyReplicationEndpoint.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/DummyReplicationEndpoint.java @@ -42,8 +42,8 @@ public class DummyReplicationEndpoint extends BaseReplicationEndpoint { } @Override - public boolean replicate(ReplicateContext replicateContext) { - return true; + public ReplicationResult replicate(ReplicateContext replicateContext) { + return ReplicationResult.COMMITTED; } @Override diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/SerialReplicationTestBase.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/SerialReplicationTestBase.java index f54c3931699..a8c76033d02 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/SerialReplicationTestBase.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/SerialReplicationTestBase.java @@ -81,7 +81,7 @@ public class SerialReplicationTestBase { } @Override - public boolean replicate(ReplicateContext replicateContext) { + public ReplicationResult replicate(ReplicateContext replicateContext) { synchronized (WRITER) { try { for (Entry entry : replicateContext.getEntries()) { @@ -92,7 +92,7 @@ public class SerialReplicationTestBase { throw new UncheckedIOException(e); } } - return true; + return ReplicationResult.COMMITTED; } @Override diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestHBaseReplicationEndpoint.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestHBaseReplicationEndpoint.java index 058564dc0ec..d9a75b8ca8a 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestHBaseReplicationEndpoint.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestHBaseReplicationEndpoint.java @@ -59,8 +59,8 @@ public class TestHBaseReplicationEndpoint { when(replicationPeer.getPeerConfig()).thenReturn(peerConfig); 
when(peerConfig.getClusterKey()).thenReturn("hbase+zk://server1:2181/hbase"); ReplicationEndpoint.Context context = - new ReplicationEndpoint.Context(null, UTIL.getConfiguration(), UTIL.getConfiguration(), null, - null, null, replicationPeer, null, null, null); + new ReplicationEndpoint.Context(null, null, UTIL.getConfiguration(), UTIL.getConfiguration(), + null, null, null, replicationPeer, null, null, null); endpoint = new DummyHBaseReplicationEndpoint(); endpoint.init(context); } @@ -199,8 +199,8 @@ public class TestHBaseReplicationEndpoint { } @Override - public boolean replicate(ReplicateContext replicateContext) { - return false; + public ReplicationResult replicate(ReplicateContext replicateContext) { + return ReplicationResult.FAILED; } @Override diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestNonHBaseReplicationEndpoint.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestNonHBaseReplicationEndpoint.java index 70cae18b456..c98b46c8e4b 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestNonHBaseReplicationEndpoint.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestNonHBaseReplicationEndpoint.java @@ -127,9 +127,9 @@ public class TestNonHBaseReplicationEndpoint { } @Override - public boolean replicate(ReplicateContext replicateContext) { + public ReplicationResult replicate(ReplicateContext replicateContext) { REPLICATED.set(true); - return true; + return ReplicationResult.COMMITTED; } @Override diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationBase.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationBase.java index 70a6d73c620..f53d9acc24f 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationBase.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationBase.java @@ -415,7 +415,7 @@ public class TestReplicationBase { } @Override - public boolean replicate(ReplicateContext replicateContext) { + public ReplicationResult replicate(ReplicateContext replicateContext) { replicateCount.incrementAndGet(); replicatedEntries.addAll(replicateContext.getEntries()); diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationEndpoint.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationEndpoint.java index 057a9f3567f..77cd5da8de0 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationEndpoint.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationEndpoint.java @@ -463,10 +463,10 @@ public class TestReplicationEndpoint extends TestReplicationBase { } @Override - public boolean replicate(ReplicateContext replicateContext) { + public ReplicationResult replicate(ReplicateContext replicateContext) { replicateCount.incrementAndGet(); lastEntries = new ArrayList<>(replicateContext.entries); - return true; + return ReplicationResult.COMMITTED; } @Override @@ -526,12 +526,12 @@ public class TestReplicationEndpoint extends TestReplicationBase { } @Override - public boolean replicate(ReplicateContext context) { + public ReplicationResult replicate(ReplicateContext context) { try { Thread.sleep(duration); } catch (InterruptedException e) { Thread.currentThread().interrupt(); - return false; + return ReplicationResult.FAILED; } return super.replicate(context); } @@ -548,9 +548,9 @@ public class TestReplicationEndpoint extends 
TestReplicationBase { } @Override - public boolean replicate(ReplicateContext replicateContext) { - boolean success = super.replicate(replicateContext); - if (success) { + public ReplicationResult replicate(ReplicateContext replicateContext) { + ReplicationResult success = super.replicate(replicateContext); + if (success == ReplicationResult.COMMITTED) { replicateCount.addAndGet(replicateContext.entries.size()); } return success; @@ -577,7 +577,7 @@ public class TestReplicationEndpoint extends TestReplicationBase { static AtomicBoolean replicated = new AtomicBoolean(false); @Override - public boolean replicate(ReplicateContext replicateContext) { + public ReplicationResult replicate(ReplicateContext replicateContext) { try { // check row doAssert(row); @@ -589,7 +589,7 @@ public class TestReplicationEndpoint extends TestReplicationBase { LOG.info("Replicated " + Bytes.toString(row) + ", count=" + replicateCount.get()); replicated.set(replicateCount.get() > COUNT); // first 10 times, we return false - return replicated.get(); + return replicated.get() ? ReplicationResult.COMMITTED : ReplicationResult.FAILED; } } @@ -598,14 +598,14 @@ public class TestReplicationEndpoint extends TestReplicationBase { static AtomicReference<Exception> ex = new AtomicReference<>(null); @Override - public boolean replicate(ReplicateContext replicateContext) { + public ReplicationResult replicate(ReplicateContext replicateContext) { try { super.replicate(replicateContext); doAssert(row); } catch (Exception e) { ex.set(e); } - return true; + return ReplicationResult.COMMITTED; } @Override diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestVerifyCellsReplicationEndpoint.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestVerifyCellsReplicationEndpoint.java index b990916ae75..50b0911970a 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestVerifyCellsReplicationEndpoint.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestVerifyCellsReplicationEndpoint.java @@ -71,7 +71,7 @@ public class TestVerifyCellsReplicationEndpoint { public static final class EndpointForTest extends VerifyWALEntriesReplicationEndpoint { @Override - public boolean replicate(ReplicateContext replicateContext) { + public ReplicationResult replicate(ReplicateContext replicateContext) { LOG.info(replicateContext.getEntries().toString()); replicateContext.entries.stream().map(WAL.Entry::getEdit).map(WALEdit::getCells) .forEachOrdered(CELLS::addAll); diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestHBaseInterClusterReplicationEndpointFilterEdits.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestHBaseInterClusterReplicationEndpointFilterEdits.java index 7b108f5ca14..92e7c8290f0 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestHBaseInterClusterReplicationEndpointFilterEdits.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestHBaseInterClusterReplicationEndpointFilterEdits.java @@ -83,8 +83,8 @@ public class TestHBaseInterClusterReplicationEndpointFilterEdits { when(rpc.isSerial()).thenReturn(false); when(replicationPeer.getPeerConfig()).thenReturn(rpc); when(rpc.getClusterKey()).thenReturn("hbase+zk://localhost:2181"); - Context context = new Context(null, UTIL.getConfiguration(), UTIL.getConfiguration(), null, - null, null, replicationPeer, null, null, null); + Context context = new 
Context(null, null, UTIL.getConfiguration(), UTIL.getConfiguration(), + null, null, null, replicationPeer, null, null, null); endpoint = new HBaseInterClusterReplicationEndpoint(); endpoint.init(context); diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestRaceWhenCreatingReplicationSource.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestRaceWhenCreatingReplicationSource.java index 66f04dca36d..d7b5bcdcccb 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestRaceWhenCreatingReplicationSource.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestRaceWhenCreatingReplicationSource.java @@ -38,6 +38,7 @@ import org.apache.hadoop.hbase.client.Table; import org.apache.hadoop.hbase.client.TableDescriptorBuilder; import org.apache.hadoop.hbase.replication.BaseReplicationEndpoint; import org.apache.hadoop.hbase.replication.ReplicationPeerConfig; +import org.apache.hadoop.hbase.replication.ReplicationResult; import org.apache.hadoop.hbase.testclassification.MediumTests; import org.apache.hadoop.hbase.testclassification.ReplicationTests; import org.apache.hadoop.hbase.util.Bytes; @@ -94,7 +95,7 @@ public class TestRaceWhenCreatingReplicationSource { } @Override - public boolean replicate(ReplicateContext replicateContext) { + public ReplicationResult replicate(ReplicateContext replicateContext) { synchronized (WRITER) { try { for (Entry entry : replicateContext.getEntries()) { @@ -105,7 +106,7 @@ public class TestRaceWhenCreatingReplicationSource { throw new UncheckedIOException(e); } } - return true; + return ReplicationResult.COMMITTED; } @Override diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestReplicationSourceManager.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestReplicationSourceManager.java index 663b444dc4e..c99f25380de 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestReplicationSourceManager.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestReplicationSourceManager.java @@ -56,6 +56,7 @@ import org.apache.hadoop.hbase.replication.ReplicationPeerConfigBuilder; import org.apache.hadoop.hbase.replication.ReplicationPeers; import org.apache.hadoop.hbase.replication.ReplicationQueueId; import org.apache.hadoop.hbase.replication.ReplicationQueueStorage; +import org.apache.hadoop.hbase.replication.ReplicationResult; import org.apache.hadoop.hbase.replication.ReplicationStorageFactory; import org.apache.hadoop.hbase.replication.ReplicationUtils; import org.apache.hadoop.hbase.replication.SyncReplicationState; @@ -94,13 +95,13 @@ public class TestReplicationSourceManager { private String clusterKey; @Override - public boolean replicate(ReplicateContext replicateContext) { + public ReplicationResult replicate(ReplicateContext replicateContext) { // if you want to block the replication, for example, do not want the recovered source to be // removed if (clusterKey.endsWith("error")) { throw new RuntimeException("Inject error"); } - return true; + return ReplicationResult.COMMITTED; } @Override diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestReplicator.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestReplicator.java index 979db712ef3..cdbd1c73a2a 100644 --- 
a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestReplicator.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestReplicator.java @@ -32,6 +32,7 @@ import org.apache.hadoop.hbase.client.Admin; import org.apache.hadoop.hbase.client.Put; import org.apache.hadoop.hbase.ipc.RpcServer; import org.apache.hadoop.hbase.replication.ReplicationPeerConfig; +import org.apache.hadoop.hbase.replication.ReplicationResult; import org.apache.hadoop.hbase.replication.TestReplicationBase; import org.apache.hadoop.hbase.testclassification.MediumTests; import org.apache.hadoop.hbase.util.Bytes; @@ -218,7 +219,7 @@ public class TestReplicator extends TestReplicationBase { } @Override - public boolean replicate(ReplicateContext replicateContext) { + public ReplicationResult replicate(ReplicateContext replicateContext) { try { await(); } catch (InterruptedException e) { diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/security/visibility/TestVisibilityLabelsReplication.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/security/visibility/TestVisibilityLabelsReplication.java index 7d5a5627d2c..ffbc0d2cee5 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/security/visibility/TestVisibilityLabelsReplication.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/security/visibility/TestVisibilityLabelsReplication.java @@ -65,6 +65,7 @@ import org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment; import org.apache.hadoop.hbase.coprocessor.RegionObserver; import org.apache.hadoop.hbase.replication.ReplicationEndpoint; import org.apache.hadoop.hbase.replication.ReplicationPeerConfig; +import org.apache.hadoop.hbase.replication.ReplicationResult; import org.apache.hadoop.hbase.security.User; import org.apache.hadoop.hbase.testclassification.MediumTests; import org.apache.hadoop.hbase.testclassification.SecurityTests; @@ -473,8 +474,8 @@ public class TestVisibilityLabelsReplication { } @Override - public boolean replicate(ReplicateContext replicateContext) { - boolean ret = super.replicate(replicateContext); + public ReplicationResult replicate(ReplicateContext replicateContext) { + ReplicationResult ret = super.replicate(replicateContext); lastEntries = replicateContext.getEntries(); replicateCount.incrementAndGet(); return ret;
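As with VisibilityReplicationEndpoint above, a wrapping endpoint should hand back its delegate's ReplicationResult unchanged, so that a SUBMITTED answer is never upgraded to COMMITTED on the delegate's behalf. A minimal illustrative sketch (names are hypothetical, not part of this patch):

import org.apache.hadoop.hbase.replication.ReplicationEndpoint;
import org.apache.hadoop.hbase.replication.ReplicationResult;

final class ForwardingResultSketch {

  // Propagate COMMITTED / SUBMITTED / FAILED exactly as the delegate reported it.
  static ReplicationResult replicateViaDelegate(ReplicationEndpoint delegate,
    ReplicationEndpoint.ReplicateContext context) {
    return delegate.replicate(context);
  }
}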