This is an automated email from the ASF dual-hosted git repository.

taklwu pushed a commit to branch HBASE-28957 in repository https://gitbox.apache.org/repos/asf/hbase.git
The following commit(s) were added to refs/heads/HBASE-28957 by this push: new a7a6d3c2726 HBASE-29219 Ignore Empty WAL Files While Consuming Backed-Up WAL Files (#7106) a7a6d3c2726 is described below commit a7a6d3c2726c15fb9324b75471a9e166c0ec5848 Author: vinayak hegde <vinayakph...@gmail.com> AuthorDate: Wed Jun 25 01:28:51 2025 +0530 HBASE-29219 Ignore Empty WAL Files While Consuming Backed-Up WAL Files (#7106) Signed-off-by: Tak Lon (Stephen) Wu <tak...@apache.org> Reviewed by: Kota-SH <shanmukhaharipr...@gmail.com> Reviewed by: Kevin Geiszler <kevin.j.geisz...@gmail.com> --- .../hadoop/hbase/backup/impl/BackupAdminImpl.java | 2 + .../hadoop/hbase/mapreduce/WALInputFormat.java | 11 ++++- .../apache/hadoop/hbase/mapreduce/WALPlayer.java | 18 +++++++++ .../hadoop/hbase/mapreduce/TestWALInputFormat.java | 47 ++++++++++++++++++++++ .../hadoop/hbase/mapreduce/TestWALPlayer.java | 46 +++++++++++++++++++++ 5 files changed, 122 insertions(+), 2 deletions(-) diff --git a/hbase-backup/src/main/java/org/apache/hadoop/hbase/backup/impl/BackupAdminImpl.java b/hbase-backup/src/main/java/org/apache/hadoop/hbase/backup/impl/BackupAdminImpl.java index 1e91258ba6c..e82d9804f9d 100644 --- a/hbase-backup/src/main/java/org/apache/hadoop/hbase/backup/impl/BackupAdminImpl.java +++ b/hbase-backup/src/main/java/org/apache/hadoop/hbase/backup/impl/BackupAdminImpl.java @@ -23,6 +23,7 @@ import static org.apache.hadoop.hbase.backup.BackupRestoreConstants.DEFAULT_CONT import static org.apache.hadoop.hbase.backup.replication.BackupFileSystemManager.WALS_DIR; import static org.apache.hadoop.hbase.backup.replication.ContinuousBackupReplicationEndpoint.DATE_FORMAT; import static org.apache.hadoop.hbase.backup.replication.ContinuousBackupReplicationEndpoint.ONE_DAY_IN_MILLISECONDS; +import static org.apache.hadoop.hbase.mapreduce.WALPlayer.IGNORE_EMPTY_FILES; import java.io.IOException; import java.text.ParseException; @@ -846,6 +847,7 @@ public class BackupAdminImpl implements BackupAdmin { 
Configuration conf = HBaseConfiguration.create(conn.getConfiguration()); conf.setLong(WALInputFormat.START_TIME_KEY, startTime); conf.setLong(WALInputFormat.END_TIME_KEY, endTime); + conf.setBoolean(IGNORE_EMPTY_FILES, true); Tool walPlayer = new WALPlayer(); walPlayer.setConf(conf); return walPlayer; diff --git a/hbase-mapreduce/src/main/java/org/apache/hadoop/hbase/mapreduce/WALInputFormat.java b/hbase-mapreduce/src/main/java/org/apache/hadoop/hbase/mapreduce/WALInputFormat.java index 8d6e91633f7..a49cdca5a84 100644 --- a/hbase-mapreduce/src/main/java/org/apache/hadoop/hbase/mapreduce/WALInputFormat.java +++ b/hbase-mapreduce/src/main/java/org/apache/hadoop/hbase/mapreduce/WALInputFormat.java @@ -328,14 +328,21 @@ public class WALInputFormat extends InputFormat<WALKey, WALEdit> { throw e; } } + + boolean ignoreEmptyFiles = + conf.getBoolean(WALPlayer.IGNORE_EMPTY_FILES, WALPlayer.DEFAULT_IGNORE_EMPTY_FILES); List<InputSplit> splits = new ArrayList<InputSplit>(allFiles.size()); for (FileStatus file : allFiles) { + if (ignoreEmptyFiles && file.getLen() == 0) { + LOG.warn("Ignoring empty file: " + file.getPath()); + continue; + } splits.add(new WALSplit(file.getPath().toString(), file.getLen(), startTime, endTime)); } return splits; } - private Path[] getInputPaths(Configuration conf) { + Path[] getInputPaths(Configuration conf) { String inpDirs = conf.get(FileInputFormat.INPUT_DIR); return StringUtils .stringToPath(inpDirs.split(conf.get(WALPlayer.INPUT_FILES_SEPARATOR_KEY, ","))); @@ -349,7 +356,7 @@ public class WALInputFormat extends InputFormat<WALKey, WALEdit> { * equal to this value else we will filter out the file. If name does not seem to * have a timestamp, we will just return it w/o filtering. 
*/ - private List<FileStatus> getFiles(FileSystem fs, Path dir, long startTime, long endTime) + List<FileStatus> getFiles(FileSystem fs, Path dir, long startTime, long endTime) throws IOException { List<FileStatus> result = new ArrayList<>(); LOG.debug("Scanning " + dir.toString() + " for WAL files"); diff --git a/hbase-mapreduce/src/main/java/org/apache/hadoop/hbase/mapreduce/WALPlayer.java b/hbase-mapreduce/src/main/java/org/apache/hadoop/hbase/mapreduce/WALPlayer.java index 5e2dc0902e0..cea6da97649 100644 --- a/hbase-mapreduce/src/main/java/org/apache/hadoop/hbase/mapreduce/WALPlayer.java +++ b/hbase-mapreduce/src/main/java/org/apache/hadoop/hbase/mapreduce/WALPlayer.java @@ -81,6 +81,24 @@ public class WALPlayer extends Configured implements Tool { public final static String IGNORE_MISSING_FILES = "wal.input.ignore.missing.files"; public final static String MULTI_TABLES_SUPPORT = "wal.multi.tables.support"; + /** + * Configuration flag that controls how the WALPlayer handles empty input WAL files. + * <p> + * If set to {@code true}, the WALPlayer will silently ignore empty files that cannot be parsed as + * valid WAL files. This is useful in scenarios where such files are expected (e.g., due to + * partial writes or cleanup operations). + * </p> + * <p> + * If set to {@code false} (default), the WALPlayer will throw an exception when it encounters an + * empty or un-parsable WAL file. This is useful for catching unexpected data issues early. 
+ * </p> + * <p> + * Default value: {@link #DEFAULT_IGNORE_EMPTY_FILES} ({@code false}) + * </p> + */ + public final static String IGNORE_EMPTY_FILES = "wal.input.ignore.empty.files"; + public final static boolean DEFAULT_IGNORE_EMPTY_FILES = false; + protected static final String tableSeparator = ";"; private final static String JOB_NAME_CONF_KEY = "mapreduce.job.name"; diff --git a/hbase-mapreduce/src/test/java/org/apache/hadoop/hbase/mapreduce/TestWALInputFormat.java b/hbase-mapreduce/src/test/java/org/apache/hadoop/hbase/mapreduce/TestWALInputFormat.java index 70602a37166..b31ea014977 100644 --- a/hbase-mapreduce/src/test/java/org/apache/hadoop/hbase/mapreduce/TestWALInputFormat.java +++ b/hbase-mapreduce/src/test/java/org/apache/hadoop/hbase/mapreduce/TestWALInputFormat.java @@ -18,16 +18,23 @@ package org.apache.hadoop.hbase.mapreduce; import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertTrue; +import java.io.IOException; import java.util.ArrayList; +import java.util.Collections; import java.util.List; +import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.FileStatus; +import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.LocatedFileStatus; import org.apache.hadoop.fs.Path; import org.apache.hadoop.hbase.HBaseClassTestRule; import org.apache.hadoop.hbase.testclassification.MapReduceTests; import org.apache.hadoop.hbase.testclassification.SmallTests; import org.apache.hadoop.hbase.util.EnvironmentEdgeManager; +import org.apache.hadoop.mapreduce.InputSplit; +import org.apache.hadoop.mapreduce.JobContext; import org.junit.ClassRule; import org.junit.Test; import org.junit.experimental.categories.Category; @@ -74,4 +81,44 @@ public class TestWALInputFormat { WALInputFormat.addFile(lfss, lfs, now, now); assertEquals(8, lfss.size()); } + + @Test + public void testEmptyFileIsIgnoredWhenConfigured() throws IOException, InterruptedException { + List<InputSplit> splits = getSplitsForEmptyFile(true); + 
assertTrue("Empty file should be ignored when IGNORE_EMPTY_FILES is true", splits.isEmpty()); + } + + @Test + public void testEmptyFileIsIncludedWhenNotIgnored() throws IOException, InterruptedException { + List<InputSplit> splits = getSplitsForEmptyFile(false); + assertEquals("Empty file should be included when IGNORE_EMPTY_FILES is false", 1, + splits.size()); + } + + private List<InputSplit> getSplitsForEmptyFile(boolean ignoreEmptyFiles) + throws IOException, InterruptedException { + Configuration conf = new Configuration(); + conf.setBoolean(WALPlayer.IGNORE_EMPTY_FILES, ignoreEmptyFiles); + + JobContext jobContext = Mockito.mock(JobContext.class); + Mockito.when(jobContext.getConfiguration()).thenReturn(conf); + + LocatedFileStatus emptyFile = Mockito.mock(LocatedFileStatus.class); + Mockito.when(emptyFile.getLen()).thenReturn(0L); + Mockito.when(emptyFile.getPath()).thenReturn(new Path("/empty.wal")); + + WALInputFormat inputFormat = new WALInputFormat() { + @Override + Path[] getInputPaths(Configuration conf) { + return new Path[] { new Path("/input") }; + } + + @Override + List<FileStatus> getFiles(FileSystem fs, Path inputPath, long startTime, long endTime) { + return Collections.singletonList(emptyFile); + } + }; + + return inputFormat.getSplits(jobContext, "", ""); + } } diff --git a/hbase-mapreduce/src/test/java/org/apache/hadoop/hbase/mapreduce/TestWALPlayer.java b/hbase-mapreduce/src/test/java/org/apache/hadoop/hbase/mapreduce/TestWALPlayer.java index b39d04802c9..7818f8d2f73 100644 --- a/hbase-mapreduce/src/test/java/org/apache/hadoop/hbase/mapreduce/TestWALPlayer.java +++ b/hbase-mapreduce/src/test/java/org/apache/hadoop/hbase/mapreduce/TestWALPlayer.java @@ -22,6 +22,7 @@ import static org.hamcrest.CoreMatchers.notNullValue; import static org.hamcrest.CoreMatchers.nullValue; import static org.hamcrest.MatcherAssert.assertThat; import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertNotEquals; import static 
org.junit.Assert.assertTrue; import static org.junit.Assert.fail; import static org.mockito.ArgumentMatchers.any; @@ -31,10 +32,12 @@ import static org.mockito.Mockito.when; import java.io.ByteArrayOutputStream; import java.io.File; +import java.io.IOException; import java.io.PrintStream; import java.util.ArrayList; import java.util.concurrent.ThreadLocalRandom; import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.FSDataOutputStream; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; import org.apache.hadoop.hbase.Cell; @@ -338,4 +341,47 @@ public class TestWALPlayer { } + @Test + public void testIgnoreEmptyWALFiles() throws Exception { + Path inputDir = createEmptyWALFile("empty-wal-dir"); + FileSystem dfs = TEST_UTIL.getDFSCluster().getFileSystem(); + Path emptyWAL = new Path(inputDir, "empty.wal"); + + assertTrue("Empty WAL file should exist", dfs.exists(emptyWAL)); + assertEquals("WAL file should be 0 bytes", 0, dfs.getFileStatus(emptyWAL).getLen()); + + Configuration conf = new Configuration(TEST_UTIL.getConfiguration()); + conf.setBoolean(WALPlayer.IGNORE_EMPTY_FILES, true); + + int exitCode = ToolRunner.run(conf, new WALPlayer(conf), new String[] { inputDir.toString() }); + assertEquals("WALPlayer should exit cleanly even with empty files", 0, exitCode); + } + + @Test + public void testFailOnEmptyWALFilesWhenNotIgnored() throws Exception { + Path inputDir = createEmptyWALFile("fail-empty-wal-dir"); + FileSystem dfs = TEST_UTIL.getDFSCluster().getFileSystem(); + Path emptyWAL = new Path(inputDir, "empty.wal"); + + assertTrue("Empty WAL file should exist", dfs.exists(emptyWAL)); + assertEquals("WAL file should be 0 bytes", 0, dfs.getFileStatus(emptyWAL).getLen()); + + Configuration conf = new Configuration(TEST_UTIL.getConfiguration()); + conf.setBoolean(WALPlayer.IGNORE_EMPTY_FILES, false); + + int exitCode = ToolRunner.run(conf, new WALPlayer(conf), new String[] { inputDir.toString() }); + 
assertNotEquals("WALPlayer should fail on empty files when not ignored", 0, exitCode); + } + + private Path createEmptyWALFile(String walDir) throws IOException { + FileSystem dfs = TEST_UTIL.getDFSCluster().getFileSystem(); + Path inputDir = new Path("/" + walDir); + dfs.mkdirs(inputDir); + + Path emptyWAL = new Path(inputDir, "empty.wal"); + FSDataOutputStream out = dfs.create(emptyWAL); + out.close(); // Explicitly closing the stream + + return inputDir; + } }