This is an automated email from the ASF dual-hosted git repository.

vjasani pushed a commit to branch branch-2
in repository https://gitbox.apache.org/repos/asf/hbase.git


The following commit(s) were added to refs/heads/branch-2 by this push:
     new 798ec2f4530 Revert "HBASE-28951 Handle simultaneous WAL splitting to recovered edits by multiple worker (#7075)"
798ec2f4530 is described below

commit 798ec2f4530a07526298c88f10a3168cd120d2d7
Author: Viraj Jasani <[email protected]>
AuthorDate: Fri Aug 15 11:04:41 2025 -0700

    Revert "HBASE-28951 Handle simultaneous WAL splitting to recovered edits by 
multiple worker (#7075)"
    
    This reverts commit 172ea3d5b204b7f82b805f07b60098b2577f5876.
---
 .../wal/AbstractRecoveredEditsOutputSink.java      | 97 ++++++----------------
 .../org/apache/hadoop/hbase/wal/WALSplitUtil.java  | 19 ++---
 .../org/apache/hadoop/hbase/wal/TestWALSplit.java  | 69 +--------------
 3 files changed, 33 insertions(+), 152 deletions(-)

diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/AbstractRecoveredEditsOutputSink.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/AbstractRecoveredEditsOutputSink.java
index 88b55a654c8..89edccc2253 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/AbstractRecoveredEditsOutputSink.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/AbstractRecoveredEditsOutputSink.java
@@ -22,8 +22,6 @@ import static org.apache.hadoop.hbase.wal.WALSplitUtil.getRegionSplitEditsPath;
 
 import java.io.EOFException;
 import java.io.IOException;
-import java.net.URLEncoder;
-import java.nio.charset.StandardCharsets;
 import java.util.ArrayList;
 import java.util.List;
 import java.util.Map;
@@ -32,10 +30,8 @@ import java.util.concurrent.ConcurrentMap;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hbase.Cell;
 import org.apache.hadoop.hbase.CellUtil;
-import org.apache.hadoop.hbase.ServerName;
 import org.apache.hadoop.hbase.TableName;
 import org.apache.hadoop.hbase.log.HBaseMarkers;
-import org.apache.hadoop.hbase.util.Addressing;
 import org.apache.hadoop.hbase.util.Bytes;
 import org.apache.hadoop.ipc.RemoteException;
 import org.apache.yetus.audience.InterfaceAudience;
@@ -49,7 +45,6 @@ abstract class AbstractRecoveredEditsOutputSink extends OutputSink {
   private static final Logger LOG = LoggerFactory.getLogger(RecoveredEditsOutputSink.class);
   private final WALSplitter walSplitter;
   private final ConcurrentMap<String, Long> regionMaximumEditLogSeqNum = new ConcurrentHashMap<>();
-  private static final int MAX_RENAME_RETRY_COUNT = 5;
 
   public AbstractRecoveredEditsOutputSink(WALSplitter walSplitter,
     WALSplitter.PipelineController controller, EntryBuffers entryBuffers, int numWriters) {
@@ -60,12 +55,9 @@ abstract class AbstractRecoveredEditsOutputSink extends OutputSink {
   /** Returns a writer that wraps a {@link WALProvider.Writer} and its Path. Caller should close. */
   protected RecoveredEditsWriter createRecoveredEditsWriter(TableName tableName, byte[] region,
     long seqId) throws IOException {
-    // If multiple worker are splitting a WAL at a same time, both should use unique file name to
-    // avoid conflict
     Path regionEditsPath = getRegionSplitEditsPath(tableName, region, seqId,
       walSplitter.getFileBeingSplit().getPath().getName(), walSplitter.getTmpDirName(),
-      walSplitter.conf, getWorkerNameComponent());
-
+      walSplitter.conf);
     if (walSplitter.walFS.exists(regionEditsPath)) {
       LOG.warn("Found old edits file. It could be the "
         + "result of a previous failed split attempt. Deleting " + 
regionEditsPath + ", length="
@@ -81,16 +73,6 @@ abstract class AbstractRecoveredEditsOutputSink extends 
OutputSink {
     return new RecoveredEditsWriter(region, regionEditsPath, w, seqId);
   }
 
-  private String getWorkerNameComponent() {
-    if (walSplitter.rsServices == null) {
-      return "";
-    }
-    return URLEncoder.encode(
-      walSplitter.rsServices.getServerName().toShortString()
-        .replace(Addressing.HOSTNAME_PORT_SEPARATOR, ServerName.SERVERNAME_SEPARATOR),
-      StandardCharsets.UTF_8);
-  }
-
   /**
    * abortRecoveredEditsWriter closes the editsWriter, but does not rename and finalize the
    * recovered edits WAL files. Please see HBASE-28569.
@@ -121,40 +103,22 @@ abstract class AbstractRecoveredEditsOutputSink extends OutputSink {
     Path dst = getCompletedRecoveredEditsFilePath(editsWriter.path,
       regionMaximumEditLogSeqNum.get(Bytes.toString(editsWriter.encodedRegionName)));
     try {
+      if (!dst.equals(editsWriter.path) && walSplitter.walFS.exists(dst)) {
+        deleteOneWithFewerEntries(editsWriter, dst);
+      }
       // Skip the unit tests which create a splitter that reads and
       // writes the data without touching disk.
       // TestHLogSplit#testThreading is an example.
       if (walSplitter.walFS.exists(editsWriter.path)) {
-        boolean retry;
-        int retryCount = 0;
-        do {
-          retry = false;
-          retryCount++;
-          // If rename is successful, it means recovered edits are successfully places at right
-          // place but if rename fails, there can be below reasons
-          // 1. dst already exist - in this case if dst have desired edits we will keep the dst,
-          // delete the editsWriter.path and consider this success else if dst have fewer edits, we
-          // will delete the dst and retry the rename
-          // 2. parent directory does not exit - in one edge case this is possible when this worker
-          // got stuck before rename and HMaster get another worker to split the wal, SCP will
-          // proceed and once region get opened on one RS, we delete the recovered.edits directory,
-          // in this case there is no harm in failing this procedure after retry exhausted.
-          if (!walSplitter.walFS.rename(editsWriter.path, dst)) {
-            retry = deleteOneWithFewerEntriesToRetry(editsWriter, dst);
-          }
-        } while (retry && retryCount < MAX_RENAME_RETRY_COUNT);
-
-        // If we are out of loop with retry flag `true` it means we have exhausted the retries.
-        if (retry) {
-          final String errorMsg = "Failed renaming recovered edits " + 
editsWriter.path + " to "
-            + dst + " in " + MAX_RENAME_RETRY_COUNT + " retries";
+        if (!walSplitter.walFS.rename(editsWriter.path, dst)) {
+          final String errorMsg =
+            "Failed renaming recovered edits " + editsWriter.path + " to " + 
dst;
           updateStatusWithMsg(errorMsg);
           throw new IOException(errorMsg);
-        } else {
-          final String renameEditMsg = "Rename recovered edits " + editsWriter.path + " to " + dst;
-          LOG.info(renameEditMsg);
-          updateStatusWithMsg(renameEditMsg);
         }
+        final String renameEditMsg = "Rename recovered edits " + editsWriter.path + " to " + dst;
+        LOG.info(renameEditMsg);
+        updateStatusWithMsg(renameEditMsg);
       }
     } catch (IOException ioe) {
       final String errorMsg = "Could not rename recovered edits " + 
editsWriter.path + " to " + dst;
@@ -222,49 +186,36 @@ abstract class AbstractRecoveredEditsOutputSink extends 
OutputSink {
   }
 
   // delete the one with fewer wal entries
-  private boolean deleteOneWithFewerEntriesToRetry(RecoveredEditsWriter editsWriter, Path dst)
+  private void deleteOneWithFewerEntries(RecoveredEditsWriter editsWriter, Path dst)
     throws IOException {
-    if (!walSplitter.walFS.exists(dst)) {
-      LOG.info("dst {} doesn't exist, need to retry ", dst);
-      return true;
+    long dstMinLogSeqNum = -1L;
+    try (WALStreamReader reader =
+      walSplitter.getWalFactory().createStreamReader(walSplitter.walFS, dst)) {
+      WAL.Entry entry = reader.next();
+      if (entry != null) {
+        dstMinLogSeqNum = entry.getKey().getSequenceId();
+      }
+    } catch (EOFException e) {
+      LOG.debug("Got EOF when reading first WAL entry from {}, an empty or 
broken WAL file?", dst,
+        e);
     }
-
-    if (isDstHasFewerEntries(editsWriter, dst)) {
+    if (editsWriter.minLogSeqNum < dstMinLogSeqNum) {
       LOG.warn("Found existing old edits file. It could be the result of a 
previous failed"
         + " split attempt or we have duplicated wal entries. Deleting " + dst 
+ ", length="
-        + walSplitter.walFS.getFileStatus(dst).getLen() + " and retry is 
needed");
+        + walSplitter.walFS.getFileStatus(dst).getLen());
       if (!walSplitter.walFS.delete(dst, false)) {
         LOG.warn("Failed deleting of old {}", dst);
         throw new IOException("Failed deleting of old " + dst);
       }
-      return true;
     } else {
       LOG
         .warn("Found existing old edits file and we have less entries. 
Deleting " + editsWriter.path
-          + ", length=" + 
walSplitter.walFS.getFileStatus(editsWriter.path).getLen()
-          + " and no retry needed as dst has all edits");
+          + ", length=" + 
walSplitter.walFS.getFileStatus(editsWriter.path).getLen());
       if (!walSplitter.walFS.delete(editsWriter.path, false)) {
         LOG.warn("Failed deleting of {}", editsWriter.path);
         throw new IOException("Failed deleting of " + editsWriter.path);
       }
-      return false;
-    }
-  }
-
-  private boolean isDstHasFewerEntries(RecoveredEditsWriter editsWriter, Path dst)
-    throws IOException {
-    long dstMinLogSeqNum = -1L;
-    try (WALStreamReader reader =
-      walSplitter.getWalFactory().createStreamReader(walSplitter.walFS, dst)) {
-      WAL.Entry entry = reader.next();
-      if (entry != null) {
-        dstMinLogSeqNum = entry.getKey().getSequenceId();
-      }
-    } catch (EOFException e) {
-      LOG.debug("Got EOF when reading first WAL entry from {}, an empty or 
broken WAL file?", dst,
-        e);
     }
-    return editsWriter.minLogSeqNum < dstMinLogSeqNum;
   }
 
   /**
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/WALSplitUtil.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/WALSplitUtil.java
index d704caaff12..bd9bd6f9cc9 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/WALSplitUtil.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/WALSplitUtil.java
@@ -150,19 +150,17 @@ public final class WALSplitUtil {
    * /hbase/some_table/2323432434/recovered.edits/2332. This method also ensures existence of
    * RECOVERED_EDITS_DIR under the region creating it if necessary. And also set storage policy for
    * RECOVERED_EDITS_DIR if WAL_STORAGE_POLICY is configured.
-   * @param tableName           the table name
-   * @param encodedRegionName   the encoded region name
-   * @param seqId               the sequence id which used to generate file name
-   * @param fileNameBeingSplit  the file being split currently. Used to generate tmp file name.
-   * @param tmpDirName          of the directory used to sideline old recovered edits file
-   * @param conf                configuration
-   * @param workerNameComponent the worker name component for the file name
+   * @param tableName          the table name
+   * @param encodedRegionName  the encoded region name
+   * @param seqId              the sequence id which used to generate file name
+   * @param fileNameBeingSplit the file being split currently. Used to generate tmp file name.
+   * @param tmpDirName         of the directory used to sideline old recovered edits file
+   * @param conf               configuration
    * @return Path to file into which to dump split log edits.
    */
   @SuppressWarnings("deprecation")
   static Path getRegionSplitEditsPath(TableName tableName, byte[] encodedRegionName, long seqId,
-    String fileNameBeingSplit, String tmpDirName, Configuration conf, String workerNameComponent)
-    throws IOException {
+    String fileNameBeingSplit, String tmpDirName, Configuration conf) throws IOException {
     FileSystem walFS = CommonFSUtils.getWALFileSystem(conf);
     Path tableDir = CommonFSUtils.getWALTableDir(conf, tableName);
     String encodedRegionNameStr = Bytes.toString(encodedRegionName);
@@ -194,8 +192,7 @@ public final class WALSplitUtil {
     // Append file name ends with RECOVERED_LOG_TMPFILE_SUFFIX to ensure
     // region's replayRecoveredEdits will not delete it
     String fileName = formatRecoveredEditsFileName(seqId);
-    fileName =
-      getTmpRecoveredEditsFileName(fileName + "-" + fileNameBeingSplit + "-" + workerNameComponent);
+    fileName = getTmpRecoveredEditsFileName(fileName + "-" + fileNameBeingSplit);
     return new Path(dir, fileName);
   }
 
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/wal/TestWALSplit.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/wal/TestWALSplit.java
index 176fe845fcc..8f8cd38446f 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/wal/TestWALSplit.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/wal/TestWALSplit.java
@@ -66,8 +66,6 @@ import org.apache.hadoop.hbase.client.RegionInfoBuilder;
 import org.apache.hadoop.hbase.coordination.SplitLogWorkerCoordination;
 import org.apache.hadoop.hbase.master.SplitLogManager;
 import org.apache.hadoop.hbase.regionserver.HRegion;
-import org.apache.hadoop.hbase.regionserver.LastSequenceId;
-import org.apache.hadoop.hbase.regionserver.RegionServerServices;
 import org.apache.hadoop.hbase.regionserver.wal.AbstractProtobufWALReader;
 import org.apache.hadoop.hbase.regionserver.wal.FaultyProtobufWALStreamReader;
 import org.apache.hadoop.hbase.regionserver.wal.InstrumentedLogWriter;
@@ -107,7 +105,6 @@ import org.apache.hbase.thirdparty.com.google.common.collect.ImmutableMap;
 import org.apache.hbase.thirdparty.com.google.protobuf.ByteString;
 
 import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil;
-import org.apache.hadoop.hbase.shaded.protobuf.generated.ClusterStatusProtos;
 import org.apache.hadoop.hbase.shaded.protobuf.generated.WALProtos;
 
 /**
@@ -375,70 +372,6 @@ public class TestWALSplit {
     }
   }
 
-  // If another worker is assigned to split a WAl and last worker is still running, both should not
-  // impact each other's progress
-  @Test
-  public void testTwoWorkerSplittingSameWAL() throws IOException, InterruptedException {
-    int numWriter = 1, entries = 10;
-    generateWALs(numWriter, entries, -1, 0);
-    FileStatus logfile = fs.listStatus(WALDIR)[0];
-    FileSystem spiedFs = Mockito.spy(fs);
-    RegionServerServices zombieRSServices = Mockito.mock(RegionServerServices.class);
-    RegionServerServices newWorkerRSServices = Mockito.mock(RegionServerServices.class);
-    Mockito.when(zombieRSServices.getServerName())
-      .thenReturn(ServerName.valueOf("zombie-rs.abc.com,1234,1234567890"));
-    Mockito.when(newWorkerRSServices.getServerName())
-      .thenReturn(ServerName.valueOf("worker-rs.abc.com,1234,1234569870"));
-    Thread zombieWorker = new SplitWALWorker(logfile, spiedFs, zombieRSServices);
-    Thread newWorker = new SplitWALWorker(logfile, spiedFs, newWorkerRSServices);
-    zombieWorker.start();
-    newWorker.start();
-    newWorker.join();
-    zombieWorker.join();
-
-    for (String region : REGIONS) {
-      Path[] logfiles = getLogForRegion(TABLE_NAME, region);
-      assertEquals("wrong number of split files for region", numWriter, 
logfiles.length);
-
-      int count = 0;
-      for (Path lf : logfiles) {
-        count += countWAL(lf);
-      }
-      assertEquals("wrong number of edits for region " + region, entries, 
count);
-    }
-  }
-
-  private class SplitWALWorker extends Thread implements LastSequenceId {
-    final FileStatus logfile;
-    final FileSystem fs;
-    final RegionServerServices rsServices;
-
-    public SplitWALWorker(FileStatus logfile, FileSystem fs, RegionServerServices rsServices) {
-      super(rsServices.getServerName().toShortString());
-      setDaemon(true);
-      this.fs = fs;
-      this.logfile = logfile;
-      this.rsServices = rsServices;
-    }
-
-    @Override
-    public void run() {
-      try {
-        boolean ret =
-          WALSplitter.splitLogFile(HBASEDIR, logfile, fs, conf, null, this, null, wals, rsServices);
-        assertTrue("Both splitting should pass", ret);
-      } catch (IOException e) {
-        LOG.warn(getName() + " Worker exiting " + e);
-      }
-    }
-
-    @Override
-    public ClusterStatusProtos.RegionStoreSequenceIds getLastSequenceId(byte[] encodedRegionName) {
-      return ClusterStatusProtos.RegionStoreSequenceIds.newBuilder()
-        .setLastFlushedSequenceId(HConstants.NO_SEQNUM).build();
-    }
-  }
-
   /**
    * @see "https://issues.apache.org/jira/browse/HBASE-3020";
    */
@@ -470,7 +403,7 @@ public class TestWALSplit {
   private Path createRecoveredEditsPathForRegion() throws IOException {
     byte[] encoded = RegionInfoBuilder.FIRST_META_REGIONINFO.getEncodedNameAsBytes();
     Path p = WALSplitUtil.getRegionSplitEditsPath(TableName.META_TABLE_NAME, encoded, 1,
-      FILENAME_BEING_SPLIT, TMPDIRNAME, conf, "");
+      FILENAME_BEING_SPLIT, TMPDIRNAME, conf);
     return p;
   }
 
