nyl3532016 commented on a change in pull request #3376:
URL: https://github.com/apache/hbase/pull/3376#discussion_r651628983



##########
File path: 
hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceWALReader.java
##########
@@ -122,65 +122,51 @@ public ReplicationSourceWALReader(FileSystem fs, 
Configuration conf,
   @Override
   public void run() {
     int sleepMultiplier = 1;
-    WALEntryBatch batch = null;
-    WALEntryStream entryStream = null;
-    try {
-      // we only loop back here if something fatal happened to our stream
-      while (isReaderRunning()) {
-        try {
-          entryStream =
-            new WALEntryStream(logQueue, conf, currentPosition, 
source.getWALFileLengthProvider(),
-              source.getServerWALsBelongTo(), source.getSourceMetrics(), 
walGroupId);
-          while (isReaderRunning()) { // loop here to keep reusing stream 
while we can
-            if (!source.isPeerEnabled()) {
-              Threads.sleep(sleepForRetries);
-              continue;
-            }
-            if (!checkQuota()) {
-              continue;
-            }
-
-            batch = createBatch(entryStream);
-            batch = readWALEntries(entryStream, batch);
+    while (isReaderRunning()) { // we only loop back here if something fatal 
happened to our stream
+      WALEntryBatch batch = null;
+      try (WALEntryStream entryStream =
+          new WALEntryStream(logQueue, conf, currentPosition,
+              source.getWALFileLengthProvider(), 
source.getServerWALsBelongTo(),
+              source.getSourceMetrics(), walGroupId)) {
+        while (isReaderRunning()) { // loop here to keep reusing stream while 
we can
+          batch = null;
+          if (!source.isPeerEnabled()) {
+            Threads.sleep(sleepForRetries);
+            continue;
+          }
+          if (!checkQuota()) {
+            continue;
+          }
+          batch = tryAdvanceStreamAndCreateWALBatch(entryStream);
+          if (batch == null) {
+            // got no entries and didn't advance position in WAL
+            handleEmptyWALEntryBatch();
+            entryStream.reset(); // reuse stream
+            continue;
+          }
+          // if we have already switched a file, skip reading and put it 
directly to the ship queue
+          if (!batch.isEndOfFile()) {
+            readWALEntries(entryStream, batch);
             currentPosition = entryStream.getPosition();
-            if (batch == null) {
-              // either the queue have no WAL to read
-              // or got no new entries (didn't advance position in WAL)
-              handleEmptyWALEntryBatch();
-              entryStream.reset(); // reuse stream
-            } else {
-              addBatchToShippingQueue(batch);
-            }
           }
-        } catch (WALEntryFilterRetryableException | IOException e) { // stream 
related
-          if (handleEofException(e, batch)) {
-            sleepMultiplier = 1;
-          } else {
-            LOG.warn("Failed to read stream of replication entries "
-              + "or replication filter is recovering", e);
-            if (sleepMultiplier < maxRetriesMultiplier) {
-              sleepMultiplier++;
-            }
-            Threads.sleep(sleepForRetries * sleepMultiplier);
+          // need to propagate the batch even it has no entries since it may 
carry the last
+          // sequence id information for serial replication.
+          LOG.debug("Read {} WAL entries eligible for replication", 
batch.getNbEntries());
+          entryBatchQueue.put(batch);
+          sleepMultiplier = 1;
+        }
+      } catch (IOException e) { // stream related
+        if (!handleEofException(e, batch)) {

Review comment:
       I think there is no need to reset `sleepMultiplier` in the else part.

##########
File path: 
hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceWALReader.java
##########
@@ -122,65 +122,51 @@ public ReplicationSourceWALReader(FileSystem fs, 
Configuration conf,
   @Override
   public void run() {
     int sleepMultiplier = 1;
-    WALEntryBatch batch = null;
-    WALEntryStream entryStream = null;
-    try {
-      // we only loop back here if something fatal happened to our stream
-      while (isReaderRunning()) {
-        try {
-          entryStream =
-            new WALEntryStream(logQueue, conf, currentPosition, 
source.getWALFileLengthProvider(),
-              source.getServerWALsBelongTo(), source.getSourceMetrics(), 
walGroupId);
-          while (isReaderRunning()) { // loop here to keep reusing stream 
while we can
-            if (!source.isPeerEnabled()) {
-              Threads.sleep(sleepForRetries);
-              continue;
-            }
-            if (!checkQuota()) {
-              continue;
-            }
-
-            batch = createBatch(entryStream);
-            batch = readWALEntries(entryStream, batch);
+    while (isReaderRunning()) { // we only loop back here if something fatal 
happened to our stream
+      WALEntryBatch batch = null;
+      try (WALEntryStream entryStream =
+          new WALEntryStream(logQueue, conf, currentPosition,
+              source.getWALFileLengthProvider(), 
source.getServerWALsBelongTo(),
+              source.getSourceMetrics(), walGroupId)) {
+        while (isReaderRunning()) { // loop here to keep reusing stream while 
we can
+          batch = null;
+          if (!source.isPeerEnabled()) {
+            Threads.sleep(sleepForRetries);
+            continue;
+          }
+          if (!checkQuota()) {
+            continue;
+          }
+          batch = tryAdvanceStreamAndCreateWALBatch(entryStream);
+          if (batch == null) {
+            // got no entries and didn't advance position in WAL
+            handleEmptyWALEntryBatch();
+            entryStream.reset(); // reuse stream
+            continue;
+          }
+          // if we have already switched a file, skip reading and put it 
directly to the ship queue
+          if (!batch.isEndOfFile()) {
+            readWALEntries(entryStream, batch);
             currentPosition = entryStream.getPosition();
-            if (batch == null) {
-              // either the queue have no WAL to read
-              // or got no new entries (didn't advance position in WAL)
-              handleEmptyWALEntryBatch();
-              entryStream.reset(); // reuse stream
-            } else {
-              addBatchToShippingQueue(batch);
-            }
           }
-        } catch (WALEntryFilterRetryableException | IOException e) { // stream 
related
-          if (handleEofException(e, batch)) {
-            sleepMultiplier = 1;
-          } else {
-            LOG.warn("Failed to read stream of replication entries "
-              + "or replication filter is recovering", e);
-            if (sleepMultiplier < maxRetriesMultiplier) {
-              sleepMultiplier++;
-            }
-            Threads.sleep(sleepForRetries * sleepMultiplier);
+          // need to propagate the batch even it has no entries since it may 
carry the last
+          // sequence id information for serial replication.
+          LOG.debug("Read {} WAL entries eligible for replication", 
batch.getNbEntries());
+          entryBatchQueue.put(batch);
+          sleepMultiplier = 1;
+        }
+      } catch (IOException e) { // stream related
+        if (!handleEofException(e, batch)) {
+          LOG.warn("Failed to read stream of replication entries", e);
+          if (sleepMultiplier < maxRetriesMultiplier) {
+            sleepMultiplier ++;
           }
-        } catch (InterruptedException e) {
-          LOG.trace("Interrupted while sleeping between WAL reads");
-          Thread.currentThread().interrupt();
-        } finally {
-          entryStream.close();

Review comment:
       So we do not need to close `entryStream` and catch `IOException` outside?




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
[email protected]


Reply via email to