Apache9 commented on code in PR #5158:
URL: https://github.com/apache/hbase/pull/5158#discussion_r1158565853


##########
hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceWALReader.java:
##########
@@ -145,43 +145,54 @@ public void run() {
         source.getWALFileLengthProvider(), source.getSourceMetrics(), walGroupId)) {
         while (isReaderRunning()) { // loop here to keep reusing stream while we can
           batch = null;
-          if (!source.isPeerEnabled()) {
-            Threads.sleep(sleepForRetries);
-            continue;
+          boolean successAddToQueue = false;
+          try {
+            if (!source.isPeerEnabled()) {
+              Threads.sleep(sleepForRetries);
+              continue;
+            }
+            if (!checkQuota()) {
+              continue;
+            }
+            Path currentPath = entryStream.getCurrentPath();
+            WALEntryStream.HasNext hasNext = entryStream.hasNext();
+            if (hasNext == WALEntryStream.HasNext.NO) {
+              replicationDone();
+              return;
+            }
+            // first, check if we have switched a file, if so, we need to manually add an EOF entry
+            // batch to the queue
+            if (currentPath != null && switched(entryStream, currentPath)) {
+              entryBatchQueue.put(WALEntryBatch.endOfFile(currentPath));
+              continue;
+            }
+            if (hasNext == WALEntryStream.HasNext.RETRY) {
+              // sleep and retry
+              sleepMultiplier = sleep(sleepMultiplier);
+              continue;
+            }
+            if (hasNext == WALEntryStream.HasNext.RETRY_IMMEDIATELY) {
+              // retry immediately, this usually means we have switched a file
+              continue;
+            }
+            // below are all for hasNext == YES
+            batch = createBatch(entryStream);
+            readWALEntries(entryStream, batch);

Review Comment:
   Could we narrow the scope of the `try`/`catch` block? I think the only place where we can leak the WALEntryBatch is here, in the call to readWALEntries: it can throw WALEntryFilterRetryableException, and acquireBufferQuota is only called inside readWALEntries.
   So I think we only need to wrap readWALEntries in the `try`/`catch`, or at least only the code after createBatch, if you think `entryBatchQueue.put(batch);` could also throw InterruptedException :)
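
   A minimal sketch of the narrower scope, for illustration only. It does not use the real HBase classes: `Batch`, `usedQuota`, `readEntries`, `releaseQuota` and `readOnce` are hypothetical stand-ins, and `IllegalStateException` stands in for the retryable filter exception. The point is only that the cleanup guard starts after the batch is created and covers the read plus the `put`.

```java
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.atomic.AtomicLong;

// Illustration only: every name here is a hypothetical stand-in, not HBase API.
class NarrowTryScopeSketch {
  // Stand-in for the shared buffer quota counter.
  static final AtomicLong usedQuota = new AtomicLong();

  // Stand-in for WALEntryBatch; tracks how much quota this batch holds.
  static final class Batch {
    long heldBytes;
  }

  // Stand-in for readWALEntries: acquires quota, then may fail part-way through.
  static void readEntries(Batch batch, boolean failAfterAcquire) {
    batch.heldBytes += 64;
    usedQuota.addAndGet(64);
    if (failAfterAcquire) {
      throw new IllegalStateException("simulated retryable filter failure");
    }
  }

  // Gives back whatever quota the batch still holds.
  static void releaseQuota(Batch batch) {
    usedQuota.addAndGet(-batch.heldBytes);
    batch.heldBytes = 0;
  }

  // One loop iteration. Returns true if the batch reached the queue.
  static boolean readOnce(BlockingQueue<Batch> queue, boolean failAfterAcquire)
      throws InterruptedException {
    // Checks that cannot acquire quota (peer enabled, hasNext, ...) would stay
    // out here, outside the try block.
    Batch batch = new Batch();
    boolean addedToQueue = false;
    try {
      readEntries(batch, failAfterAcquire); // the only step that acquires quota
      queue.put(batch);                     // may throw InterruptedException
      addedToQueue = true;
    } catch (IllegalStateException e) {
      // Retryable failure: fall through, the finally block cleans up.
    } finally {
      if (!addedToQueue) {
        releaseQuota(batch); // no leak on failure or interruption
      }
    }
    return addedToQueue;
  }

  public static void main(String[] args) throws InterruptedException {
    BlockingQueue<Batch> queue = new LinkedBlockingQueue<>();
    System.out.println("failed read queued: " + readOnce(queue, true));
    System.out.println("quota after failed read: " + usedQuota.get());
    System.out.println("successful read queued: " + readOnce(queue, false));
    System.out.println("quota after successful read: " + usedQuota.get());
  }
}
```

   With this shape, a batch that never reaches the queue always has its quota released, while the early `continue` and `return` paths need no cleanup because nothing has been acquired yet.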



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.