This is an automated email from the ASF dual-hosted git repository.

taklwu pushed a commit to branch HBASE-28957
in repository https://gitbox.apache.org/repos/asf/hbase.git


The following commit(s) were added to refs/heads/HBASE-28957 by this push:
     new a7a6d3c2726 HBASE-29219 Ignore Empty WAL Files While Consuming Backed-Up WAL Files (#7106)
a7a6d3c2726 is described below

commit a7a6d3c2726c15fb9324b75471a9e166c0ec5848
Author: vinayak hegde <vinayakph...@gmail.com>
AuthorDate: Wed Jun 25 01:28:51 2025 +0530

    HBASE-29219 Ignore Empty WAL Files While Consuming Backed-Up WAL Files (#7106)
    
    Signed-off-by: Tak Lon (Stephen) Wu <tak...@apache.org>
    Reviewed by: Kota-SH <shanmukhaharipr...@gmail.com>
    Reviewed by: Kevin Geiszler <kevin.j.geisz...@gmail.com>
---
 .../hadoop/hbase/backup/impl/BackupAdminImpl.java  |  2 +
 .../hadoop/hbase/mapreduce/WALInputFormat.java     | 11 ++++-
 .../apache/hadoop/hbase/mapreduce/WALPlayer.java   | 18 +++++++++
 .../hadoop/hbase/mapreduce/TestWALInputFormat.java | 47 ++++++++++++++++++++++
 .../hadoop/hbase/mapreduce/TestWALPlayer.java      | 46 +++++++++++++++++++++
 5 files changed, 122 insertions(+), 2 deletions(-)

diff --git a/hbase-backup/src/main/java/org/apache/hadoop/hbase/backup/impl/BackupAdminImpl.java b/hbase-backup/src/main/java/org/apache/hadoop/hbase/backup/impl/BackupAdminImpl.java
index 1e91258ba6c..e82d9804f9d 100644
--- a/hbase-backup/src/main/java/org/apache/hadoop/hbase/backup/impl/BackupAdminImpl.java
+++ b/hbase-backup/src/main/java/org/apache/hadoop/hbase/backup/impl/BackupAdminImpl.java
@@ -23,6 +23,7 @@ import static 
org.apache.hadoop.hbase.backup.BackupRestoreConstants.DEFAULT_CONT
 import static 
org.apache.hadoop.hbase.backup.replication.BackupFileSystemManager.WALS_DIR;
 import static 
org.apache.hadoop.hbase.backup.replication.ContinuousBackupReplicationEndpoint.DATE_FORMAT;
 import static 
org.apache.hadoop.hbase.backup.replication.ContinuousBackupReplicationEndpoint.ONE_DAY_IN_MILLISECONDS;
+import static org.apache.hadoop.hbase.mapreduce.WALPlayer.IGNORE_EMPTY_FILES;
 
 import java.io.IOException;
 import java.text.ParseException;
@@ -846,6 +847,7 @@ public class BackupAdminImpl implements BackupAdmin {
     Configuration conf = HBaseConfiguration.create(conn.getConfiguration());
     conf.setLong(WALInputFormat.START_TIME_KEY, startTime);
     conf.setLong(WALInputFormat.END_TIME_KEY, endTime);
+    conf.setBoolean(IGNORE_EMPTY_FILES, true);
     Tool walPlayer = new WALPlayer();
     walPlayer.setConf(conf);
     return walPlayer;
diff --git a/hbase-mapreduce/src/main/java/org/apache/hadoop/hbase/mapreduce/WALInputFormat.java b/hbase-mapreduce/src/main/java/org/apache/hadoop/hbase/mapreduce/WALInputFormat.java
index 8d6e91633f7..a49cdca5a84 100644
--- a/hbase-mapreduce/src/main/java/org/apache/hadoop/hbase/mapreduce/WALInputFormat.java
+++ b/hbase-mapreduce/src/main/java/org/apache/hadoop/hbase/mapreduce/WALInputFormat.java
@@ -328,14 +328,21 @@ public class WALInputFormat extends InputFormat<WALKey, 
WALEdit> {
         throw e;
       }
     }
+
+    boolean ignoreEmptyFiles =
+      conf.getBoolean(WALPlayer.IGNORE_EMPTY_FILES, 
WALPlayer.DEFAULT_IGNORE_EMPTY_FILES);
     List<InputSplit> splits = new ArrayList<InputSplit>(allFiles.size());
     for (FileStatus file : allFiles) {
+      if (ignoreEmptyFiles && file.getLen() == 0) {
+        LOG.warn("Ignoring empty file: " + file.getPath());
+        continue;
+      }
       splits.add(new WALSplit(file.getPath().toString(), file.getLen(), 
startTime, endTime));
     }
     return splits;
   }
 
-  private Path[] getInputPaths(Configuration conf) {
+  Path[] getInputPaths(Configuration conf) {
     String inpDirs = conf.get(FileInputFormat.INPUT_DIR);
     return StringUtils
       
.stringToPath(inpDirs.split(conf.get(WALPlayer.INPUT_FILES_SEPARATOR_KEY, 
",")));
@@ -349,7 +356,7 @@ public class WALInputFormat extends InputFormat<WALKey, 
WALEdit> {
    *                  equal to this value else we will filter out the file. If 
name does not seem to
    *                  have a timestamp, we will just return it w/o filtering.
    */
-  private List<FileStatus> getFiles(FileSystem fs, Path dir, long startTime, 
long endTime)
+  List<FileStatus> getFiles(FileSystem fs, Path dir, long startTime, long 
endTime)
     throws IOException {
     List<FileStatus> result = new ArrayList<>();
     LOG.debug("Scanning " + dir.toString() + " for WAL files");
diff --git a/hbase-mapreduce/src/main/java/org/apache/hadoop/hbase/mapreduce/WALPlayer.java b/hbase-mapreduce/src/main/java/org/apache/hadoop/hbase/mapreduce/WALPlayer.java
index 5e2dc0902e0..cea6da97649 100644
--- a/hbase-mapreduce/src/main/java/org/apache/hadoop/hbase/mapreduce/WALPlayer.java
+++ b/hbase-mapreduce/src/main/java/org/apache/hadoop/hbase/mapreduce/WALPlayer.java
@@ -81,6 +81,24 @@ public class WALPlayer extends Configured implements Tool {
   public final static String IGNORE_MISSING_FILES = 
"wal.input.ignore.missing.files";
   public final static String MULTI_TABLES_SUPPORT = "wal.multi.tables.support";
 
+  /**
+   * Configuration flag that controls how the WALPlayer handles empty input 
WAL files.
+   * <p>
+   * If set to {@code true}, the WALPlayer will silently ignore empty files 
that cannot be parsed as
+   * valid WAL files. This is useful in scenarios where such files are 
expected (e.g., due to
+   * partial writes or cleanup operations).
+   * </p>
+   * <p>
+   * If set to {@code false} (default), the WALPlayer will throw an exception 
when it encounters an
+   * empty or un-parsable WAL file. This is useful for catching unexpected 
data issues early.
+   * </p>
+   * <p>
+   * Default value: {@link #DEFAULT_IGNORE_EMPTY_FILES} ({@code false})
+   * </p>
+   */
+  public final static String IGNORE_EMPTY_FILES = 
"wal.input.ignore.empty.files";
+  public final static boolean DEFAULT_IGNORE_EMPTY_FILES = false;
+
   protected static final String tableSeparator = ";";
 
   private final static String JOB_NAME_CONF_KEY = "mapreduce.job.name";
diff --git a/hbase-mapreduce/src/test/java/org/apache/hadoop/hbase/mapreduce/TestWALInputFormat.java b/hbase-mapreduce/src/test/java/org/apache/hadoop/hbase/mapreduce/TestWALInputFormat.java
index 70602a37166..b31ea014977 100644
--- a/hbase-mapreduce/src/test/java/org/apache/hadoop/hbase/mapreduce/TestWALInputFormat.java
+++ b/hbase-mapreduce/src/test/java/org/apache/hadoop/hbase/mapreduce/TestWALInputFormat.java
@@ -18,16 +18,23 @@
 package org.apache.hadoop.hbase.mapreduce;
 
 import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
 
+import java.io.IOException;
 import java.util.ArrayList;
+import java.util.Collections;
 import java.util.List;
+import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.LocatedFileStatus;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hbase.HBaseClassTestRule;
 import org.apache.hadoop.hbase.testclassification.MapReduceTests;
 import org.apache.hadoop.hbase.testclassification.SmallTests;
 import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
+import org.apache.hadoop.mapreduce.InputSplit;
+import org.apache.hadoop.mapreduce.JobContext;
 import org.junit.ClassRule;
 import org.junit.Test;
 import org.junit.experimental.categories.Category;
@@ -74,4 +81,44 @@ public class TestWALInputFormat {
     WALInputFormat.addFile(lfss, lfs, now, now);
     assertEquals(8, lfss.size());
   }
+
+  @Test
+  public void testEmptyFileIsIgnoredWhenConfigured() throws IOException, 
InterruptedException {
+    List<InputSplit> splits = getSplitsForEmptyFile(true);
+    assertTrue("Empty file should be ignored when IGNORE_EMPTY_FILES is true", 
splits.isEmpty());
+  }
+
+  @Test
+  public void testEmptyFileIsIncludedWhenNotIgnored() throws IOException, 
InterruptedException {
+    List<InputSplit> splits = getSplitsForEmptyFile(false);
+    assertEquals("Empty file should be included when IGNORE_EMPTY_FILES is 
false", 1,
+      splits.size());
+  }
+
+  private List<InputSplit> getSplitsForEmptyFile(boolean ignoreEmptyFiles)
+    throws IOException, InterruptedException {
+    Configuration conf = new Configuration();
+    conf.setBoolean(WALPlayer.IGNORE_EMPTY_FILES, ignoreEmptyFiles);
+
+    JobContext jobContext = Mockito.mock(JobContext.class);
+    Mockito.when(jobContext.getConfiguration()).thenReturn(conf);
+
+    LocatedFileStatus emptyFile = Mockito.mock(LocatedFileStatus.class);
+    Mockito.when(emptyFile.getLen()).thenReturn(0L);
+    Mockito.when(emptyFile.getPath()).thenReturn(new Path("/empty.wal"));
+
+    WALInputFormat inputFormat = new WALInputFormat() {
+      @Override
+      Path[] getInputPaths(Configuration conf) {
+        return new Path[] { new Path("/input") };
+      }
+
+      @Override
+      List<FileStatus> getFiles(FileSystem fs, Path inputPath, long startTime, 
long endTime) {
+        return Collections.singletonList(emptyFile);
+      }
+    };
+
+    return inputFormat.getSplits(jobContext, "", "");
+  }
 }
diff --git a/hbase-mapreduce/src/test/java/org/apache/hadoop/hbase/mapreduce/TestWALPlayer.java b/hbase-mapreduce/src/test/java/org/apache/hadoop/hbase/mapreduce/TestWALPlayer.java
index b39d04802c9..7818f8d2f73 100644
--- a/hbase-mapreduce/src/test/java/org/apache/hadoop/hbase/mapreduce/TestWALPlayer.java
+++ b/hbase-mapreduce/src/test/java/org/apache/hadoop/hbase/mapreduce/TestWALPlayer.java
@@ -22,6 +22,7 @@ import static org.hamcrest.CoreMatchers.notNullValue;
 import static org.hamcrest.CoreMatchers.nullValue;
 import static org.hamcrest.MatcherAssert.assertThat;
 import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotEquals;
 import static org.junit.Assert.assertTrue;
 import static org.junit.Assert.fail;
 import static org.mockito.ArgumentMatchers.any;
@@ -31,10 +32,12 @@ import static org.mockito.Mockito.when;
 
 import java.io.ByteArrayOutputStream;
 import java.io.File;
+import java.io.IOException;
 import java.io.PrintStream;
 import java.util.ArrayList;
 import java.util.concurrent.ThreadLocalRandom;
 import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FSDataOutputStream;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hbase.Cell;
@@ -338,4 +341,47 @@ public class TestWALPlayer {
 
   }
 
+  @Test
+  public void testIgnoreEmptyWALFiles() throws Exception {
+    Path inputDir = createEmptyWALFile("empty-wal-dir");
+    FileSystem dfs = TEST_UTIL.getDFSCluster().getFileSystem();
+    Path emptyWAL = new Path(inputDir, "empty.wal");
+
+    assertTrue("Empty WAL file should exist", dfs.exists(emptyWAL));
+    assertEquals("WAL file should be 0 bytes", 0, 
dfs.getFileStatus(emptyWAL).getLen());
+
+    Configuration conf = new Configuration(TEST_UTIL.getConfiguration());
+    conf.setBoolean(WALPlayer.IGNORE_EMPTY_FILES, true);
+
+    int exitCode = ToolRunner.run(conf, new WALPlayer(conf), new String[] { 
inputDir.toString() });
+    assertEquals("WALPlayer should exit cleanly even with empty files", 0, 
exitCode);
+  }
+
+  @Test
+  public void testFailOnEmptyWALFilesWhenNotIgnored() throws Exception {
+    Path inputDir = createEmptyWALFile("fail-empty-wal-dir");
+    FileSystem dfs = TEST_UTIL.getDFSCluster().getFileSystem();
+    Path emptyWAL = new Path(inputDir, "empty.wal");
+
+    assertTrue("Empty WAL file should exist", dfs.exists(emptyWAL));
+    assertEquals("WAL file should be 0 bytes", 0, 
dfs.getFileStatus(emptyWAL).getLen());
+
+    Configuration conf = new Configuration(TEST_UTIL.getConfiguration());
+    conf.setBoolean(WALPlayer.IGNORE_EMPTY_FILES, false);
+
+    int exitCode = ToolRunner.run(conf, new WALPlayer(conf), new String[] { 
inputDir.toString() });
+    assertNotEquals("WALPlayer should fail on empty files when not ignored", 
0, exitCode);
+  }
+
+  private Path createEmptyWALFile(String walDir) throws IOException {
+    FileSystem dfs = TEST_UTIL.getDFSCluster().getFileSystem();
+    Path inputDir = new Path("/" + walDir);
+    dfs.mkdirs(inputDir);
+
+    Path emptyWAL = new Path(inputDir, "empty.wal");
+    FSDataOutputStream out = dfs.create(emptyWAL);
+    out.close(); // Explicitly closing the stream
+
+    return inputDir;
+  }
 }

Reply via email to