This is an automated email from the ASF dual-hosted git repository. andor pushed a commit to branch HBASE-28957 in repository https://gitbox.apache.org/repos/asf/hbase.git
commit dc13ab5a9ddb8539ee19bbcc26975070075ef80d Author: vinayak hegde <vinayakph...@gmail.com> AuthorDate: Wed Jun 25 01:28:51 2025 +0530 HBASE-29219 Ignore Empty WAL Files While Consuming Backed-Up WAL Files (#7106) Signed-off-by: Tak Lon (Stephen) Wu <tak...@apache.org> Reviewed by: Kota-SH <shanmukhaharipr...@gmail.com> Reviewed by: Kevin Geiszler <kevin.j.geisz...@gmail.com> --- .../hadoop/hbase/backup/impl/BackupAdminImpl.java | 2 + .../hadoop/hbase/mapreduce/WALInputFormat.java | 11 +++++- .../apache/hadoop/hbase/mapreduce/WALPlayer.java | 18 +++++++++ .../hadoop/hbase/mapreduce/TestWALInputFormat.java | 43 ++++++++++++++++++++ .../hadoop/hbase/mapreduce/TestWALPlayer.java | 46 ++++++++++++++++++++++ 5 files changed, 118 insertions(+), 2 deletions(-) diff --git a/hbase-backup/src/main/java/org/apache/hadoop/hbase/backup/impl/BackupAdminImpl.java b/hbase-backup/src/main/java/org/apache/hadoop/hbase/backup/impl/BackupAdminImpl.java index 1e91258ba6c..e82d9804f9d 100644 --- a/hbase-backup/src/main/java/org/apache/hadoop/hbase/backup/impl/BackupAdminImpl.java +++ b/hbase-backup/src/main/java/org/apache/hadoop/hbase/backup/impl/BackupAdminImpl.java @@ -23,6 +23,7 @@ import static org.apache.hadoop.hbase.backup.BackupRestoreConstants.DEFAULT_CONT import static org.apache.hadoop.hbase.backup.replication.BackupFileSystemManager.WALS_DIR; import static org.apache.hadoop.hbase.backup.replication.ContinuousBackupReplicationEndpoint.DATE_FORMAT; import static org.apache.hadoop.hbase.backup.replication.ContinuousBackupReplicationEndpoint.ONE_DAY_IN_MILLISECONDS; +import static org.apache.hadoop.hbase.mapreduce.WALPlayer.IGNORE_EMPTY_FILES; import java.io.IOException; import java.text.ParseException; @@ -846,6 +847,7 @@ public class BackupAdminImpl implements BackupAdmin { Configuration conf = HBaseConfiguration.create(conn.getConfiguration()); conf.setLong(WALInputFormat.START_TIME_KEY, startTime); conf.setLong(WALInputFormat.END_TIME_KEY, endTime); + conf.setBoolean(IGNORE_EMPTY_FILES, true); Tool walPlayer = new WALPlayer(); walPlayer.setConf(conf); return walPlayer; diff --git a/hbase-mapreduce/src/main/java/org/apache/hadoop/hbase/mapreduce/WALInputFormat.java b/hbase-mapreduce/src/main/java/org/apache/hadoop/hbase/mapreduce/WALInputFormat.java index badf581efef..b5c1d39a550 100644 --- a/hbase-mapreduce/src/main/java/org/apache/hadoop/hbase/mapreduce/WALInputFormat.java +++ b/hbase-mapreduce/src/main/java/org/apache/hadoop/hbase/mapreduce/WALInputFormat.java @@ -328,14 +328,21 @@ public class WALInputFormat extends InputFormat<WALKey, WALEdit> { throw e; } } + + boolean ignoreEmptyFiles = + conf.getBoolean(WALPlayer.IGNORE_EMPTY_FILES, WALPlayer.DEFAULT_IGNORE_EMPTY_FILES); List<InputSplit> splits = new ArrayList<InputSplit>(allFiles.size()); for (FileStatus file : allFiles) { + if (ignoreEmptyFiles && file.getLen() == 0) { + LOG.warn("Ignoring empty file: " + file.getPath()); + continue; + } splits.add(new WALSplit(file.getPath().toString(), file.getLen(), startTime, endTime)); } return splits; } - private Path[] getInputPaths(Configuration conf) { + Path[] getInputPaths(Configuration conf) { String inpDirs = conf.get(FileInputFormat.INPUT_DIR); return StringUtils .stringToPath(inpDirs.split(conf.get(WALPlayer.INPUT_FILES_SEPARATOR_KEY, ","))); @@ -349,7 +356,7 @@ public class WALInputFormat extends InputFormat<WALKey, WALEdit> { * equal to this value else we will filter out the file. If name does not seem to * have a timestamp, we will just return it w/o filtering. */ - private List<FileStatus> getFiles(FileSystem fs, Path dir, long startTime, long endTime, + List<FileStatus> getFiles(FileSystem fs, Path dir, long startTime, long endTime, Configuration conf) throws IOException { List<FileStatus> result = new ArrayList<>(); LOG.debug("Scanning " + dir.toString() + " for WAL files"); diff --git a/hbase-mapreduce/src/main/java/org/apache/hadoop/hbase/mapreduce/WALPlayer.java b/hbase-mapreduce/src/main/java/org/apache/hadoop/hbase/mapreduce/WALPlayer.java index 5e2dc0902e0..cea6da97649 100644 --- a/hbase-mapreduce/src/main/java/org/apache/hadoop/hbase/mapreduce/WALPlayer.java +++ b/hbase-mapreduce/src/main/java/org/apache/hadoop/hbase/mapreduce/WALPlayer.java @@ -81,6 +81,24 @@ public class WALPlayer extends Configured implements Tool { public final static String IGNORE_MISSING_FILES = "wal.input.ignore.missing.files"; public final static String MULTI_TABLES_SUPPORT = "wal.multi.tables.support"; + /** + * Configuration flag that controls how the WALPlayer handles empty input WAL files. + * <p> + * If set to {@code true}, the WALPlayer will silently ignore empty files that cannot be parsed as + * valid WAL files. This is useful in scenarios where such files are expected (e.g., due to + * partial writes or cleanup operations). + * </p> + * <p> + * If set to {@code false} (default), the WALPlayer will throw an exception when it encounters an + * empty or un-parsable WAL file. This is useful for catching unexpected data issues early. + * </p> + * <p> + * Default value: {@link #DEFAULT_IGNORE_EMPTY_FILES} ({@code false}) + * </p> + */ + public final static String IGNORE_EMPTY_FILES = "wal.input.ignore.empty.files"; + public final static boolean DEFAULT_IGNORE_EMPTY_FILES = false; + protected static final String tableSeparator = ";"; private final static String JOB_NAME_CONF_KEY = "mapreduce.job.name"; diff --git a/hbase-mapreduce/src/test/java/org/apache/hadoop/hbase/mapreduce/TestWALInputFormat.java b/hbase-mapreduce/src/test/java/org/apache/hadoop/hbase/mapreduce/TestWALInputFormat.java index 930c8d11375..0b2d66bd0b8 100644 --- a/hbase-mapreduce/src/test/java/org/apache/hadoop/hbase/mapreduce/TestWALInputFormat.java +++ b/hbase-mapreduce/src/test/java/org/apache/hadoop/hbase/mapreduce/TestWALInputFormat.java @@ -18,11 +18,15 @@ package org.apache.hadoop.hbase.mapreduce; import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertTrue; +import java.io.IOException; import java.util.ArrayList; +import java.util.Collections; import java.util.List; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.FileStatus; +import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.LocatedFileStatus; import org.apache.hadoop.fs.Path; import org.apache.hadoop.hbase.HBaseClassTestRule; @@ -125,4 +129,43 @@ public class TestWALInputFormat { assertEquals(archiveWal.toString(), split.getLogFileName()); } + @Test + public void testEmptyFileIsIgnoredWhenConfigured() throws IOException, InterruptedException { + List<InputSplit> splits = getSplitsForEmptyFile(true); + assertTrue("Empty file should be ignored when IGNORE_EMPTY_FILES is true", splits.isEmpty()); + } + + @Test + public void testEmptyFileIsIncludedWhenNotIgnored() throws IOException, InterruptedException { + List<InputSplit> splits = getSplitsForEmptyFile(false); + assertEquals("Empty file should be included when IGNORE_EMPTY_FILES is false", 1, + splits.size()); + } + + private List<InputSplit> getSplitsForEmptyFile(boolean ignoreEmptyFiles) + throws IOException, InterruptedException { + Configuration conf = new Configuration(); + conf.setBoolean(WALPlayer.IGNORE_EMPTY_FILES, ignoreEmptyFiles); + + JobContext jobContext = Mockito.mock(JobContext.class); + Mockito.when(jobContext.getConfiguration()).thenReturn(conf); + + LocatedFileStatus emptyFile = Mockito.mock(LocatedFileStatus.class); + Mockito.when(emptyFile.getLen()).thenReturn(0L); + Mockito.when(emptyFile.getPath()).thenReturn(new Path("/empty.wal")); + + WALInputFormat inputFormat = new WALInputFormat() { + @Override + Path[] getInputPaths(Configuration conf) { + return new Path[] { new Path("/input") }; + } + + @Override + List<FileStatus> getFiles(FileSystem fs, Path inputPath, long startTime, long endTime) { + return Collections.singletonList(emptyFile); + } + }; + + return inputFormat.getSplits(jobContext, "", ""); + } } diff --git a/hbase-mapreduce/src/test/java/org/apache/hadoop/hbase/mapreduce/TestWALPlayer.java b/hbase-mapreduce/src/test/java/org/apache/hadoop/hbase/mapreduce/TestWALPlayer.java index b39d04802c9..7818f8d2f73 100644 --- a/hbase-mapreduce/src/test/java/org/apache/hadoop/hbase/mapreduce/TestWALPlayer.java +++ b/hbase-mapreduce/src/test/java/org/apache/hadoop/hbase/mapreduce/TestWALPlayer.java @@ -22,6 +22,7 @@ import static org.hamcrest.CoreMatchers.notNullValue; import static org.hamcrest.CoreMatchers.nullValue; import static org.hamcrest.MatcherAssert.assertThat; import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertNotEquals; import static org.junit.Assert.assertTrue; import static org.junit.Assert.fail; import static org.mockito.ArgumentMatchers.any; @@ -31,10 +32,12 @@ import static org.mockito.Mockito.when; import java.io.ByteArrayOutputStream; import java.io.File; +import java.io.IOException; import java.io.PrintStream; import java.util.ArrayList; import java.util.concurrent.ThreadLocalRandom; import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.FSDataOutputStream; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; import org.apache.hadoop.hbase.Cell; @@ -338,4 +341,47 @@ public class TestWALPlayer { } + @Test + public void testIgnoreEmptyWALFiles() throws Exception { + Path inputDir = createEmptyWALFile("empty-wal-dir"); + FileSystem dfs = TEST_UTIL.getDFSCluster().getFileSystem(); + Path emptyWAL = new Path(inputDir, "empty.wal"); + + assertTrue("Empty WAL file should exist", dfs.exists(emptyWAL)); + assertEquals("WAL file should be 0 bytes", 0, dfs.getFileStatus(emptyWAL).getLen()); + + Configuration conf = new Configuration(TEST_UTIL.getConfiguration()); + conf.setBoolean(WALPlayer.IGNORE_EMPTY_FILES, true); + + int exitCode = ToolRunner.run(conf, new WALPlayer(conf), new String[] { inputDir.toString() }); + assertEquals("WALPlayer should exit cleanly even with empty files", 0, exitCode); + } + + @Test + public void testFailOnEmptyWALFilesWhenNotIgnored() throws Exception { + Path inputDir = createEmptyWALFile("fail-empty-wal-dir"); + FileSystem dfs = TEST_UTIL.getDFSCluster().getFileSystem(); + Path emptyWAL = new Path(inputDir, "empty.wal"); + + assertTrue("Empty WAL file should exist", dfs.exists(emptyWAL)); + assertEquals("WAL file should be 0 bytes", 0, dfs.getFileStatus(emptyWAL).getLen()); + + Configuration conf = new Configuration(TEST_UTIL.getConfiguration()); + conf.setBoolean(WALPlayer.IGNORE_EMPTY_FILES, false); + + int exitCode = ToolRunner.run(conf, new WALPlayer(conf), new String[] { inputDir.toString() }); + assertNotEquals("WALPlayer should fail on empty files when not ignored", 0, exitCode); + } + + private Path createEmptyWALFile(String walDir) throws IOException { + FileSystem dfs = TEST_UTIL.getDFSCluster().getFileSystem(); + Path inputDir = new Path("/" + walDir); + dfs.mkdirs(inputDir); + + Path emptyWAL = new Path(inputDir, "empty.wal"); + FSDataOutputStream out = dfs.create(emptyWAL); + out.close(); // Explicitly closing the stream + + return inputDir; + } }