This is an automated email from the ASF dual-hosted git repository.
bharathv pushed a commit to branch branch-1
in repository https://gitbox.apache.org/repos/asf/hbase.git
The following commit(s) were added to refs/heads/branch-1 by this push:
new 3b3ec32 HBASE-26075: Replication is stuck due to zero length wal file
in oldWALs dir (#3467)
3b3ec32 is described below
commit 3b3ec323e410e01f7aa34f76cae59dfcc9278b33
Author: Rushabh Shah <[email protected]>
AuthorDate: Fri Jul 9 18:24:20 2021 -0400
HBASE-26075: Replication is stuck due to zero length wal file in oldWALs
dir (#3467)
Signed-off-by: Geoffrey Jacoby <[email protected]>
Signed-off-by: Bharath Vissapragada <[email protected]>
---
.../ReplicationSourceWALReaderThread.java | 13 ++++--
.../replication/regionserver/WALEntryStream.java | 2 +-
.../regionserver/TestWALEntryStream.java | 49 ++++++++++++++++++++++
3 files changed, 59 insertions(+), 5 deletions(-)
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceWALReaderThread.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceWALReaderThread.java
index 54ed3ab..804793d 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceWALReaderThread.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceWALReaderThread.java
@@ -308,10 +308,15 @@ public class ReplicationSourceWALReaderThread extends Thread {
// add current log to recovered source queue so it is safe to remove.
if (e.getCause() instanceof EOFException && (isRecoveredSource || queue.size() > 1)
&& conf.getBoolean("replication.source.eof.autorecovery", false)) {
+ Path path = queue.peek();
try {
- if (fs.getFileStatus(queue.peek()).getLen() == 0) {
- LOG.warn("Forcing removal of 0 length log in queue: " + queue.peek());
- lastReadPath = queue.peek();
+ if (!fs.exists(path)) {
+ // There is a chance that wal has moved to oldWALs directory, so look there also.
+ path = entryStream.getArchivedLog(path);
+ }
+ if (fs.getFileStatus(path).getLen() == 0) {
+ LOG.warn("Forcing removal of 0 length log in queue: " + path);
+ lastReadPath = path;
logQueue.remove(walGroupId);
lastReadPosition = 0;
@@ -325,7 +330,7 @@ public class ReplicationSourceWALReaderThread extends Thread {
return true;
}
} catch (IOException ioe) {
- LOG.warn("Couldn't get file length information about log " + queue.peek());
+ LOG.warn("Couldn't get file length information about log " + path, ioe);
}
}
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/WALEntryStream.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/WALEntryStream.java
index fbca0e6..88979b0 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/WALEntryStream.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/WALEntryStream.java
@@ -312,7 +312,7 @@ public class WALEntryStream implements Iterator<Entry>, Closeable, Iterable<Entr
return false;
}
- private Path getArchivedLog(Path path) throws IOException {
+ Path getArchivedLog(Path path) throws IOException {
Path rootDir = FSUtils.getRootDir(conf);
Path oldLogDir = new Path(rootDir, HConstants.HREGION_OLDLOGDIR_NAME);
Path archivedLogLocation = new Path(oldLogDir, path.getName());
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestWALEntryStream.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestWALEntryStream.java
index aa6485e..d1479ef 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestWALEntryStream.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestWALEntryStream.java
@@ -65,6 +65,7 @@ import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.Waiter;
import org.apache.hadoop.hbase.protobuf.generated.WALProtos;
import org.apache.hadoop.hbase.regionserver.MultiVersionConcurrencyControl;
+import org.apache.hadoop.hbase.regionserver.wal.FSHLog;
import org.apache.hadoop.hbase.regionserver.wal.WALActionsListener;
import org.apache.hadoop.hbase.regionserver.wal.WALEdit;
import org.apache.hadoop.hbase.replication.ChainWALEntryFilter;
@@ -917,4 +918,52 @@ public class TestWALEntryStream {
assertEquals(0, logQueue.getMetrics().getUncleanlyClosedWALs());
}
}
+
+ /**
+ * Tests that we handle EOFException properly if the wal has moved to oldWALs directory.
+ * @throws Exception exception
+ */
+ @Test
+ public void testEOFExceptionInOldWALsDirectory() throws Exception {
+ assertEquals(1, logQueue.getQueueSize(fakeWalGroupId));
+ FSHLog fsLog = (FSHLog)log;
+ Path emptyLogFile = fsLog.getCurrentFileName();
+ log.rollWriter(true);
+ // There will be 2 logs in the queue.
+ assertEquals(2, logQueue.getQueueSize(fakeWalGroupId));
+
+ Configuration localConf = new Configuration(conf);
+ localConf.setInt("replication.source.maxretriesmultiplier", 1);
+ localConf.setBoolean("replication.source.eof.autorecovery", true);
+
+ try (WALEntryStream entryStream =
+ new WALEntryStream(logQueue, fs, localConf, logQueue.getMetrics(), fakeWalGroupId)) {
+ // Get the archived dir path for the first wal.
+ Path archivePath = entryStream.getArchivedLog(emptyLogFile);
+ // Make sure that the wal path is not the same as archived Dir path.
+ assertNotEquals(emptyLogFile.toString(), archivePath.toString());
+ assertTrue(fs.exists(archivePath));
+ fs.truncate(archivePath, 0);
+ // make sure the size of the wal file is 0.
+ assertEquals(0, fs.getFileStatus(archivePath).getLen());
+ }
+
+ ReplicationSourceManager mockSourceManager = Mockito.mock(ReplicationSourceManager.class);
+ ReplicationSource source = Mockito.mock(ReplicationSource.class);
+ when(source.isPeerEnabled()).thenReturn(true);
+ when(mockSourceManager.getTotalBufferUsed()).thenReturn(new AtomicLong(0));
+
+ // Start the reader thread.
+ ReplicationSourceWALReaderThread readerThread =
+ new ReplicationSourceWALReaderThread(mockSourceManager, getQueueInfo(), logQueue, 0,
+ fs, localConf, getDummyFilter(), logQueue.getMetrics(), source, fakeWalGroupId);
+ readerThread.start();
+ // Wait for the replication queue size to be 1. This means that we have handled
+ // 0 length wal from oldWALs directory.
+ Waiter.waitFor(conf, 10000, new Waiter.Predicate<Exception>() {
+ @Override public boolean evaluate() {
+ return logQueue.getQueueSize(fakeWalGroupId) == 1;
+ }
+ });
+ }
}