This is an automated email from the ASF dual-hosted git repository.

bharathv pushed a commit to branch branch-1
in repository https://gitbox.apache.org/repos/asf/hbase.git


The following commit(s) were added to refs/heads/branch-1 by this push:
     new 3b3ec32  HBASE-26075: Replication is stuck due to zero length wal file in oldWALs dir (#3467)
3b3ec32 is described below

commit 3b3ec323e410e01f7aa34f76cae59dfcc9278b33
Author: Rushabh Shah <[email protected]>
AuthorDate: Fri Jul 9 18:24:20 2021 -0400

    HBASE-26075: Replication is stuck due to zero length wal file in oldWALs dir (#3467)
    
    Signed-off-by: Geoffrey Jacoby <[email protected]>
    Signed-off-by: Bharath Vissapragada <[email protected]>
---
 .../ReplicationSourceWALReaderThread.java          | 13 ++++--
 .../replication/regionserver/WALEntryStream.java   |  2 +-
 .../regionserver/TestWALEntryStream.java           | 49 ++++++++++++++++++++++
 3 files changed, 59 insertions(+), 5 deletions(-)

diff --git 
a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceWALReaderThread.java
 
b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceWALReaderThread.java
index 54ed3ab..804793d 100644
--- 
a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceWALReaderThread.java
+++ 
b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceWALReaderThread.java
@@ -308,10 +308,15 @@ public class ReplicationSourceWALReaderThread extends 
Thread {
     // add current log to recovered source queue so it is safe to remove.
     if (e.getCause() instanceof EOFException && (isRecoveredSource || 
queue.size() > 1)
         && conf.getBoolean("replication.source.eof.autorecovery", false)) {
+      Path path = queue.peek();
       try {
-        if (fs.getFileStatus(queue.peek()).getLen() == 0) {
-          LOG.warn("Forcing removal of 0 length log in queue: " + 
queue.peek());
-          lastReadPath = queue.peek();
+        if (!fs.exists(path)) {
+          // There is a chance that wal has moved to oldWALs directory, so 
look there also.
+          path = entryStream.getArchivedLog(path);
+        }
+        if (fs.getFileStatus(path).getLen() == 0) {
+          LOG.warn("Forcing removal of 0 length log in queue: " + path);
+          lastReadPath = path;
           logQueue.remove(walGroupId);
           lastReadPosition = 0;
 
@@ -325,7 +330,7 @@ public class ReplicationSourceWALReaderThread extends 
Thread {
           return true;
         }
       } catch (IOException ioe) {
-        LOG.warn("Couldn't get file length information about log " + 
queue.peek());
+        LOG.warn("Couldn't get file length information about log " + path, 
ioe);
       }
     }
 
diff --git 
a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/WALEntryStream.java
 
b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/WALEntryStream.java
index fbca0e6..88979b0 100644
--- 
a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/WALEntryStream.java
+++ 
b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/WALEntryStream.java
@@ -312,7 +312,7 @@ public class WALEntryStream implements Iterator<Entry>, 
Closeable, Iterable<Entr
     return false;
   }
 
-  private Path getArchivedLog(Path path) throws IOException {
+  Path getArchivedLog(Path path) throws IOException {
     Path rootDir = FSUtils.getRootDir(conf);
     Path oldLogDir = new Path(rootDir, HConstants.HREGION_OLDLOGDIR_NAME);
     Path archivedLogLocation = new Path(oldLogDir, path.getName());
diff --git 
a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestWALEntryStream.java
 
b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestWALEntryStream.java
index aa6485e..d1479ef 100644
--- 
a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestWALEntryStream.java
+++ 
b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestWALEntryStream.java
@@ -65,6 +65,7 @@ import org.apache.hadoop.hbase.TableName;
 import org.apache.hadoop.hbase.Waiter;
 import org.apache.hadoop.hbase.protobuf.generated.WALProtos;
 import org.apache.hadoop.hbase.regionserver.MultiVersionConcurrencyControl;
+import org.apache.hadoop.hbase.regionserver.wal.FSHLog;
 import org.apache.hadoop.hbase.regionserver.wal.WALActionsListener;
 import org.apache.hadoop.hbase.regionserver.wal.WALEdit;
 import org.apache.hadoop.hbase.replication.ChainWALEntryFilter;
@@ -917,4 +918,52 @@ public class TestWALEntryStream {
       assertEquals(0, logQueue.getMetrics().getUncleanlyClosedWALs());
     }
   }
+
+  /**
+   * Tests that we handle EOFException properly if the wal has moved to 
oldWALs directory.
+   * @throws Exception exception
+   */
+  @Test
+  public void testEOFExceptionInOldWALsDirectory() throws Exception {
+    assertEquals(1, logQueue.getQueueSize(fakeWalGroupId));
+    FSHLog fsLog = (FSHLog)log;
+    Path emptyLogFile = fsLog.getCurrentFileName();
+    log.rollWriter(true);
+    // There will 2 logs in the queue.
+    assertEquals(2, logQueue.getQueueSize(fakeWalGroupId));
+
+    Configuration localConf = new Configuration(conf);
+    localConf.setInt("replication.source.maxretriesmultiplier", 1);
+    localConf.setBoolean("replication.source.eof.autorecovery", true);
+
+    try (WALEntryStream entryStream =
+      new WALEntryStream(logQueue, fs, localConf, logQueue.getMetrics(), 
fakeWalGroupId)) {
+      // Get the archived dir path for the first wal.
+      Path archivePath = entryStream.getArchivedLog(emptyLogFile);
+      // Make sure that the wal path is not the same as archived Dir path.
+      assertNotEquals(emptyLogFile.toString(), archivePath.toString());
+      assertTrue(fs.exists(archivePath));
+      fs.truncate(archivePath, 0);
+      // make sure the size of the wal file is 0.
+      assertEquals(0, fs.getFileStatus(archivePath).getLen());
+    }
+
+    ReplicationSourceManager mockSourceManager = 
Mockito.mock(ReplicationSourceManager.class);
+    ReplicationSource source = Mockito.mock(ReplicationSource.class);
+    when(source.isPeerEnabled()).thenReturn(true);
+    when(mockSourceManager.getTotalBufferUsed()).thenReturn(new AtomicLong(0));
+
+    // Start the reader thread.
+    ReplicationSourceWALReaderThread readerThread =
+      new ReplicationSourceWALReaderThread(mockSourceManager, getQueueInfo(), 
logQueue, 0,
+        fs, localConf, getDummyFilter(), logQueue.getMetrics(), source, 
fakeWalGroupId);
+    readerThread.start();
+    // Wait for the replication queue size to be 1. This means that we have 
handled
+    // 0 length wal from oldWALs directory.
+    Waiter.waitFor(conf, 10000, new Waiter.Predicate<Exception>() {
+      @Override public boolean evaluate() {
+        return logQueue.getQueueSize(fakeWalGroupId) == 1;
+      }
+    });
+  }
 }

Reply via email to