This is an automated email from the ASF dual-hosted git repository. rmattingly pushed a commit to branch HBASE-29447-branch-3 in repository https://gitbox.apache.org/repos/asf/hbase.git
commit 0f53948448c72f07b135ba2f326032f76cece0e8 Author: Hernan Romer <nanu...@gmail.com> AuthorDate: Wed Jul 16 16:10:25 2025 -0400 HBASE-29447 Fix WAL archives cause incremental backup failures (#7151) Co-authored-by: Hernan Gelaf-Romer <hgelafro...@hubspot.com> Signed-off-by: Ray Mattingly <rmattin...@apache.org> --- .../hadoop/hbase/mapreduce/WALInputFormat.java | 34 +++++++++++-- .../hadoop/hbase/mapreduce/TestWALInputFormat.java | 55 +++++++++++++++++++++- 2 files changed, 82 insertions(+), 7 deletions(-) diff --git a/hbase-mapreduce/src/main/java/org/apache/hadoop/hbase/mapreduce/WALInputFormat.java b/hbase-mapreduce/src/main/java/org/apache/hadoop/hbase/mapreduce/WALInputFormat.java index 8d6e91633f7..badf581efef 100644 --- a/hbase-mapreduce/src/main/java/org/apache/hadoop/hbase/mapreduce/WALInputFormat.java +++ b/hbase-mapreduce/src/main/java/org/apache/hadoop/hbase/mapreduce/WALInputFormat.java @@ -318,7 +318,7 @@ public class WALInputFormat extends InputFormat<WALKey, WALEdit> { for (Path inputPath : inputPaths) { FileSystem fs = inputPath.getFileSystem(conf); try { - List<FileStatus> files = getFiles(fs, inputPath, startTime, endTime); + List<FileStatus> files = getFiles(fs, inputPath, startTime, endTime, conf); allFiles.addAll(files); } catch (FileNotFoundException e) { if (ignoreMissing) { @@ -349,11 +349,11 @@ public class WALInputFormat extends InputFormat<WALKey, WALEdit> { * equal to this value else we will filter out the file. If name does not seem to * have a timestamp, we will just return it w/o filtering. */ - private List<FileStatus> getFiles(FileSystem fs, Path dir, long startTime, long endTime) - throws IOException { + private List<FileStatus> getFiles(FileSystem fs, Path dir, long startTime, long endTime, + Configuration conf) throws IOException { List<FileStatus> result = new ArrayList<>(); LOG.debug("Scanning " + dir.toString() + " for WAL files"); - RemoteIterator<LocatedFileStatus> iter = fs.listLocatedStatus(dir); + RemoteIterator<LocatedFileStatus> iter = listLocatedFileStatus(fs, dir, conf); if (!iter.hasNext()) { return Collections.emptyList(); } @@ -361,7 +361,7 @@ public class WALInputFormat extends InputFormat<WALKey, WALEdit> { LocatedFileStatus file = iter.next(); if (file.isDirectory()) { // Recurse into sub directories - result.addAll(getFiles(fs, file.getPath(), startTime, endTime)); + result.addAll(getFiles(fs, file.getPath(), startTime, endTime, conf)); } else { addFile(result, file, startTime, endTime); } @@ -396,4 +396,28 @@ public class WALInputFormat extends InputFormat<WALKey, WALEdit> { TaskAttemptContext context) throws IOException, InterruptedException { return new WALKeyRecordReader(); } + + /** + * Attempts to return the {@link LocatedFileStatus} for the given directory. If the directory does + * not exist, it will check if the directory is an archived log file and try to find it + */ + private static RemoteIterator<LocatedFileStatus> listLocatedFileStatus(FileSystem fs, Path dir, + Configuration conf) throws IOException { + try { + return fs.listLocatedStatus(dir); + } catch (FileNotFoundException e) { + if (AbstractFSWALProvider.isArchivedLogFile(dir)) { + throw e; + } + + LOG.warn("Log file {} not found, trying to find it in archive directory.", dir); + Path archiveFile = AbstractFSWALProvider.findArchivedLog(dir, conf); + if (archiveFile == null) { + LOG.error("Did not find archive file for {}", dir); + throw e; + } + + return fs.listLocatedStatus(archiveFile); + } + } } diff --git a/hbase-mapreduce/src/test/java/org/apache/hadoop/hbase/mapreduce/TestWALInputFormat.java b/hbase-mapreduce/src/test/java/org/apache/hadoop/hbase/mapreduce/TestWALInputFormat.java index 70602a37166..930c8d11375 100644 --- a/hbase-mapreduce/src/test/java/org/apache/hadoop/hbase/mapreduce/TestWALInputFormat.java +++ b/hbase-mapreduce/src/test/java/org/apache/hadoop/hbase/mapreduce/TestWALInputFormat.java @@ -21,24 +21,43 @@ import static org.junit.Assert.assertEquals; import java.util.ArrayList; import java.util.List; +import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.FileStatus; import org.apache.hadoop.fs.LocatedFileStatus; import org.apache.hadoop.fs.Path; import org.apache.hadoop.hbase.HBaseClassTestRule; +import org.apache.hadoop.hbase.HBaseTestingUtil; +import org.apache.hadoop.hbase.HConstants; +import org.apache.hadoop.hbase.regionserver.HRegionServer; +import org.apache.hadoop.hbase.regionserver.wal.AbstractFSWAL; import org.apache.hadoop.hbase.testclassification.MapReduceTests; -import org.apache.hadoop.hbase.testclassification.SmallTests; +import org.apache.hadoop.hbase.testclassification.MediumTests; +import org.apache.hadoop.hbase.util.CommonFSUtils; import org.apache.hadoop.hbase.util.EnvironmentEdgeManager; +import org.apache.hadoop.mapreduce.InputSplit; +import org.apache.hadoop.mapreduce.Job; +import org.apache.hadoop.mapreduce.JobContext; +import org.apache.hadoop.mapreduce.lib.input.FileInputFormat; +import org.junit.BeforeClass; import org.junit.ClassRule; import org.junit.Test; import org.junit.experimental.categories.Category; import org.mockito.Mockito; -@Category({ MapReduceTests.class, SmallTests.class }) +@Category({ MapReduceTests.class, MediumTests.class }) public class TestWALInputFormat { + private static final HBaseTestingUtil TEST_UTIL = new HBaseTestingUtil(); + @ClassRule public static final HBaseClassTestRule CLASS_RULE = HBaseClassTestRule.forClass(TestWALInputFormat.class); + @BeforeClass + public static void setupClass() throws Exception { + TEST_UTIL.startMiniCluster(); + TEST_UTIL.createWALRootDir(); + } + /** * Test the primitive start/end time filtering. */ @@ -74,4 +93,36 @@ public class TestWALInputFormat { WALInputFormat.addFile(lfss, lfs, now, now); assertEquals(8, lfss.size()); } + + @Test + public void testHandlesArchivedWALFiles() throws Exception { + Configuration conf = TEST_UTIL.getConfiguration(); + JobContext ctx = Mockito.mock(JobContext.class); + Mockito.when(ctx.getConfiguration()).thenReturn(conf); + Job job = Job.getInstance(conf); + TableMapReduceUtil.initCredentialsForCluster(job, conf); + Mockito.when(ctx.getCredentials()).thenReturn(job.getCredentials()); + + // Setup WAL file, then archive it + HRegionServer rs = TEST_UTIL.getHBaseCluster().getRegionServer(0); + AbstractFSWAL wal = (AbstractFSWAL) rs.getWALs().get(0); + Path walPath = wal.getCurrentFileName(); + TEST_UTIL.getConfiguration().set(FileInputFormat.INPUT_DIR, walPath.toString()); + TEST_UTIL.getConfiguration().set(WALPlayer.INPUT_FILES_SEPARATOR_KEY, ";"); + + Path rootDir = CommonFSUtils.getWALRootDir(conf); + Path archiveWal = new Path(rootDir, HConstants.HREGION_OLDLOGDIR_NAME); + archiveWal = new Path(archiveWal, walPath.getName()); + TEST_UTIL.getTestFileSystem().delete(walPath, true); + TEST_UTIL.getTestFileSystem().mkdirs(archiveWal.getParent()); + TEST_UTIL.getTestFileSystem().create(archiveWal).close(); + + // Test for that we can read from the archived WAL file + WALInputFormat wif = new WALInputFormat(); + List<InputSplit> splits = wif.getSplits(ctx); + assertEquals(1, splits.size()); + WALInputFormat.WALSplit split = (WALInputFormat.WALSplit) splits.get(0); + assertEquals(archiveWal.toString(), split.getLogFileName()); + } + }