This is an automated email from the ASF dual-hosted git repository.

zhangduo pushed a commit to branch branch-2.5
in repository https://gitbox.apache.org/repos/asf/hbase.git


The following commit(s) were added to refs/heads/branch-2.5 by this push:
     new 8881e91ce6d HBASE-28569 fix race condition during WAL splitting 
leading to corrupt recovered.edits (#6884)
8881e91ce6d is described below

commit 8881e91ce6d13a10ea7f112040145461c2c706a1
Author: Claire Iacono <[email protected]>
AuthorDate: Mon Apr 7 08:37:39 2025 -0700

    HBASE-28569 fix race condition during WAL splitting leading to corrupt 
recovered.edits (#6884)
    
    If an exception happens in the call to finishWriterThreads in the
    org.apache.hadoop.hbase.wal.RecoveredEditsOutputSink.close method,
    the call to closeWriters should not execute, as it may cause a race 
condition
    that leads to file corruption if the regionserver aborts. The execution of
    closeWriters in this case would write the trailer in parallel with writer 
threads,
    causing corruption, and then the corrupt file would get renamed and 
finalized
    when it should not be. This corruption causes problems when the region is 
later
    assigned.
    To fix this, when finishWriterThreads throws an exception or is not 
successful,
    the corrupt files should not be renamed and finalized.
    
    Signed-off-by: Duo Zhang <[email protected]>
    (cherry picked from commit 81f29ae66bad5ea00ca6e5db30498269e06ded80)
---
 .../wal/AbstractRecoveredEditsOutputSink.java      |  68 +++++++++----
 .../hbase/wal/BoundedRecoveredEditsOutputSink.java |   2 +-
 .../hadoop/hbase/wal/RecoveredEditsOutputSink.java |  28 ++++--
 .../hbase/wal/TestRecoveredEditsOutputSink.java    | 111 +++++++++++++++++++++
 4 files changed, 182 insertions(+), 27 deletions(-)

diff --git 
a/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/AbstractRecoveredEditsOutputSink.java
 
b/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/AbstractRecoveredEditsOutputSink.java
index 56e137e725f..0411b1b76bd 100644
--- 
a/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/AbstractRecoveredEditsOutputSink.java
+++ 
b/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/AbstractRecoveredEditsOutputSink.java
@@ -73,33 +73,30 @@ abstract class AbstractRecoveredEditsOutputSink extends 
OutputSink {
     return new RecoveredEditsWriter(region, regionEditsPath, w, seqId);
   }
 
-  protected Path closeRecoveredEditsWriter(RecoveredEditsWriter editsWriter,
-    List<IOException> thrown) throws IOException {
+  /**
+   * abortRecoveredEditsWriter closes the editsWriter, but does not rename and 
finalize the
+   * recovered edits WAL files. Please see HBASE-28569.
+   */
+  protected void abortRecoveredEditsWriter(RecoveredEditsWriter editsWriter,
+    List<IOException> thrown) {
+    closeRecoveredEditsWriter(editsWriter, thrown);
     try {
-      editsWriter.writer.close();
+      removeRecoveredEditsFile(editsWriter);
     } catch (IOException ioe) {
-      final String errorMsg = "Could not close recovered edits at " + 
editsWriter.path;
-      LOG.error(errorMsg, ioe);
+      final String errorMsg = "Failed removing recovered edits file at " + 
editsWriter.path;
+      LOG.error(errorMsg);
       updateStatusWithMsg(errorMsg);
-      thrown.add(ioe);
+    }
+  }
+
+  protected Path 
closeRecoveredEditsWriterAndFinalizeEdits(RecoveredEditsWriter editsWriter,
+    List<IOException> thrown) throws IOException {
+    if (!closeRecoveredEditsWriter(editsWriter, thrown)) {
       return null;
     }
-    final String msg = "Closed recovered edits writer path=" + 
editsWriter.path + " (wrote "
-      + editsWriter.editsWritten + " edits, skipped " + 
editsWriter.editsSkipped + " edits in "
-      + (editsWriter.nanosSpent / 1000 / 1000) + " ms)";
-    LOG.info(msg);
-    updateStatusWithMsg(msg);
     if (editsWriter.editsWritten == 0) {
       // just remove the empty recovered.edits file
-      if (
-        walSplitter.walFS.exists(editsWriter.path)
-          && !walSplitter.walFS.delete(editsWriter.path, false)
-      ) {
-        final String errorMsg = "Failed deleting empty " + editsWriter.path;
-        LOG.warn(errorMsg);
-        updateStatusWithMsg(errorMsg);
-        throw new IOException("Failed deleting empty  " + editsWriter.path);
-      }
+      removeRecoveredEditsFile(editsWriter);
       return null;
     }
 
@@ -133,6 +130,37 @@ abstract class AbstractRecoveredEditsOutputSink extends 
OutputSink {
     return dst;
   }
 
+  private boolean closeRecoveredEditsWriter(RecoveredEditsWriter editsWriter,
+    List<IOException> thrown) {
+    try {
+      editsWriter.writer.close();
+    } catch (IOException ioe) {
+      final String errorMsg = "Could not close recovered edits at " + 
editsWriter.path;
+      LOG.error(errorMsg, ioe);
+      updateStatusWithMsg(errorMsg);
+      thrown.add(ioe);
+      return false;
+    }
+    final String msg = "Closed recovered edits writer path=" + 
editsWriter.path + " (wrote "
+      + editsWriter.editsWritten + " edits, skipped " + 
editsWriter.editsSkipped + " edits in "
+      + (editsWriter.nanosSpent / 1000 / 1000) + " ms)";
+    LOG.info(msg);
+    updateStatusWithMsg(msg);
+    return true;
+  }
+
+  private void removeRecoveredEditsFile(RecoveredEditsWriter editsWriter) 
throws IOException {
+    if (
+      walSplitter.walFS.exists(editsWriter.path)
+        && !walSplitter.walFS.delete(editsWriter.path, false)
+    ) {
+      final String errorMsg = "Failed deleting empty " + editsWriter.path;
+      LOG.warn(errorMsg);
+      updateStatusWithMsg(errorMsg);
+      throw new IOException("Failed deleting empty  " + editsWriter.path);
+    }
+  }
+
   @Override
   public boolean keepRegionEvent(WAL.Entry entry) {
     ArrayList<Cell> cells = entry.getEdit().getCells();
diff --git 
a/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/BoundedRecoveredEditsOutputSink.java
 
b/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/BoundedRecoveredEditsOutputSink.java
index 789a2ad157a..271ba55439e 100644
--- 
a/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/BoundedRecoveredEditsOutputSink.java
+++ 
b/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/BoundedRecoveredEditsOutputSink.java
@@ -70,7 +70,7 @@ class BoundedRecoveredEditsOutputSink extends 
AbstractRecoveredEditsOutputSink {
       regionEditsWrittenMap.compute(Bytes.toString(buffer.encodedRegionName),
         (k, v) -> v == null ? writer.editsWritten : v + writer.editsWritten);
       List<IOException> thrown = new ArrayList<>();
-      Path dst = closeRecoveredEditsWriter(writer, thrown);
+      Path dst = closeRecoveredEditsWriterAndFinalizeEdits(writer, thrown);
       splits.add(dst);
       openingWritersNum.decrementAndGet();
       if (!thrown.isEmpty()) {
diff --git 
a/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/RecoveredEditsOutputSink.java
 
b/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/RecoveredEditsOutputSink.java
index 44ef151b7b6..1cd4c07e14b 100644
--- 
a/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/RecoveredEditsOutputSink.java
+++ 
b/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/RecoveredEditsOutputSink.java
@@ -89,24 +89,40 @@ class RecoveredEditsOutputSink extends 
AbstractRecoveredEditsOutputSink {
 
   @Override
   public List<Path> close() throws IOException {
-    boolean isSuccessful = true;
+    boolean isSuccessful;
     try {
       isSuccessful = finishWriterThreads(false);
-    } finally {
-      isSuccessful &= closeWriters();
+    } catch (IOException e) {
+      closeWriters(false);
+      throw e;
+    }
+    if (!isSuccessful) {
+      // Even if an exception is not thrown, finishWriterThreads() not being 
successful is an
+      // error case where the WAL files should not be finalized.
+      closeWriters(false);
+      return null;
     }
+    isSuccessful = closeWriters(true);
     return isSuccessful ? splits : null;
   }
 
   /**
-   * Close all of the output streams.
+   * Close all the output streams.
+   * @param finalizeEdits true in the successful close case, false when we 
don't want to rename and
+   *                      finalize the temporary, possibly corrupted WAL 
files, such as when there
+   *                      was a previous failure or exception. Please see 
HBASE-28569.
    * @return true when there is no error.
    */
-  private boolean closeWriters() throws IOException {
+  boolean closeWriters(boolean finalizeEdits) throws IOException {
     List<IOException> thrown = Lists.newArrayList();
     for (RecoveredEditsWriter writer : writers.values()) {
       closeCompletionService.submit(() -> {
-        Path dst = closeRecoveredEditsWriter(writer, thrown);
+        if (!finalizeEdits) {
+          abortRecoveredEditsWriter(writer, thrown);
+          LOG.trace("Aborted edits at {}", writer.path);
+          return null;
+        }
+        Path dst = closeRecoveredEditsWriterAndFinalizeEdits(writer, thrown);
         LOG.trace("Closed {}", dst);
         splits.add(dst);
         return null;
diff --git 
a/hbase-server/src/test/java/org/apache/hadoop/hbase/wal/TestRecoveredEditsOutputSink.java
 
b/hbase-server/src/test/java/org/apache/hadoop/hbase/wal/TestRecoveredEditsOutputSink.java
new file mode 100644
index 00000000000..6d365d095b1
--- /dev/null
+++ 
b/hbase-server/src/test/java/org/apache/hadoop/hbase/wal/TestRecoveredEditsOutputSink.java
@@ -0,0 +1,111 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.wal;
+
+import static org.junit.Assert.assertThrows;
+
+import java.io.IOException;
+import java.io.InterruptedIOException;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hbase.HBaseClassTestRule;
+import org.apache.hadoop.hbase.HBaseTestingUtility;
+import org.apache.hadoop.hbase.testclassification.RegionServerTests;
+import org.apache.hadoop.hbase.testclassification.SmallTests;
+import org.apache.hadoop.hbase.util.CommonFSUtils;
+import org.junit.AfterClass;
+import org.junit.BeforeClass;
+import org.junit.ClassRule;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+import org.mockito.Mockito;
+
+@Category({ RegionServerTests.class, SmallTests.class })
+public class TestRecoveredEditsOutputSink {
+
+  @ClassRule
+  public static final HBaseClassTestRule CLASS_RULE =
+    HBaseClassTestRule.forClass(TestRecoveredEditsOutputSink.class);
+
+  private static WALFactory wals;
+  private static FileSystem fs;
+  private static Path rootDir;
+  private final static HBaseTestingUtility TEST_UTIL = new 
HBaseTestingUtility();
+
+  private static RecoveredEditsOutputSink outputSink;
+
+  @BeforeClass
+  public static void setUpBeforeClass() throws Exception {
+    Configuration conf = TEST_UTIL.getConfiguration();
+    conf.set(WALFactory.WAL_PROVIDER, "filesystem");
+    rootDir = TEST_UTIL.createRootDir();
+    fs = CommonFSUtils.getRootDirFileSystem(conf);
+    wals = new WALFactory(conf, "testRecoveredEditsOutputSinkWALFactory");
+    WALSplitter splitter = new WALSplitter(wals, conf, rootDir, fs, rootDir, 
fs);
+    WALSplitter.PipelineController pipelineController = new 
WALSplitter.PipelineController();
+    EntryBuffers sink = new EntryBuffers(pipelineController, 1024 * 1024);
+    outputSink = new RecoveredEditsOutputSink(splitter, pipelineController, 
sink, 3);
+  }
+
+  @AfterClass
+  public static void tearDownAfterClass() throws Exception {
+    wals.close();
+    fs.delete(rootDir, true);
+  }
+
+  @Test
+  public void testCloseSuccess() throws IOException {
+    RecoveredEditsOutputSink spyOutputSink = Mockito.spy(outputSink);
+    spyOutputSink.close();
+    Mockito.verify(spyOutputSink, Mockito.times(1)).finishWriterThreads(false);
+    Mockito.verify(spyOutputSink, Mockito.times(1)).closeWriters(true);
+  }
+
+  /**
+   * When a WAL split is interrupted (ex. by a RegionServer abort), the thread 
join in
+   * finishWriterThreads() will get interrupted, rethrowing the exception 
without stopping the
+   * writer threads. Test to ensure that when this happens, 
RecoveredEditsOutputSink.close() does
+   * not rename the recoveredEdits WAL files as this can cause corruption. 
Please see HBASE-28569.
+   * However, the writers must still be closed.
+   */
+  @Test
+  public void testCloseWALSplitInterrupted() throws IOException {
+    RecoveredEditsOutputSink spyOutputSink = Mockito.spy(outputSink);
+    // The race condition will lead to an InterruptedException to be caught by 
finishWriterThreads()
+    // which is then rethrown as an InterruptedIOException.
+    Mockito.doThrow(new 
InterruptedIOException()).when(spyOutputSink).finishWriterThreads(false);
+    assertThrows(InterruptedIOException.class, spyOutputSink::close);
+    Mockito.verify(spyOutputSink, Mockito.times(1)).finishWriterThreads(false);
+    Mockito.verify(spyOutputSink, Mockito.times(1)).closeWriters(false);
+  }
+
+  /**
+   * When finishWriterThreads fails but does not throw an exception, ensure 
the writers are handled
+   * like in the exception case - the writers are closed but the 
recoveredEdits WAL files are not
+   * renamed.
+   */
+  @Test
+  public void testCloseWALFinishWriterThreadsFailed() throws IOException {
+    RecoveredEditsOutputSink spyOutputSink = Mockito.spy(outputSink);
+    Mockito.doReturn(false).when(spyOutputSink).finishWriterThreads(false);
+    spyOutputSink.close();
+    Mockito.verify(spyOutputSink, Mockito.times(1)).finishWriterThreads(false);
+    Mockito.verify(spyOutputSink, Mockito.times(1)).closeWriters(false);
+  }
+}

Reply via email to