This is an automated email from the ASF dual-hosted git repository.

andor pushed a commit to branch HBASE-28957
in repository https://gitbox.apache.org/repos/asf/hbase.git

commit dc13ab5a9ddb8539ee19bbcc26975070075ef80d
Author: vinayak hegde <vinayakph...@gmail.com>
AuthorDate: Wed Jun 25 01:28:51 2025 +0530

    HBASE-29219 Ignore Empty WAL Files While Consuming Backed-Up WAL Files (#7106)
    
    Signed-off-by: Tak Lon (Stephen) Wu <tak...@apache.org>
    Reviewed by: Kota-SH <shanmukhaharipr...@gmail.com>
    Reviewed by: Kevin Geiszler <kevin.j.geisz...@gmail.com>
---
 .../hadoop/hbase/backup/impl/BackupAdminImpl.java  |  2 +
 .../hadoop/hbase/mapreduce/WALInputFormat.java     | 11 +++++-
 .../apache/hadoop/hbase/mapreduce/WALPlayer.java   | 18 +++++++++
 .../hadoop/hbase/mapreduce/TestWALInputFormat.java | 43 ++++++++++++++++++++
 .../hadoop/hbase/mapreduce/TestWALPlayer.java      | 46 ++++++++++++++++++++++
 5 files changed, 118 insertions(+), 2 deletions(-)

diff --git a/hbase-backup/src/main/java/org/apache/hadoop/hbase/backup/impl/BackupAdminImpl.java b/hbase-backup/src/main/java/org/apache/hadoop/hbase/backup/impl/BackupAdminImpl.java
index 1e91258ba6c..e82d9804f9d 100644
--- a/hbase-backup/src/main/java/org/apache/hadoop/hbase/backup/impl/BackupAdminImpl.java
+++ b/hbase-backup/src/main/java/org/apache/hadoop/hbase/backup/impl/BackupAdminImpl.java
@@ -23,6 +23,7 @@ import static org.apache.hadoop.hbase.backup.BackupRestoreConstants.DEFAULT_CONT
 import static 
org.apache.hadoop.hbase.backup.replication.BackupFileSystemManager.WALS_DIR;
 import static 
org.apache.hadoop.hbase.backup.replication.ContinuousBackupReplicationEndpoint.DATE_FORMAT;
 import static 
org.apache.hadoop.hbase.backup.replication.ContinuousBackupReplicationEndpoint.ONE_DAY_IN_MILLISECONDS;
+import static org.apache.hadoop.hbase.mapreduce.WALPlayer.IGNORE_EMPTY_FILES;
 
 import java.io.IOException;
 import java.text.ParseException;
@@ -846,6 +847,7 @@ public class BackupAdminImpl implements BackupAdmin {
     Configuration conf = HBaseConfiguration.create(conn.getConfiguration());
     conf.setLong(WALInputFormat.START_TIME_KEY, startTime);
     conf.setLong(WALInputFormat.END_TIME_KEY, endTime);
+    conf.setBoolean(IGNORE_EMPTY_FILES, true);
     Tool walPlayer = new WALPlayer();
     walPlayer.setConf(conf);
     return walPlayer;
diff --git a/hbase-mapreduce/src/main/java/org/apache/hadoop/hbase/mapreduce/WALInputFormat.java b/hbase-mapreduce/src/main/java/org/apache/hadoop/hbase/mapreduce/WALInputFormat.java
index badf581efef..b5c1d39a550 100644
--- a/hbase-mapreduce/src/main/java/org/apache/hadoop/hbase/mapreduce/WALInputFormat.java
+++ b/hbase-mapreduce/src/main/java/org/apache/hadoop/hbase/mapreduce/WALInputFormat.java
@@ -328,14 +328,21 @@ public class WALInputFormat extends InputFormat<WALKey, WALEdit> {
         throw e;
       }
     }
+
+    boolean ignoreEmptyFiles =
+      conf.getBoolean(WALPlayer.IGNORE_EMPTY_FILES, 
WALPlayer.DEFAULT_IGNORE_EMPTY_FILES);
     List<InputSplit> splits = new ArrayList<InputSplit>(allFiles.size());
     for (FileStatus file : allFiles) {
+      if (ignoreEmptyFiles && file.getLen() == 0) {
+        LOG.warn("Ignoring empty file: " + file.getPath());
+        continue;
+      }
       splits.add(new WALSplit(file.getPath().toString(), file.getLen(), 
startTime, endTime));
     }
     return splits;
   }
 
