This is an automated email from the ASF dual-hosted git repository.
vjasani pushed a commit to branch branch-2
in repository https://gitbox.apache.org/repos/asf/hbase.git
The following commit(s) were added to refs/heads/branch-2 by this push:
new 798ec2f4530 Revert "HBASE-28951 Handle simultaneous WAL splitting to
recovered edits by multiple worker (#7075)"
798ec2f4530 is described below
commit 798ec2f4530a07526298c88f10a3168cd120d2d7
Author: Viraj Jasani <[email protected]>
AuthorDate: Fri Aug 15 11:04:41 2025 -0700
Revert "HBASE-28951 Handle simultaneous WAL splitting to recovered edits by
multiple worker (#7075)"
This reverts commit 172ea3d5b204b7f82b805f07b60098b2577f5876.
---
.../wal/AbstractRecoveredEditsOutputSink.java | 97 ++++++----------------
.../org/apache/hadoop/hbase/wal/WALSplitUtil.java | 19 ++---
.../org/apache/hadoop/hbase/wal/TestWALSplit.java | 69 +--------------
3 files changed, 33 insertions(+), 152 deletions(-)
diff --git
a/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/AbstractRecoveredEditsOutputSink.java
b/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/AbstractRecoveredEditsOutputSink.java
index 88b55a654c8..89edccc2253 100644
---
a/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/AbstractRecoveredEditsOutputSink.java
+++
b/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/AbstractRecoveredEditsOutputSink.java
@@ -22,8 +22,6 @@ import static
org.apache.hadoop.hbase.wal.WALSplitUtil.getRegionSplitEditsPath;
import java.io.EOFException;
import java.io.IOException;
-import java.net.URLEncoder;
-import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
@@ -32,10 +30,8 @@ import java.util.concurrent.ConcurrentMap;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.CellUtil;
-import org.apache.hadoop.hbase.ServerName;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.log.HBaseMarkers;
-import org.apache.hadoop.hbase.util.Addressing;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.ipc.RemoteException;
import org.apache.yetus.audience.InterfaceAudience;
@@ -49,7 +45,6 @@ abstract class AbstractRecoveredEditsOutputSink extends
OutputSink {
private static final Logger LOG =
LoggerFactory.getLogger(RecoveredEditsOutputSink.class);
private final WALSplitter walSplitter;
private final ConcurrentMap<String, Long> regionMaximumEditLogSeqNum = new
ConcurrentHashMap<>();
- private static final int MAX_RENAME_RETRY_COUNT = 5;
public AbstractRecoveredEditsOutputSink(WALSplitter walSplitter,
WALSplitter.PipelineController controller, EntryBuffers entryBuffers, int
numWriters) {
@@ -60,12 +55,9 @@ abstract class AbstractRecoveredEditsOutputSink extends
OutputSink {
/** Returns a writer that wraps a {@link WALProvider.Writer} and its Path.
Caller should close. */
protected RecoveredEditsWriter createRecoveredEditsWriter(TableName
tableName, byte[] region,
long seqId) throws IOException {
- // If multiple workers are splitting a WAL at the same time, both should use
unique file name to
- // avoid conflict
Path regionEditsPath = getRegionSplitEditsPath(tableName, region, seqId,
walSplitter.getFileBeingSplit().getPath().getName(),
walSplitter.getTmpDirName(),
- walSplitter.conf, getWorkerNameComponent());
-
+ walSplitter.conf);
if (walSplitter.walFS.exists(regionEditsPath)) {
LOG.warn("Found old edits file. It could be the "
+ "result of a previous failed split attempt. Deleting " +
regionEditsPath + ", length="
@@ -81,16 +73,6 @@ abstract class AbstractRecoveredEditsOutputSink extends
OutputSink {
return new RecoveredEditsWriter(region, regionEditsPath, w, seqId);
}
- private String getWorkerNameComponent() {
- if (walSplitter.rsServices == null) {
- return "";
- }
- return URLEncoder.encode(
- walSplitter.rsServices.getServerName().toShortString()
- .replace(Addressing.HOSTNAME_PORT_SEPARATOR,
ServerName.SERVERNAME_SEPARATOR),
- StandardCharsets.UTF_8);
- }
-
/**
* abortRecoveredEditsWriter closes the editsWriter, but does not rename and
finalize the
* recovered edits WAL files. Please see HBASE-28569.
@@ -121,40 +103,22 @@ abstract class AbstractRecoveredEditsOutputSink extends
OutputSink {
Path dst = getCompletedRecoveredEditsFilePath(editsWriter.path,
regionMaximumEditLogSeqNum.get(Bytes.toString(editsWriter.encodedRegionName)));
try {
+ if (!dst.equals(editsWriter.path) && walSplitter.walFS.exists(dst)) {
+ deleteOneWithFewerEntries(editsWriter, dst);
+ }
// Skip the unit tests which create a splitter that reads and
// writes the data without touching disk.
// TestHLogSplit#testThreading is an example.
if (walSplitter.walFS.exists(editsWriter.path)) {
- boolean retry;
- int retryCount = 0;
- do {
- retry = false;
- retryCount++;
- // If rename is successful, it means recovered edits are
successfully placed at the right
- // place but if rename fails, there can be below reasons
- // 1. dst already exist - in this case if dst have desired edits we
will keep the dst,
- // delete the editsWriter.path and consider this success else if dst
have fewer edits, we
- // will delete the dst and retry the rename
- // 2. parent directory does not exist - in one edge case this is
possible when this worker
- // got stuck before rename and HMaster get another worker to split
the wal, SCP will
- // proceed and once region get opened on one RS, we delete the
recovered.edits directory,
- // in this case there is no harm in failing this procedure after
retry exhausted.
- if (!walSplitter.walFS.rename(editsWriter.path, dst)) {
- retry = deleteOneWithFewerEntriesToRetry(editsWriter, dst);
- }
- } while (retry && retryCount < MAX_RENAME_RETRY_COUNT);
-
- // If we are out of loop with retry flag `true` it means we have
exhausted the retries.
- if (retry) {
- final String errorMsg = "Failed renaming recovered edits " +
editsWriter.path + " to "
- + dst + " in " + MAX_RENAME_RETRY_COUNT + " retries";
+ if (!walSplitter.walFS.rename(editsWriter.path, dst)) {
+ final String errorMsg =
+ "Failed renaming recovered edits " + editsWriter.path + " to " +
dst;
updateStatusWithMsg(errorMsg);
throw new IOException(errorMsg);
- } else {
- final String renameEditMsg = "Rename recovered edits " +
editsWriter.path + " to " + dst;
- LOG.info(renameEditMsg);
- updateStatusWithMsg(renameEditMsg);
}
+ final String renameEditMsg = "Rename recovered edits " +
editsWriter.path + " to " + dst;
+ LOG.info(renameEditMsg);
+ updateStatusWithMsg(renameEditMsg);
}
} catch (IOException ioe) {
final String errorMsg = "Could not rename recovered edits " +
editsWriter.path + " to " + dst;
@@ -222,49 +186,36 @@ abstract class AbstractRecoveredEditsOutputSink extends
OutputSink {
}
// delete the one with fewer wal entries
- private boolean deleteOneWithFewerEntriesToRetry(RecoveredEditsWriter
editsWriter, Path dst)
+ private void deleteOneWithFewerEntries(RecoveredEditsWriter editsWriter,
Path dst)
throws IOException {
- if (!walSplitter.walFS.exists(dst)) {
- LOG.info("dst {} doesn't exist, need to retry ", dst);
- return true;
+ long dstMinLogSeqNum = -1L;
+ try (WALStreamReader reader =
+ walSplitter.getWalFactory().createStreamReader(walSplitter.walFS, dst)) {
+ WAL.Entry entry = reader.next();
+ if (entry != null) {
+ dstMinLogSeqNum = entry.getKey().getSequenceId();
+ }
+ } catch (EOFException e) {
+ LOG.debug("Got EOF when reading first WAL entry from {}, an empty or
broken WAL file?", dst,
+ e);
}
-
- if (isDstHasFewerEntries(editsWriter, dst)) {
+ if (editsWriter.minLogSeqNum < dstMinLogSeqNum) {
LOG.warn("Found existing old edits file. It could be the result of a
previous failed"
+ " split attempt or we have duplicated wal entries. Deleting " + dst
+ ", length="
- + walSplitter.walFS.getFileStatus(dst).getLen() + " and retry is
needed");
+ + walSplitter.walFS.getFileStatus(dst).getLen());
if (!walSplitter.walFS.delete(dst, false)) {
LOG.warn("Failed deleting of old {}", dst);
throw new IOException("Failed deleting of old " + dst);
}
- return true;
} else {
LOG
.warn("Found existing old edits file and we have less entries.
Deleting " + editsWriter.path
- + ", length=" +
walSplitter.walFS.getFileStatus(editsWriter.path).getLen()
- + " and no retry needed as dst has all edits");
+ + ", length=" +
walSplitter.walFS.getFileStatus(editsWriter.path).getLen());
if (!walSplitter.walFS.delete(editsWriter.path, false)) {
LOG.warn("Failed deleting of {}", editsWriter.path);
throw new IOException("Failed deleting of " + editsWriter.path);
}
- return false;
- }
- }
-
- private boolean isDstHasFewerEntries(RecoveredEditsWriter editsWriter, Path
dst)
- throws IOException {
- long dstMinLogSeqNum = -1L;
- try (WALStreamReader reader =
- walSplitter.getWalFactory().createStreamReader(walSplitter.walFS, dst)) {
- WAL.Entry entry = reader.next();
- if (entry != null) {
- dstMinLogSeqNum = entry.getKey().getSequenceId();
- }
- } catch (EOFException e) {
- LOG.debug("Got EOF when reading first WAL entry from {}, an empty or
broken WAL file?", dst,
- e);
}
- return editsWriter.minLogSeqNum < dstMinLogSeqNum;
}
/**
diff --git
a/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/WALSplitUtil.java
b/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/WALSplitUtil.java
index d704caaff12..bd9bd6f9cc9 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/WALSplitUtil.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/WALSplitUtil.java
@@ -150,19 +150,17 @@ public final class WALSplitUtil {
* /hbase/some_table/2323432434/recovered.edits/2332. This method also
ensures existence of
* RECOVERED_EDITS_DIR under the region creating it if necessary. And also
set storage policy for
* RECOVERED_EDITS_DIR if WAL_STORAGE_POLICY is configured.
- * @param tableName the table name
- * @param encodedRegionName the encoded region name
- * @param seqId the sequence id which used to generate file
name
- * @param fileNameBeingSplit the file being split currently. Used to
generate tmp file name.
- * @param tmpDirName of the directory used to sideline old
recovered edits file
- * @param conf configuration
- * @param workerNameComponent the worker name component for the file name
+ * @param tableName the table name
+ * @param encodedRegionName the encoded region name
+ * @param seqId the sequence id which is used to generate file name
+ * @param fileNameBeingSplit the file being split currently. Used to
generate tmp file name.
+ * @param tmpDirName of the directory used to sideline old recovered
edits file
+ * @param conf configuration
* @return Path to file into which to dump split log edits.
*/
@SuppressWarnings("deprecation")
static Path getRegionSplitEditsPath(TableName tableName, byte[]
encodedRegionName, long seqId,
- String fileNameBeingSplit, String tmpDirName, Configuration conf, String
workerNameComponent)
- throws IOException {
+ String fileNameBeingSplit, String tmpDirName, Configuration conf) throws
IOException {
FileSystem walFS = CommonFSUtils.getWALFileSystem(conf);
Path tableDir = CommonFSUtils.getWALTableDir(conf, tableName);
String encodedRegionNameStr = Bytes.toString(encodedRegionName);
@@ -194,8 +192,7 @@ public final class WALSplitUtil {
// Append file name ends with RECOVERED_LOG_TMPFILE_SUFFIX to ensure
// region's replayRecoveredEdits will not delete it
String fileName = formatRecoveredEditsFileName(seqId);
- fileName =
- getTmpRecoveredEditsFileName(fileName + "-" + fileNameBeingSplit + "-" +
workerNameComponent);
+ fileName = getTmpRecoveredEditsFileName(fileName + "-" +
fileNameBeingSplit);
return new Path(dir, fileName);
}
diff --git
a/hbase-server/src/test/java/org/apache/hadoop/hbase/wal/TestWALSplit.java
b/hbase-server/src/test/java/org/apache/hadoop/hbase/wal/TestWALSplit.java
index 176fe845fcc..8f8cd38446f 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/wal/TestWALSplit.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/wal/TestWALSplit.java
@@ -66,8 +66,6 @@ import org.apache.hadoop.hbase.client.RegionInfoBuilder;
import org.apache.hadoop.hbase.coordination.SplitLogWorkerCoordination;
import org.apache.hadoop.hbase.master.SplitLogManager;
import org.apache.hadoop.hbase.regionserver.HRegion;
-import org.apache.hadoop.hbase.regionserver.LastSequenceId;
-import org.apache.hadoop.hbase.regionserver.RegionServerServices;
import org.apache.hadoop.hbase.regionserver.wal.AbstractProtobufWALReader;
import org.apache.hadoop.hbase.regionserver.wal.FaultyProtobufWALStreamReader;
import org.apache.hadoop.hbase.regionserver.wal.InstrumentedLogWriter;
@@ -107,7 +105,6 @@ import
org.apache.hbase.thirdparty.com.google.common.collect.ImmutableMap;
import org.apache.hbase.thirdparty.com.google.protobuf.ByteString;
import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil;
-import org.apache.hadoop.hbase.shaded.protobuf.generated.ClusterStatusProtos;
import org.apache.hadoop.hbase.shaded.protobuf.generated.WALProtos;
/**
@@ -375,70 +372,6 @@ public class TestWALSplit {
}
}
- // If another worker is assigned to split a WAL and the last worker is still
running, both should not
- // impact each other's progress
- @Test
- public void testTwoWorkerSplittingSameWAL() throws IOException,
InterruptedException {
- int numWriter = 1, entries = 10;
- generateWALs(numWriter, entries, -1, 0);
- FileStatus logfile = fs.listStatus(WALDIR)[0];
- FileSystem spiedFs = Mockito.spy(fs);
- RegionServerServices zombieRSServices =
Mockito.mock(RegionServerServices.class);
- RegionServerServices newWorkerRSServices =
Mockito.mock(RegionServerServices.class);
- Mockito.when(zombieRSServices.getServerName())
- .thenReturn(ServerName.valueOf("zombie-rs.abc.com,1234,1234567890"));
- Mockito.when(newWorkerRSServices.getServerName())
- .thenReturn(ServerName.valueOf("worker-rs.abc.com,1234,1234569870"));
- Thread zombieWorker = new SplitWALWorker(logfile, spiedFs,
zombieRSServices);
- Thread newWorker = new SplitWALWorker(logfile, spiedFs,
newWorkerRSServices);
- zombieWorker.start();
- newWorker.start();
- newWorker.join();
- zombieWorker.join();
-
- for (String region : REGIONS) {
- Path[] logfiles = getLogForRegion(TABLE_NAME, region);
- assertEquals("wrong number of split files for region", numWriter,
logfiles.length);
-
- int count = 0;
- for (Path lf : logfiles) {
- count += countWAL(lf);
- }
- assertEquals("wrong number of edits for region " + region, entries,
count);
- }
- }
-
- private class SplitWALWorker extends Thread implements LastSequenceId {
- final FileStatus logfile;
- final FileSystem fs;
- final RegionServerServices rsServices;
-
- public SplitWALWorker(FileStatus logfile, FileSystem fs,
RegionServerServices rsServices) {
- super(rsServices.getServerName().toShortString());
- setDaemon(true);
- this.fs = fs;
- this.logfile = logfile;
- this.rsServices = rsServices;
- }
-
- @Override
- public void run() {
- try {
- boolean ret =
- WALSplitter.splitLogFile(HBASEDIR, logfile, fs, conf, null, this,
null, wals, rsServices);
- assertTrue("Both splitting should pass", ret);
- } catch (IOException e) {
- LOG.warn(getName() + " Worker exiting " + e);
- }
- }
-
- @Override
- public ClusterStatusProtos.RegionStoreSequenceIds getLastSequenceId(byte[]
encodedRegionName) {
- return ClusterStatusProtos.RegionStoreSequenceIds.newBuilder()
- .setLastFlushedSequenceId(HConstants.NO_SEQNUM).build();
- }
- }
-
/**
* @see "https://issues.apache.org/jira/browse/HBASE-3020"
*/
@@ -470,7 +403,7 @@ public class TestWALSplit {
private Path createRecoveredEditsPathForRegion() throws IOException {
byte[] encoded =
RegionInfoBuilder.FIRST_META_REGIONINFO.getEncodedNameAsBytes();
Path p = WALSplitUtil.getRegionSplitEditsPath(TableName.META_TABLE_NAME,
encoded, 1,
- FILENAME_BEING_SPLIT, TMPDIRNAME, conf, "");
+ FILENAME_BEING_SPLIT, TMPDIRNAME, conf);
return p;
}