Apache9 commented on a change in pull request #3376:
URL: https://github.com/apache/hbase/pull/3376#discussion_r649661879



##########
File path: hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceWALReader.java
##########
@@ -122,65 +122,51 @@ public ReplicationSourceWALReader(FileSystem fs, Configuration conf,
   @Override
   public void run() {
     int sleepMultiplier = 1;
-    WALEntryBatch batch = null;
-    WALEntryStream entryStream = null;
-    try {
-      // we only loop back here if something fatal happened to our stream
-      while (isReaderRunning()) {
-        try {
-          entryStream =
-            new WALEntryStream(logQueue, conf, currentPosition, source.getWALFileLengthProvider(),
-              source.getServerWALsBelongTo(), source.getSourceMetrics(), walGroupId);
-          while (isReaderRunning()) { // loop here to keep reusing stream while we can
-            if (!source.isPeerEnabled()) {
-              Threads.sleep(sleepForRetries);
-              continue;
-            }
-            if (!checkQuota()) {
-              continue;
-            }
-
-            batch = createBatch(entryStream);
-            batch = readWALEntries(entryStream, batch);
+    while (isReaderRunning()) { // we only loop back here if something fatal happened to our stream
+      WALEntryBatch batch = null;
+      try (WALEntryStream entryStream =
+          new WALEntryStream(logQueue, conf, currentPosition,
+              source.getWALFileLengthProvider(), source.getServerWALsBelongTo(),
+              source.getSourceMetrics(), walGroupId)) {
+        while (isReaderRunning()) { // loop here to keep reusing stream while we can
+          batch = null;
+          if (!source.isPeerEnabled()) {
+            Threads.sleep(sleepForRetries);
+            continue;
+          }
+          if (!checkQuota()) {
+            continue;
+          }
+          batch = tryAdvanceStreamAndCreateWALBatch(entryStream);
+          if (batch == null) {
+            // got no entries and didn't advance position in WAL
+            handleEmptyWALEntryBatch();
+            entryStream.reset(); // reuse stream
+            continue;
+          }
+          // if we have already switched a file, skip reading and put it directly to the ship queue
+          if (!batch.isEndOfFile()) {
+            readWALEntries(entryStream, batch);
             currentPosition = entryStream.getPosition();
-            if (batch == null) {
-              // either the queue have no WAL to read
-              // or got no new entries (didn't advance position in WAL)
-              handleEmptyWALEntryBatch();
-              entryStream.reset(); // reuse stream
-            } else {
-              addBatchToShippingQueue(batch);
-            }
           }
-        } catch (WALEntryFilterRetryableException | IOException e) { // stream related
-          if (handleEofException(e, batch)) {
-            sleepMultiplier = 1;
-          } else {
-            LOG.warn("Failed to read stream of replication entries "
-              + "or replication filter is recovering", e);
-            if (sleepMultiplier < maxRetriesMultiplier) {
-              sleepMultiplier++;
-            }
-            Threads.sleep(sleepForRetries * sleepMultiplier);
+          // need to propagate the batch even it has no entries since it may carry the last
+          // sequence id information for serial replication.
+          LOG.debug("Read {} WAL entries eligible for replication", batch.getNbEntries());
+          entryBatchQueue.put(batch);
+          sleepMultiplier = 1;
+        }
+      } catch (IOException e) { // stream related
+        if (!handleEofException(e, batch)) {

Review comment:
       This is just a restore of the old behavior. I think the idea here is to only reset sleepMultiplier to 1 when we have actually got a batch successfully, so we do not need to reset it in the else branch.
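
   To make the shape concrete, here is a toy sketch of the loop I have in mind (this is not the actual patch; `readBatch`/`ship`/`tryEofRecovery` and the constants are made-up stand-ins, only the position of the reset matters):

```java
import java.io.IOException;

// Toy sketch only: made-up helpers, not the real ReplicationSourceWALReader API.
final class BackoffSketch {
  private static final long SLEEP_FOR_RETRIES_MS = 1000L; // illustrative values only
  private static final int MAX_RETRIES_MULTIPLIER = 300;

  void readLoop() throws InterruptedException {
    int sleepMultiplier = 1;
    while (isRunning()) {
      try {
        Object batch = readBatch();   // may throw on stream trouble
        ship(batch);
        sleepMultiplier = 1;          // the only reset: a batch was really read and handed off
      } catch (IOException e) {
        if (!tryEofRecovery(e)) {
          // could not auto-recover: back off harder before the next attempt
          if (sleepMultiplier < MAX_RETRIES_MULTIPLIER) {
            sleepMultiplier++;
          }
          Thread.sleep(SLEEP_FOR_RETRIES_MS * sleepMultiplier);
        }
        // deliberately no `sleepMultiplier = 1` here, even when recovery succeeded:
        // dropping an empty WAL does not prove the next read will work
      }
    }
  }

  // stubs so the sketch stands alone
  private boolean isRunning() { return true; }
  private Object readBatch() throws IOException { return new Object(); }
  private void ship(Object batch) { }
  private boolean tryEofRecovery(IOException e) { return false; }
}
```

   With this shape the backoff keeps growing across consecutive failures and only goes back to 1 once a batch has actually been handed to the shipper.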

##########
File path: hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceWALReader.java
##########
@@ -270,43 +236,63 @@ private void handleEmptyWALEntryBatch() throws InterruptedException {
     }
   }
 
+  private WALEntryBatch tryAdvanceStreamAndCreateWALBatch(WALEntryStream entryStream)
+    throws IOException {
+    Path currentPath = entryStream.getCurrentPath();
+    if (!entryStream.hasNext()) {
+      // check whether we have switched a file
+      if (currentPath != null && switched(entryStream, currentPath)) {
+        return WALEntryBatch.endOfFile(currentPath);
+      } else {
+        return null;
+      }
+    }
+    if (currentPath != null) {
+      if (switched(entryStream, currentPath)) {
+        return WALEntryBatch.endOfFile(currentPath);
+      }
+    }
+    return createBatch(entryStream);
+  }
+
   /**
    * This is to handle the EOFException from the WAL entry stream. EOFException should
    * be handled carefully because there are chances of data loss because of never replicating
    * the data. Thus we should always try to ship existing batch of entries here.
    * If there was only one log in the queue before EOF, we ship the empty batch here
    * and since reader is still active, in the next iteration of reader we will
    * stop the reader.
+   * <p/>
    * If there was more than one log in the queue before EOF, we ship the existing batch
    * and reset the wal patch and position to the log with EOF, so shipper can remove
    * logs from replication queue
    * @return true only the IOE can be handled
    */
-  private boolean handleEofException(Exception e, WALEntryBatch batch)
-      throws InterruptedException {
+  private boolean handleEofException(Exception e, WALEntryBatch batch) {
     PriorityBlockingQueue<Path> queue = logQueue.getQueue(walGroupId);
     // Dump the log even if logQueue size is 1 if the source is from recovered Source
     // since we don't add current log to recovered source queue so it is safe to remove.
-    if ((e instanceof EOFException || e.getCause() instanceof EOFException)
-      && (source.isRecovered() || queue.size() > 1)
-      && this.eofAutoRecovery) {
+    if ((e instanceof EOFException || e.getCause() instanceof EOFException) &&
+      (source.isRecovered() || queue.size() > 1) && this.eofAutoRecovery) {
       Path head = queue.peek();
       try {
         if (fs.getFileStatus(head).getLen() == 0) {
           // head of the queue is an empty log file
           LOG.warn("Forcing removal of 0 length log in queue: {}", head);
           logQueue.remove(walGroupId);
           currentPosition = 0;
-          // After we removed the WAL from the queue, we should
-          // try shipping the existing batch of entries and set the wal position
-          // and path to the wal just dequeued to correctly remove logs from the zk
-          batch.setLastWalPath(head);
-          batch.setLastWalPosition(currentPosition);
-          addBatchToShippingQueue(batch);
+          if (batch != null) {
+            // After we removed the WAL from the queue, we should try shipping the existing batch of
+            // entries
+            addBatchToShippingQueue(batch);

Review comment:
       Please see my comment on the JIRA:

   https://issues.apache.org/jira/browse/HBASE-25596?focusedCommentId=17360627&page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel#comment-17360627

   The problem here is that, although we cut the batch when switching WAL files, the way we test whether there has been a WAL file change triggers a read on the next WAL file. So if the next file is empty, an EOFException is thrown while we still have WAL data from the previous WAL file in the batch.

   You can try commenting out this line; the new UTs you added will then fail.
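
   To restate the sequence as a small compilable sketch (`EntryBatch` and `ship` are made up, this is not the real WALEntryStream/WALEntryBatch code; only the ordering of events matters):

```java
import java.util.ArrayList;
import java.util.List;

// Tiny sketch of why the null guard is needed.
final class EofGuardSketch {
  static final class EntryBatch {
    final List<String> entries = new ArrayList<>(); // stands in for the wal-1 entries
  }

  private EntryBatch inFlight; // filled while reading wal-1, before the EOF happened

  // Called after the stream failed with EOFException because probing for a file
  // switch opened the next WAL (wal-2), which turned out to be a 0-length file.
  void onEofFromEmptyNextWal() {
    if (inFlight != null) {
      ship(inFlight);   // the wal-1 entries were already read; do not drop them
      inFlight = null;
    }
    // ...only now is it safe to remove the empty wal-2 from the queue and carry on
  }

  private void ship(EntryBatch batch) { /* hand off to the shipper thread */ }
}
```

   Skipping the guard (which is what commenting out that line amounts to) means the entries already read from the previous WAL are never shipped, which is exactly the data loss the new UTs should catch.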

##########
File path: hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceWALReader.java
##########
@@ -122,65 +122,51 @@ public ReplicationSourceWALReader(FileSystem fs, Configuration conf,
   @Override
   public void run() {
     int sleepMultiplier = 1;
-    WALEntryBatch batch = null;
-    WALEntryStream entryStream = null;
-    try {
-      // we only loop back here if something fatal happened to our stream
-      while (isReaderRunning()) {
-        try {
-          entryStream =
-            new WALEntryStream(logQueue, conf, currentPosition, source.getWALFileLengthProvider(),
-              source.getServerWALsBelongTo(), source.getSourceMetrics(), walGroupId);
-          while (isReaderRunning()) { // loop here to keep reusing stream while we can
-            if (!source.isPeerEnabled()) {
-              Threads.sleep(sleepForRetries);
-              continue;
-            }
-            if (!checkQuota()) {
-              continue;
-            }
-
-            batch = createBatch(entryStream);
-            batch = readWALEntries(entryStream, batch);
+    while (isReaderRunning()) { // we only loop back here if something fatal happened to our stream
+      WALEntryBatch batch = null;
+      try (WALEntryStream entryStream =
+          new WALEntryStream(logQueue, conf, currentPosition,
+              source.getWALFileLengthProvider(), source.getServerWALsBelongTo(),
+              source.getSourceMetrics(), walGroupId)) {
+        while (isReaderRunning()) { // loop here to keep reusing stream while we can
+          batch = null;
+          if (!source.isPeerEnabled()) {
+            Threads.sleep(sleepForRetries);
+            continue;
+          }
+          if (!checkQuota()) {
+            continue;
+          }
+          batch = tryAdvanceStreamAndCreateWALBatch(entryStream);
+          if (batch == null) {
+            // got no entries and didn't advance position in WAL
+            handleEmptyWALEntryBatch();
+            entryStream.reset(); // reuse stream
+            continue;
+          }
+          // if we have already switched a file, skip reading and put it directly to the ship queue
+          if (!batch.isEndOfFile()) {
+            readWALEntries(entryStream, batch);
             currentPosition = entryStream.getPosition();
-            if (batch == null) {
-              // either the queue have no WAL to read
-              // or got no new entries (didn't advance position in WAL)
-              handleEmptyWALEntryBatch();
-              entryStream.reset(); // reuse stream
-            } else {
-              addBatchToShippingQueue(batch);
-            }
           }
-        } catch (WALEntryFilterRetryableException | IOException e) { // stream related
-          if (handleEofException(e, batch)) {
-            sleepMultiplier = 1;
-          } else {
-            LOG.warn("Failed to read stream of replication entries "
-              + "or replication filter is recovering", e);
-            if (sleepMultiplier < maxRetriesMultiplier) {
-              sleepMultiplier++;
-            }
-            Threads.sleep(sleepForRetries * sleepMultiplier);
+          // need to propagate the batch even it has no entries since it may carry the last
+          // sequence id information for serial replication.
+          LOG.debug("Read {} WAL entries eligible for replication", batch.getNbEntries());
+          entryBatchQueue.put(batch);
+          sleepMultiplier = 1;
+        }
+      } catch (IOException e) { // stream related
+        if (!handleEofException(e, batch)) {

Review comment:
       As I said above, the idea here is to only reset sleepMultiplier to 1 when we have actually got a batch successfully. In handleEofException we do not read an actual WAL entry; we just remove the empty WAL from the queue, so we do not know whether the next read will succeed. Do you have any other strong reason why we must reset it to 1 here?
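
   To spell out what I mean by "does not read anything", here is a heavily simplified skeleton of the recovery path (not the exact method body; it ignores the recovered-source/queue-size checks and the batch shipping discussed above):

```java
import java.io.EOFException;
import java.io.IOException;
import java.util.concurrent.PriorityBlockingQueue;

import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;

// Simplified skeleton: the recovery only inspects and trims the log queue,
// it never reads a WAL entry itself.
final class EofRecoverySketch {
  private final FileSystem fs;
  private final PriorityBlockingQueue<Path> queue;
  private final boolean eofAutoRecovery;

  EofRecoverySketch(FileSystem fs, PriorityBlockingQueue<Path> queue, boolean eofAutoRecovery) {
    this.fs = fs;
    this.queue = queue;
    this.eofAutoRecovery = eofAutoRecovery;
  }

  boolean tryRecover(Exception e) throws IOException {
    boolean eof = e instanceof EOFException || e.getCause() instanceof EOFException;
    if (!eof || !eofAutoRecovery) {
      return false;   // nothing we can auto-recover from; the caller keeps backing off
    }
    Path head = queue.peek();
    if (head == null || fs.getFileStatus(head).getLen() != 0) {
      return false;   // head is a real WAL; again, the caller keeps backing off
    }
    queue.remove();   // drop the 0-length WAL (the real code also ships any in-flight batch)
    return true;      // "handled", but no WAL entry was read, so there is still no
                      // evidence that the next read will succeed
  }
}
```

   Returning true here only says the queue was tidied up; it is not evidence that the next read will go through, which is why I would leave sleepMultiplier alone on that path.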
   
   Thanks.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
[email protected]