-  private Path[] getInputPaths(Configuration conf) {
+  Path[] getInputPaths(Configuration conf) {
     String inpDirs = conf.get(FileInputFormat.INPUT_DIR);
     return StringUtils
       
.stringToPath(inpDirs.split(conf.get(WALPlayer.INPUT_FILES_SEPARATOR_KEY, 
",")));
@@ -349,7 +356,7 @@ public class WALInputFormat extends InputFormat<WALKey, WALEdit> {
    *                  equal to this value else we will filter out the file. If 
name does not seem to
    *                  have a timestamp, we will just return it w/o filtering.
    */
-  private List<FileStatus> getFiles(FileSystem fs, Path dir, long startTime, 
long endTime,
+  List<FileStatus> getFiles(FileSystem fs, Path dir, long startTime, long 
endTime,
     Configuration conf) throws IOException {
     List<FileStatus> result = new ArrayList<>();
     LOG.debug("Scanning " + dir.toString() + " for WAL files");
diff --git a/hbase-mapreduce/src/main/java/org/apache/hadoop/hbase/mapreduce/WALPlayer.java b/hbase-mapreduce/src/main/java/org/apache/hadoop/hbase/mapreduce/WALPlayer.java
index 5e2dc0902e0..cea6da97649 100644
--- a/hbase-mapreduce/src/main/java/org/apache/hadoop/hbase/mapreduce/WALPlayer.java
+++ b/hbase-mapreduce/src/main/java/org/apache/hadoop/hbase/mapreduce/WALPlayer.java
@@ -81,6 +81,24 @@ public class WALPlayer extends Configured implements Tool {
   public final static String IGNORE_MISSING_FILES = 
"wal.input.ignore.missing.files";
   public final static String MULTI_TABLES_SUPPORT = "wal.multi.tables.support";
 
+  /**
+   * Configuration flag that controls how the WALPlayer handles zero-length input WAL files.
+   * <p>
+   * If set to {@code true}, input files whose length is zero are skipped (a warning is logged)
+   * when the input splits are computed, so no map task is created for them. This is useful in
+   * scenarios where such files are expected (e.g., due to partial writes or cleanup operations).
+   * </p>
+   * <p>
+   * If set to {@code false} (default), zero-length files are treated like any other input file
+   * and the job is expected to fail when reading them. This flag never skips non-empty files.
+   * </p>
+   * <p>
+   * Default value: {@link #DEFAULT_IGNORE_EMPTY_FILES} ({@code false})
+   * </p>
+   */
+  public final static String IGNORE_EMPTY_FILES = "wal.input.ignore.empty.files";
+  public final static boolean DEFAULT_IGNORE_EMPTY_FILES = false;
+
   protected static final String tableSeparator = ";";
 
   private final static String JOB_NAME_CONF_KEY = "mapreduce.job.name";
diff --git a/hbase-mapreduce/src/test/java/org/apache/hadoop/hbase/mapreduce/TestWALInputFormat.java b/hbase-mapreduce/src/test/java/org/apache/hadoop/hbase/mapreduce/TestWALInputFormat.java
index 930c8d11375..0b2d66bd0b8 100644
--- a/hbase-mapreduce/src/test/java/org/apache/hadoop/hbase/mapreduce/TestWALInputFormat.java
+++ b/hbase-mapreduce/src/test/java/org/apache/hadoop/hbase/mapreduce/TestWALInputFormat.java
@@ -18,11 +18,15 @@
 package org.apache.hadoop.hbase.mapreduce;
 
 import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
 
+import java.io.IOException;
 import java.util.ArrayList;
+import java.util.Collections;
 import java.util.List;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.LocatedFileStatus;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hbase.HBaseClassTestRule;
@@ -125,4 +129,52 @@ public class TestWALInputFormat {
     assertEquals(archiveWal.toString(), split.getLogFileName());
   }

+  @Test
+  public void testEmptyFileIsIgnoredWhenConfigured() throws IOException, InterruptedException {
+    List<InputSplit> splits = getSplitsForEmptyFile(true);
+    assertTrue("Empty file should be ignored when IGNORE_EMPTY_FILES is true", splits.isEmpty());
+  }
+
+  @Test
+  public void testEmptyFileIsIncludedWhenNotIgnored() throws IOException, InterruptedException {
+    List<InputSplit> splits = getSplitsForEmptyFile(false);
+    assertEquals("Empty file should be included when IGNORE_EMPTY_FILES is false", 1,
+      splits.size());
+  }
+
+  /**
+   * Runs {@link WALInputFormat#getSplits} over a single mocked zero-length file. Input path
+   * resolution and file listing are stubbed out, so no real WAL files are needed.
+   * @param ignoreEmptyFiles value to set for {@link WALPlayer#IGNORE_EMPTY_FILES}
+   * @return the splits produced for the mocked file
+   */
+  private List<InputSplit> getSplitsForEmptyFile(boolean ignoreEmptyFiles)
+    throws IOException, InterruptedException {
+    Configuration conf = new Configuration();
+    conf.setBoolean(WALPlayer.IGNORE_EMPTY_FILES, ignoreEmptyFiles);
+
+    JobContext jobContext = Mockito.mock(JobContext.class);
+    Mockito.when(jobContext.getConfiguration()).thenReturn(conf);
+
+    LocatedFileStatus emptyFile = Mockito.mock(LocatedFileStatus.class);
+    Mockito.when(emptyFile.getLen()).thenReturn(0L);
+    Mockito.when(emptyFile.getPath()).thenReturn(new Path("/empty.wal"));
+
+    WALInputFormat inputFormat = new WALInputFormat() {
+      @Override
+      Path[] getInputPaths(Configuration conf) {
+        return new Path[] { new Path("/input") };
+      }
+
+      // Must match WALInputFormat#getFiles, including its trailing Configuration parameter;
+      // otherwise @Override does not compile and the stub would not replace the real listing.
+      @Override
+      List<FileStatus> getFiles(FileSystem fs, Path inputPath, long startTime, long endTime,
+        Configuration conf) {
+        return Collections.singletonList(emptyFile);
+      }
+    };
+
+    return inputFormat.getSplits(jobContext, "", "");
+  }
 }
diff --git a/hbase-mapreduce/src/test/java/org/apache/hadoop/hbase/mapreduce/TestWALPlayer.java b/hbase-mapreduce/src/test/java/org/apache/hadoop/hbase/mapreduce/TestWALPlayer.java
index b39d04802c9..7818f8d2f73 100644
--- a/hbase-mapreduce/src/test/java/org/apache/hadoop/hbase/mapreduce/TestWALPlayer.java
+++ b/hbase-mapreduce/src/test/java/org/apache/hadoop/hbase/mapreduce/TestWALPlayer.java
@@ -22,6 +22,7 @@ import static org.hamcrest.CoreMatchers.notNullValue;
 import static org.hamcrest.CoreMatchers.nullValue;
 import static org.hamcrest.MatcherAssert.assertThat;
 import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotEquals;
 import static org.junit.Assert.assertTrue;
 import static org.junit.Assert.fail;
 import static org.mockito.ArgumentMatchers.any;
@@ -31,10 +32,12 @@ import static org.mockito.Mockito.when;
 
 import java.io.ByteArrayOutputStream;
 import java.io.File;
+import java.io.IOException;
 import java.io.PrintStream;
 import java.util.ArrayList;
 import java.util.concurrent.ThreadLocalRandom;
 import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FSDataOutputStream;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hbase.Cell;
@@ -338,4 +341,47 @@ public class TestWALPlayer {
 
   }
 
+  @Test
+  public void testIgnoreEmptyWALFiles() throws Exception {
+    Path inputDir = createEmptyWALFile("empty-wal-dir");
+
+    Configuration conf = new Configuration(TEST_UTIL.getConfiguration());
+    conf.setBoolean(WALPlayer.IGNORE_EMPTY_FILES, true);
+
+    int exitCode = ToolRunner.run(conf, new WALPlayer(conf), new String[] { inputDir.toString() });
+    assertEquals("WALPlayer should exit cleanly even with empty files", 0, exitCode);
+  }
+
+  @Test
+  public void testFailOnEmptyWALFilesWhenNotIgnored() throws Exception {
+    Path inputDir = createEmptyWALFile("fail-empty-wal-dir");
+
+    Configuration conf = new Configuration(TEST_UTIL.getConfiguration());
+    conf.setBoolean(WALPlayer.IGNORE_EMPTY_FILES, false);
+
+    int exitCode = ToolRunner.run(conf, new WALPlayer(conf), new String[] { inputDir.toString() });
+    assertNotEquals("WALPlayer should fail on empty files when not ignored", 0, exitCode);
+  }
+
+  /**
+   * Creates the directory {@code /<walDir>} on the test DFS cluster holding one file named
+   * {@code empty.wal}, then asserts that the file exists and is zero bytes long. The stream is
+   * closed immediately after creation, so nothing is ever written to the file.
+   * @param walDir name of the directory to create directly under the DFS root
+   * @return the created directory, to be handed to WALPlayer as its input
+   */
+  private Path createEmptyWALFile(String walDir) throws IOException {
+    FileSystem dfs = TEST_UTIL.getDFSCluster().getFileSystem();
+    Path inputDir = new Path("/" + walDir);
+    dfs.mkdirs(inputDir);
+
+    Path emptyWAL = new Path(inputDir, "empty.wal");
+    FSDataOutputStream stream = dfs.create(emptyWAL);
+    stream.close();
+
+    assertTrue("Empty WAL file should exist", dfs.exists(emptyWAL));
+    assertEquals("WAL file should be 0 bytes", 0, dfs.getFileStatus(emptyWAL).getLen());
+
+    return inputDir;
+  }
 }

Reply via email to