comnetwork commented on code in PR #5158:
URL: https://github.com/apache/hbase/pull/5158#discussion_r1159563458
##########
hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceWALReader.java:
##########
@@ -145,43 +145,54 @@ public void run() {
source.getWALFileLengthProvider(), source.getSourceMetrics(),
walGroupId)) {
while (isReaderRunning()) { // loop here to keep reusing stream while we can
batch = null;
- if (!source.isPeerEnabled()) {
- Threads.sleep(sleepForRetries);
- continue;
+ boolean successAddToQueue = false;
+ try {
+ if (!source.isPeerEnabled()) {
+ Threads.sleep(sleepForRetries);
+ continue;
+ }
+ if (!checkQuota()) {
+ continue;
+ }
+ Path currentPath = entryStream.getCurrentPath();
+ WALEntryStream.HasNext hasNext = entryStream.hasNext();
+ if (hasNext == WALEntryStream.HasNext.NO) {
+ replicationDone();
+ return;
+ }
+ // first, check if we have switched a file, if so, we need to manually add an EOF entry
+ // batch to the queue
+ if (currentPath != null && switched(entryStream, currentPath)) {
+ entryBatchQueue.put(WALEntryBatch.endOfFile(currentPath));
+ continue;
+ }
+ if (hasNext == WALEntryStream.HasNext.RETRY) {
+ // sleep and retry
+ sleepMultiplier = sleep(sleepMultiplier);
+ continue;
+ }
+ if (hasNext == WALEntryStream.HasNext.RETRY_IMMEDIATELY) {
+ // retry immediately, this usually means we have switched a file
+ continue;
+ }
+ // below are all for hasNext == YES
+ batch = createBatch(entryStream);
+ readWALEntries(entryStream, batch);
Review Comment:
@Apache9, thank you very much for pointing that out. I have narrowed the scope
and also found that the declaration of the 'batch' variable could be moved into
the loop body.
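
As a rough illustration of the idea (a simplified, self-contained sketch, not the actual patch): the class `BatchScopeSketch`, the method `run`, the `released` list, and the use of strings as stand-ins for WAL entry batches are all hypothetical. Only the `successAddToQueue` flag name comes from the diff above. The sketch assumes that "narrowing the scope" means keeping the `try` block around just the steps that need cleanup, and it declares `batch` inside the loop body so that no per-iteration reset to `null` is needed.

```java
import java.util.ArrayList;
import java.util.List;

public class BatchScopeSketch {

  // Hypothetical stand-in for the reader loop. Strings play the role of
  // WAL entry batches; "released" records batches that must be cleaned up.
  static List<String> run(List<String> entries, List<String> released) {
    List<String> queue = new ArrayList<>();
    for (String entry : entries) {
      if (entry.isEmpty()) {
        // Nothing to read in this round; no batch exists yet, so no cleanup.
        continue;
      }
      // Declared inside the loop body: every iteration gets a fresh
      // variable, so there is no "batch = null" reset at the top.
      String batch = "batch:" + entry;
      boolean successAddToQueue = false;
      // The try block covers only the steps that can fail after the
      // batch has been created.
      try {
        if (entry.startsWith("!")) {
          throw new IllegalStateException("simulated read failure");
        }
        queue.add(batch);
        successAddToQueue = true;
      } catch (IllegalStateException e) {
        // Swallowed so the loop can move on; cleanup happens in finally.
      } finally {
        if (!successAddToQueue) {
          // The batch never reached the queue, so release what it held.
          released.add(batch);
        }
      }
    }
    return queue;
  }

  public static void main(String[] args) {
    List<String> released = new ArrayList<>();
    List<String> queue = run(List.of("a", "", "!b", "c"), released);
    System.out.println("queued=" + queue + " released=" + released);
  }
}
```

With this shape, the early `continue` paths never enter the `try` block, so the cleanup in `finally` runs only for a batch that was actually created.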