This is an automated email from the ASF dual-hosted git repository.

zhangduo pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hbase.git


The following commit(s) were added to refs/heads/master by this push:
     new d292375  HBASE-25992 Polish the ReplicationSourceWALReader code for 2.x after HBASE-25596 (#3376)
d292375 is described below

commit d2923755ecc57cc1b0d1eee3d11d44e027d69df7
Author: Duo Zhang <[email protected]>
AuthorDate: Sun Jun 20 16:32:42 2021 +0800

    HBASE-25992 Polish the ReplicationSourceWALReader code for 2.x after HBASE-25596 (#3376)
    
    Signed-off-by: Yulin Niu <[email protected]>
---
 .../regionserver/ReplicationSourceWALReader.java   | 168 ++++++++++-----------
 .../SerialReplicationSourceWALReader.java          |  22 +--
 .../TestReplicationEmptyWALRecovery.java           |  34 ++---
 3 files changed, 93 insertions(+), 131 deletions(-)
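
The main change in this patch is structural: run() now opens the WALEntryStream in a try-with-resources block inside the outer retry loop, so the manual finally { entryStream.close(); } disappears, a fresh stream is created on the next pass after any stream-level failure, and batches are handed to the ship queue directly from the inner loop (even empty ones, since they may carry the last sequence id for serial replication). The following is a simplified, self-contained sketch of that shape only; the EntryStream and EntryStreamFactory types are hypothetical stand-ins, not HBase classes.

    import java.io.Closeable;
    import java.io.IOException;
    import java.util.concurrent.BlockingQueue;
    import java.util.concurrent.LinkedBlockingQueue;

    public class ReaderLoopSketch {

      /** Hypothetical stand-in for WALEntryStream. */
      interface EntryStream extends Closeable {
        String next() throws IOException; // null when no new entry is available yet
      }

      /** Hypothetical stand-in for the stream construction done in run(). */
      interface EntryStreamFactory {
        EntryStream open() throws IOException;
      }

      private final BlockingQueue<String> shipQueue = new LinkedBlockingQueue<>();
      private volatile boolean running = true;

      void stop() {
        running = false;
      }

      void run(EntryStreamFactory factory) {
        while (running) { // we only loop back here if something fatal happened to our stream
          // try-with-resources replaces the old explicit finally { entryStream.close(); }
          try (EntryStream stream = factory.open()) {
            while (running) { // keep reusing the same stream while we can
              String entry = stream.next();
              if (entry == null) {
                Thread.sleep(100); // nothing new yet; the real code resets the stream and retries
                continue;
              }
              shipQueue.put(entry); // hand the batch straight to the shipper
            }
          } catch (IOException e) {
            // stream-related failure: fall out, the stream is closed automatically, reopen next pass
          } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return;
          }
        }
      }
    }

Scoping the stream to one pass of the outer loop means a stream-level IOException simply falls through to the catch block, the stream is closed by try-with-resources, and the reader retries with a new stream.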

diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceWALReader.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceWALReader.java
index 3631cad..6f28566 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceWALReader.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceWALReader.java
@@ -122,65 +122,51 @@ class ReplicationSourceWALReader extends Thread {
   @Override
   public void run() {
     int sleepMultiplier = 1;
-    WALEntryBatch batch = null;
-    WALEntryStream entryStream = null;
-    try {
-      // we only loop back here if something fatal happened to our stream
-      while (isReaderRunning()) {
-        try {
-          entryStream =
-            new WALEntryStream(logQueue, conf, currentPosition, source.getWALFileLengthProvider(),
-              source.getServerWALsBelongTo(), source.getSourceMetrics(), walGroupId);
-          while (isReaderRunning()) { // loop here to keep reusing stream while we can
-            if (!source.isPeerEnabled()) {
-              Threads.sleep(sleepForRetries);
-              continue;
-            }
-            if (!checkQuota()) {
-              continue;
-            }
-
-            batch = createBatch(entryStream);
-            batch = readWALEntries(entryStream, batch);
+    while (isReaderRunning()) { // we only loop back here if something fatal happened to our stream
+      WALEntryBatch batch = null;
+      try (WALEntryStream entryStream =
+          new WALEntryStream(logQueue, conf, currentPosition,
+              source.getWALFileLengthProvider(), source.getServerWALsBelongTo(),
+              source.getSourceMetrics(), walGroupId)) {
+        while (isReaderRunning()) { // loop here to keep reusing stream while we can
+          batch = null;
+          if (!source.isPeerEnabled()) {
+            Threads.sleep(sleepForRetries);
+            continue;
+          }
+          if (!checkQuota()) {
+            continue;
+          }
+          batch = tryAdvanceStreamAndCreateWALBatch(entryStream);
+          if (batch == null) {
+            // got no entries and didn't advance position in WAL
+            handleEmptyWALEntryBatch();
+            entryStream.reset(); // reuse stream
+            continue;
+          }
+          // if we have already switched a file, skip reading and put it directly to the ship queue
+          if (!batch.isEndOfFile()) {
+            readWALEntries(entryStream, batch);
             currentPosition = entryStream.getPosition();
-            if (batch == null) {
-              // either the queue have no WAL to read
-              // or got no new entries (didn't advance position in WAL)
-              handleEmptyWALEntryBatch();
-              entryStream.reset(); // reuse stream
-            } else {
-              addBatchToShippingQueue(batch);
-            }
           }
-        } catch (WALEntryFilterRetryableException | IOException e) { // stream related
-          if (handleEofException(e, batch)) {
-            sleepMultiplier = 1;
-          } else {
-            LOG.warn("Failed to read stream of replication entries "
-              + "or replication filter is recovering", e);
-            if (sleepMultiplier < maxRetriesMultiplier) {
-              sleepMultiplier++;
-            }
-            Threads.sleep(sleepForRetries * sleepMultiplier);
+          // need to propagate the batch even it has no entries since it may carry the last
+          // sequence id information for serial replication.
+          LOG.debug("Read {} WAL entries eligible for replication", batch.getNbEntries());
+          entryBatchQueue.put(batch);
+          sleepMultiplier = 1;
+        }
+      } catch (IOException e) { // stream related
+        if (!handleEofException(e, batch)) {
+          LOG.warn("Failed to read stream of replication entries", e);
+          if (sleepMultiplier < maxRetriesMultiplier) {
+            sleepMultiplier ++;
           }
-        } catch (InterruptedException e) {
-          LOG.trace("Interrupted while sleeping between WAL reads");
-          Thread.currentThread().interrupt();
-        } finally {
-          entryStream.close();
+          Threads.sleep(sleepForRetries * sleepMultiplier);
         }
+      } catch (InterruptedException e) {
+        LOG.trace("Interrupted while sleeping between WAL reads or adding WAL 
batch to ship queue");
+        Thread.currentThread().interrupt();
       }
-    } catch (IOException e) {
-      if (sleepMultiplier < maxRetriesMultiplier) {
-        LOG.debug("Failed to read stream of replication entries: ", e);
-        sleepMultiplier++;
-      } else {
-        LOG.error("Failed to read stream of replication entries", e);
-      }
-      Threads.sleep(sleepForRetries * sleepMultiplier);
-    } catch (InterruptedException e) {
-      LOG.trace("Interrupted while sleeping between WAL reads");
-      Thread.currentThread().interrupt();
     }
   }
 
@@ -211,29 +197,10 @@ class ReplicationSourceWALReader extends Thread {
 
   // We need to get the WALEntryBatch from the caller so we can add entries in there
   // This is required in case there is any exception in while reading entries
-  // we do want to loss the existing entries in the batch
-  protected WALEntryBatch readWALEntries(WALEntryStream entryStream,
-      WALEntryBatch batch) throws IOException, InterruptedException {
+  // we do not want to loss the existing entries in the batch
+  protected void readWALEntries(WALEntryStream entryStream, WALEntryBatch batch)
+    throws IOException, InterruptedException {
     Path currentPath = entryStream.getCurrentPath();
-    if (!entryStream.hasNext()) {
-      // check whether we have switched a file
-      if (currentPath != null && switched(entryStream, currentPath)) {
-        return WALEntryBatch.endOfFile(currentPath);
-      } else {
-        // This would mean either no more files in the queue
-        // or there is no new data yet on the current wal
-        return null;
-      }
-    }
-    if (currentPath != null) {
-      if (switched(entryStream, currentPath)) {
-        return WALEntryBatch.endOfFile(currentPath);
-      }
-    } else {
-      // when reading from the entry stream first time we will enter here
-      currentPath = entryStream.getCurrentPath();
-    }
-    batch.setLastWalPath(currentPath);
     for (;;) {
       Entry entry = entryStream.next();
       batch.setLastWalPosition(entryStream.getPosition());
@@ -253,7 +220,6 @@ class ReplicationSourceWALReader extends Thread {
         break;
       }
     }
-    return batch;
   }
 
   private void handleEmptyWALEntryBatch() throws InterruptedException {
@@ -270,6 +236,25 @@ class ReplicationSourceWALReader extends Thread {
     }
   }
 
+  private WALEntryBatch tryAdvanceStreamAndCreateWALBatch(WALEntryStream entryStream)
+    throws IOException {
+    Path currentPath = entryStream.getCurrentPath();
+    if (!entryStream.hasNext()) {
+      // check whether we have switched a file
+      if (currentPath != null && switched(entryStream, currentPath)) {
+        return WALEntryBatch.endOfFile(currentPath);
+      } else {
+        return null;
+      }
+    }
+    if (currentPath != null) {
+      if (switched(entryStream, currentPath)) {
+        return WALEntryBatch.endOfFile(currentPath);
+      }
+    }
+    return createBatch(entryStream);
+  }
+
   /**
    * This is to handle the EOFException from the WAL entry stream. EOFException should
    * be handled carefully because there are chances of data loss because of never replicating
@@ -277,19 +262,18 @@ class ReplicationSourceWALReader extends Thread {
   * If there was only one log in the queue before EOF, we ship the empty batch here
    * and since reader is still active, in the next iteration of reader we will
    * stop the reader.
+   * <p/>
   * If there was more than one log in the queue before EOF, we ship the existing batch
   * and reset the wal patch and position to the log with EOF, so shipper can remove
    * logs from replication queue
    * @return true only the IOE can be handled
    */
-  private boolean handleEofException(Exception e, WALEntryBatch batch)
-      throws InterruptedException {
+  private boolean handleEofException(Exception e, WALEntryBatch batch) {
     PriorityBlockingQueue<Path> queue = logQueue.getQueue(walGroupId);
    // Dump the log even if logQueue size is 1 if the source is from recovered Source
    // since we don't add current log to recovered source queue so it is safe to remove.
-    if ((e instanceof EOFException || e.getCause() instanceof EOFException)
-      && (source.isRecovered() || queue.size() > 1)
-      && this.eofAutoRecovery) {
+    if ((e instanceof EOFException || e.getCause() instanceof EOFException) &&
+      (source.isRecovered() || queue.size() > 1) && this.eofAutoRecovery) {
       Path head = queue.peek();
       try {
         if (fs.getFileStatus(head).getLen() == 0) {
@@ -297,16 +281,18 @@ class ReplicationSourceWALReader extends Thread {
           LOG.warn("Forcing removal of 0 length log in queue: {}", head);
           logQueue.remove(walGroupId);
           currentPosition = 0;
-          // After we removed the WAL from the queue, we should
-          // try shipping the existing batch of entries and set the wal position
-          // and path to the wal just dequeued to correctly remove logs from the zk
-          batch.setLastWalPath(head);
-          batch.setLastWalPosition(currentPosition);
-          addBatchToShippingQueue(batch);
+          if (batch != null) {
+            // After we removed the WAL from the queue, we should try shipping the existing batch of
+            // entries
+            addBatchToShippingQueue(batch);
+          }
           return true;
         }
       } catch (IOException ioe) {
         LOG.warn("Couldn't get file length information about log " + 
queue.peek(), ioe);
+      } catch (InterruptedException ie) {
+        LOG.trace("Interrupted while adding WAL batch to ship queue");
+        Thread.currentThread().interrupt();
       }
     }
     return false;
@@ -316,10 +302,8 @@ class ReplicationSourceWALReader extends Thread {
    * Update the batch try to ship and return true if shipped
    * @param batch Batch of entries to ship
    * @throws InterruptedException throws interrupted exception
-   * @throws IOException throws io exception from stream
    */
-  private void addBatchToShippingQueue(WALEntryBatch batch)
-    throws InterruptedException, IOException {
+  private void addBatchToShippingQueue(WALEntryBatch batch) throws InterruptedException {
    // need to propagate the batch even it has no entries since it may carry the last
    // sequence id information for serial replication.
    LOG.debug("Read {} WAL entries eligible for replication", batch.getNbEntries());
@@ -348,7 +332,7 @@ class ReplicationSourceWALReader extends Thread {
     return true;
   }
 
-  protected final WALEntryBatch createBatch(WALEntryStream entryStream) {
+  private WALEntryBatch createBatch(WALEntryStream entryStream) {
    return new WALEntryBatch(replicationBatchCountCapacity, entryStream.getCurrentPath());
   }
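
A small behavioral point in the hunks above: handleEofException() no longer declares InterruptedException. It only re-ships a batch when one actually exists, and if the hand-off to the ship queue is interrupted it restores the thread's interrupt flag rather than propagating the exception. Below is a minimal sketch of that idiom under simplified, illustrative names (not the HBase signatures).

    import java.util.concurrent.BlockingQueue;
    import java.util.concurrent.LinkedBlockingQueue;

    public class InterruptRestoreSketch {

      private final BlockingQueue<String> shipQueue = new LinkedBlockingQueue<>();

      /** Returns true if the batch was handed off; false if there was nothing to ship or we were interrupted. */
      boolean tryShipExistingBatch(String batch) {
        if (batch == null) {
          return false; // no batch has been built yet, so there is nothing to re-ship
        }
        try {
          shipQueue.put(batch);
          return true;
        } catch (InterruptedException ie) {
          Thread.currentThread().interrupt(); // keep the interrupt visible to the caller
          return false;
        }
      }
    }

Restoring the flag keeps the method's signature clean while still letting the caller observe the interruption on its next blocking call.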
 
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/SerialReplicationSourceWALReader.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/SerialReplicationSourceWALReader.java
index 254dc4a..1de4c99 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/SerialReplicationSourceWALReader.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/SerialReplicationSourceWALReader.java
@@ -50,27 +50,10 @@ public class SerialReplicationSourceWALReader extends ReplicationSourceWALReader
   }
 
   @Override
-  protected WALEntryBatch readWALEntries(WALEntryStream entryStream, WALEntryBatch batch)
-      throws IOException, InterruptedException {
+  protected void readWALEntries(WALEntryStream entryStream, WALEntryBatch batch)
+    throws IOException, InterruptedException {
     Path currentPath = entryStream.getCurrentPath();
-    if (!entryStream.hasNext()) {
-      // check whether we have switched a file
-      if (currentPath != null && switched(entryStream, currentPath)) {
-        return WALEntryBatch.endOfFile(currentPath);
-      } else {
-        return null;
-      }
-    }
-    if (currentPath != null) {
-      if (switched(entryStream, currentPath)) {
-        return WALEntryBatch.endOfFile(currentPath);
-      }
-    } else {
-      // when reading from the entry stream first time we will enter here
-      currentPath = entryStream.getCurrentPath();
-    }
     long positionBefore = entryStream.getPosition();
-    batch = createBatch(entryStream);
     for (;;) {
       Entry entry = entryStream.peek();
       boolean doFiltering = true;
@@ -122,7 +105,6 @@ public class SerialReplicationSourceWALReader extends ReplicationSourceWALReader
         break;
       }
     }
-    return batch;
   }
 
   private void removeEntryFromStream(WALEntryStream entryStream, WALEntryBatch batch)
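
Both reader classes now follow the same pattern: readWALEntries() returns void and appends into a WALEntryBatch owned by the caller, so entries read before an exception stay in the batch, and the stream-advance/file-switch check lives only in tryAdvanceStreamAndCreateWALBatch(). A short sketch of the fill-the-caller's-batch idea, using placeholder types rather than the real WAL classes:

    import java.io.IOException;
    import java.util.ArrayList;
    import java.util.Iterator;
    import java.util.List;

    public class BatchFillSketch {

      /** Appends entries to a batch owned by the caller instead of building and returning a new one. */
      static void readInto(Iterator<String> entries, List<String> batch, int limit) throws IOException {
        while (entries.hasNext() && batch.size() < limit) {
          batch.add(entries.next()); // entries added before a later failure remain in the caller's batch
        }
      }

      public static void main(String[] args) throws IOException {
        List<String> batch = new ArrayList<>();
        readInto(List.of("e1", "e2", "e3").iterator(), batch, 2);
        System.out.println(batch); // prints [e1, e2]
      }
    }

The caller creates the batch once, passes it in, and can still ship whatever was accumulated if the read aborts partway.
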
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationEmptyWALRecovery.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationEmptyWALRecovery.java
index 511161c..da7f988 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationEmptyWALRecovery.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationEmptyWALRecovery.java
@@ -47,14 +47,14 @@ import org.junit.ClassRule;
 import org.junit.Test;
 import org.junit.experimental.categories.Category;
 
-@Category
-  ({ ReplicationTests.class, LargeTests.class }) public class TestReplicationEmptyWALRecovery
-  extends TestReplicationBase {
+@Category({ ReplicationTests.class, LargeTests.class })
+public class TestReplicationEmptyWALRecovery extends TestReplicationBase {
   MultiVersionConcurrencyControl mvcc = new MultiVersionConcurrencyControl();
  static final RegionInfo info = RegionInfoBuilder.newBuilder(tableName).build();
   NavigableMap<byte[], Integer> scopes = new TreeMap<>(Bytes.BYTES_COMPARATOR);
 
-  @ClassRule public static final HBaseClassTestRule CLASS_RULE =
+  @ClassRule
+  public static final HBaseClassTestRule CLASS_RULE =
     HBaseClassTestRule.forClass(TestReplicationEmptyWALRecovery.class);
 
   @Before
@@ -151,10 +151,9 @@ import org.junit.experimental.categories.Category;
   }
 
   /**
-   * Test empty WAL along with non empty WALs in the same batch. This test is to make sure
-   * when we see the empty and handle the EOF exception, we are able to existing the previous
-   * batch of entries without loosing it. This test also tests the number of batches shipped
-   *
+   * Test empty WAL along with non empty WALs in the same batch. This test is to make sure when we
+   * see the empty and handle the EOF exception, we are able to ship the previous batch of entries
+   * without loosing it. This test also tests the number of batches shipped
    * @throws Exception throws any exception
    */
   @Test
@@ -174,7 +173,6 @@ import org.junit.experimental.categories.Category;
       Path currentWalPath = AbstractFSWALProvider.getCurrentFileName(wal);
 
       appendEntriesToWal(numOfEntriesToReplicate, wal);
-      wal.rollWriter();
      String walGroupId = AbstractFSWALProvider.getWALPrefixFromWALName(currentWalPath.getName());
      Path emptyWalPath = new Path(UTIL1.getDefaultRootDirPath(), walGroupId + "." + ts);
       UTIL1.getTestFileSystem().create(emptyWalPath).close();
@@ -183,10 +181,10 @@ import org.junit.experimental.categories.Category;
 
     injectEmptyWAL(numRs, emptyWalPaths);
     // There should be three WALs in queue
-    // 1. empty WAL
-    // 2. non empty WAL
+    // 1. non empty WAL
+    // 2. empty WAL
     // 3. live WAL
-    //verifyNumberOfLogsInQueue(3, numRs);
+    verifyNumberOfLogsInQueue(3, numRs);
     hbaseAdmin.enableReplicationPeer(PEER_ID2);
    // ReplicationSource should advance past the empty wal, or else the test will fail
     waitForLogAdvance(numRs);
@@ -209,10 +207,9 @@ import org.junit.experimental.categories.Category;
   }
 
   /**
-   * Test empty WAL along with non empty WALs in the same batch. This test is to make sure
-   * when we see the empty WAL and handle the EOF exception, we are able to proceed
-   * with next batch and replicate it properly without missing data.
-   * with next batch and replicate it properly without missing data.
-   *
+   * Test empty WAL along with non empty WALs in the same batch. This test is to make sure when we
+   * see the empty WAL and handle the EOF exception, we are able to proceed with next batch and
+   * replicate it properly without missing data.
    * @throws Exception throws any exception
    */
   @Test
@@ -265,9 +262,8 @@ import org.junit.experimental.categories.Category;
   }
 
   /**
-   * This test make sure we replicate all the enties from the non empty WALs which
-   * are surrounding the empty WALs
-   *
+   * This test make sure we replicate all the enties from the non empty WALs which are surrounding
+   * the empty WALs
    * @throws Exception throws exception
    */
   @Test
