sandeepvinayak commented on a change in pull request #3376:
URL: https://github.com/apache/hbase/pull/3376#discussion_r649431795



##########
File path: hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceWALReader.java
##########
@@ -122,65 +122,51 @@ public ReplicationSourceWALReader(FileSystem fs, Configuration conf,
   @Override
   public void run() {
     int sleepMultiplier = 1;
-    WALEntryBatch batch = null;
-    WALEntryStream entryStream = null;
-    try {
-      // we only loop back here if something fatal happened to our stream
-      while (isReaderRunning()) {
-        try {
-          entryStream =
-            new WALEntryStream(logQueue, conf, currentPosition, source.getWALFileLengthProvider(),
-              source.getServerWALsBelongTo(), source.getSourceMetrics(), walGroupId);
-          while (isReaderRunning()) { // loop here to keep reusing stream while we can
-            if (!source.isPeerEnabled()) {
-              Threads.sleep(sleepForRetries);
-              continue;
-            }
-            if (!checkQuota()) {
-              continue;
-            }
-
-            batch = createBatch(entryStream);
-            batch = readWALEntries(entryStream, batch);
+    while (isReaderRunning()) { // we only loop back here if something fatal happened to our stream
+      WALEntryBatch batch = null;
+      try (WALEntryStream entryStream =
+          new WALEntryStream(logQueue, conf, currentPosition,
+              source.getWALFileLengthProvider(), source.getServerWALsBelongTo(),
+              source.getSourceMetrics(), walGroupId)) {
+        while (isReaderRunning()) { // loop here to keep reusing stream while we can
+          batch = null;
+          if (!source.isPeerEnabled()) {
+            Threads.sleep(sleepForRetries);
+            continue;
+          }
+          if (!checkQuota()) {
+            continue;
+          }
+          batch = tryAdvanceStreamAndCreateWALBatch(entryStream);
+          if (batch == null) {
+            // got no entries and didn't advance position in WAL
+            handleEmptyWALEntryBatch();
+            entryStream.reset(); // reuse stream
+            continue;
+          }
+          // if we have already switched a file, skip reading and put it directly to the ship queue
+          if (!batch.isEndOfFile()) {
+            readWALEntries(entryStream, batch);
             currentPosition = entryStream.getPosition();
-            if (batch == null) {
-              // either the queue have no WAL to read
-              // or got no new entries (didn't advance position in WAL)
-              handleEmptyWALEntryBatch();
-              entryStream.reset(); // reuse stream
-            } else {
-              addBatchToShippingQueue(batch);
-            }
           }
-        } catch (WALEntryFilterRetryableException | IOException e) { // stream related
-          if (handleEofException(e, batch)) {
-            sleepMultiplier = 1;
-          } else {
-            LOG.warn("Failed to read stream of replication entries "
-              + "or replication filter is recovering", e);
-            if (sleepMultiplier < maxRetriesMultiplier) {
-              sleepMultiplier++;
-            }
-            Threads.sleep(sleepForRetries * sleepMultiplier);
+          // need to propagate the batch even it has no entries since it may carry the last
+          // sequence id information for serial replication.
+          LOG.debug("Read {} WAL entries eligible for replication", batch.getNbEntries());
+          entryBatchQueue.put(batch);
+          sleepMultiplier = 1;
+        }
+      } catch (IOException e) { // stream related
+        if (!handleEofException(e, batch)) {

Review comment:
       I think we need to reset the sleepMultiplier to 1 in the else branch of this check.
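
       In other words, something along these lines. This is only a rough sketch of how the catch block could read with the reset added, not a drop-in patch; it reuses the fields and the handleEofException contract shown in the diff above, where a true return means the exception was handled as an EOF:

       } catch (IOException e) { // stream related
         if (!handleEofException(e, batch)) {
           LOG.warn("Failed to read stream of replication entries "
             + "or replication filter is recovering", e);
           if (sleepMultiplier < maxRetriesMultiplier) {
             sleepMultiplier++;
           }
           Threads.sleep(sleepForRetries * sleepMultiplier);
         } else {
           // EOF was handled successfully: reset the backoff, as the pre-refactor code did
           sleepMultiplier = 1;
         }
       }

       That way a recovered EOF starts the next retry cycle from the base sleep interval instead of keeping the inflated multiplier.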



