This is an automated email from the ASF dual-hosted git repository.
zhangduo pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hbase.git
The following commit(s) were added to refs/heads/master by this push:
new d292375 HBASE-25992 Polish the ReplicationSourceWALReader code for
2.x after HBASE-25596 (#3376)
d292375 is described below
commit d2923755ecc57cc1b0d1eee3d11d44e027d69df7
Author: Duo Zhang <[email protected]>
AuthorDate: Sun Jun 20 16:32:42 2021 +0800
HBASE-25992 Polish the ReplicationSourceWALReader code for 2.x after
HBASE-25596 (#3376)
Signed-off-by: Yulin Niu <[email protected]>
---
.../regionserver/ReplicationSourceWALReader.java | 168 ++++++++++-----------
.../SerialReplicationSourceWALReader.java | 22 +--
.../TestReplicationEmptyWALRecovery.java | 34 ++---
3 files changed, 93 insertions(+), 131 deletions(-)
diff --git
a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceWALReader.java
b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceWALReader.java
index 3631cad..6f28566 100644
---
a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceWALReader.java
+++
b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceWALReader.java
@@ -122,65 +122,51 @@ class ReplicationSourceWALReader extends Thread {
@Override
public void run() {
int sleepMultiplier = 1;
- WALEntryBatch batch = null;
- WALEntryStream entryStream = null;
- try {
- // we only loop back here if something fatal happened to our stream
- while (isReaderRunning()) {
- try {
- entryStream =
- new WALEntryStream(logQueue, conf, currentPosition,
source.getWALFileLengthProvider(),
- source.getServerWALsBelongTo(), source.getSourceMetrics(),
walGroupId);
- while (isReaderRunning()) { // loop here to keep reusing stream
while we can
- if (!source.isPeerEnabled()) {
- Threads.sleep(sleepForRetries);
- continue;
- }
- if (!checkQuota()) {
- continue;
- }
-
- batch = createBatch(entryStream);
- batch = readWALEntries(entryStream, batch);
+ while (isReaderRunning()) { // we only loop back here if something fatal
happened to our stream
+ WALEntryBatch batch = null;
+ try (WALEntryStream entryStream =
+ new WALEntryStream(logQueue, conf, currentPosition,
+ source.getWALFileLengthProvider(),
source.getServerWALsBelongTo(),
+ source.getSourceMetrics(), walGroupId)) {
+ while (isReaderRunning()) { // loop here to keep reusing stream while
we can
+ batch = null;
+ if (!source.isPeerEnabled()) {
+ Threads.sleep(sleepForRetries);
+ continue;
+ }
+ if (!checkQuota()) {
+ continue;
+ }
+ batch = tryAdvanceStreamAndCreateWALBatch(entryStream);
+ if (batch == null) {
+ // got no entries and didn't advance position in WAL
+ handleEmptyWALEntryBatch();
+ entryStream.reset(); // reuse stream
+ continue;
+ }
+ // if we have already switched a file, skip reading and put it
directly to the ship queue
+ if (!batch.isEndOfFile()) {
+ readWALEntries(entryStream, batch);
currentPosition = entryStream.getPosition();
- if (batch == null) {
- // either the queue have no WAL to read
- // or got no new entries (didn't advance position in WAL)
- handleEmptyWALEntryBatch();
- entryStream.reset(); // reuse stream
- } else {
- addBatchToShippingQueue(batch);
- }
}
- } catch (WALEntryFilterRetryableException | IOException e) { // stream
related
- if (handleEofException(e, batch)) {
- sleepMultiplier = 1;
- } else {
- LOG.warn("Failed to read stream of replication entries "
- + "or replication filter is recovering", e);
- if (sleepMultiplier < maxRetriesMultiplier) {
- sleepMultiplier++;
- }
- Threads.sleep(sleepForRetries * sleepMultiplier);
+ // need to propagate the batch even if it has no entries since it may
carry the last
+ // sequence id information for serial replication.
+ LOG.debug("Read {} WAL entries eligible for replication",
batch.getNbEntries());
+ entryBatchQueue.put(batch);
+ sleepMultiplier = 1;
+ }
+ } catch (IOException e) { // stream related
+ if (!handleEofException(e, batch)) {
+ LOG.warn("Failed to read stream of replication entries", e);
+ if (sleepMultiplier < maxRetriesMultiplier) {
+ sleepMultiplier ++;
}
- } catch (InterruptedException e) {
- LOG.trace("Interrupted while sleeping between WAL reads");
- Thread.currentThread().interrupt();
- } finally {
- entryStream.close();
+ Threads.sleep(sleepForRetries * sleepMultiplier);
}
+ } catch (InterruptedException e) {
+ LOG.trace("Interrupted while sleeping between WAL reads or adding WAL
batch to ship queue");
+ Thread.currentThread().interrupt();
}
- } catch (IOException e) {
- if (sleepMultiplier < maxRetriesMultiplier) {
- LOG.debug("Failed to read stream of replication entries: ", e);
- sleepMultiplier++;
- } else {
- LOG.error("Failed to read stream of replication entries", e);
- }
- Threads.sleep(sleepForRetries * sleepMultiplier);
- } catch (InterruptedException e) {
- LOG.trace("Interrupted while sleeping between WAL reads");
- Thread.currentThread().interrupt();
}
}
@@ -211,29 +197,10 @@ class ReplicationSourceWALReader extends Thread {
// We need to get the WALEntryBatch from the caller so we can add entries in
there
// This is required in case there is any exception in while reading entries
- // we do want to loss the existing entries in the batch
- protected WALEntryBatch readWALEntries(WALEntryStream entryStream,
- WALEntryBatch batch) throws IOException, InterruptedException {
+ // we do not want to lose the existing entries in the batch
+ protected void readWALEntries(WALEntryStream entryStream, WALEntryBatch
batch)
+ throws IOException, InterruptedException {
Path currentPath = entryStream.getCurrentPath();
- if (!entryStream.hasNext()) {
- // check whether we have switched a file
- if (currentPath != null && switched(entryStream, currentPath)) {
- return WALEntryBatch.endOfFile(currentPath);
- } else {
- // This would mean either no more files in the queue
- // or there is no new data yet on the current wal
- return null;
- }
- }
- if (currentPath != null) {
- if (switched(entryStream, currentPath)) {
- return WALEntryBatch.endOfFile(currentPath);
- }
- } else {
- // when reading from the entry stream first time we will enter here
- currentPath = entryStream.getCurrentPath();
- }
- batch.setLastWalPath(currentPath);
for (;;) {
Entry entry = entryStream.next();
batch.setLastWalPosition(entryStream.getPosition());
@@ -253,7 +220,6 @@ class ReplicationSourceWALReader extends Thread {
break;
}
}
- return batch;
}
private void handleEmptyWALEntryBatch() throws InterruptedException {
@@ -270,6 +236,25 @@ class ReplicationSourceWALReader extends Thread {
}
}
+ private WALEntryBatch tryAdvanceStreamAndCreateWALBatch(WALEntryStream
entryStream)
+ throws IOException {
+ Path currentPath = entryStream.getCurrentPath();
+ if (!entryStream.hasNext()) {
+ // check whether we have switched a file
+ if (currentPath != null && switched(entryStream, currentPath)) {
+ return WALEntryBatch.endOfFile(currentPath);
+ } else {
+ return null;
+ }
+ }
+ if (currentPath != null) {
+ if (switched(entryStream, currentPath)) {
+ return WALEntryBatch.endOfFile(currentPath);
+ }
+ }
+ return createBatch(entryStream);
+ }
+
/**
* This is to handle the EOFException from the WAL entry stream.
EOFException should
* be handled carefully because there are chances of data loss because of
never replicating
@@ -277,19 +262,18 @@ class ReplicationSourceWALReader extends Thread {
* If there was only one log in the queue before EOF, we ship the empty
batch here
* and since reader is still active, in the next iteration of reader we will
* stop the reader.
+ * <p/>
* If there was more than one log in the queue before EOF, we ship the
existing batch
* and reset the wal patch and position to the log with EOF, so shipper can
remove
* logs from replication queue
* @return true only the IOE can be handled
*/
- private boolean handleEofException(Exception e, WALEntryBatch batch)
- throws InterruptedException {
+ private boolean handleEofException(Exception e, WALEntryBatch batch) {
PriorityBlockingQueue<Path> queue = logQueue.getQueue(walGroupId);
// Dump the log even if logQueue size is 1 if the source is from recovered
Source
// since we don't add current log to recovered source queue so it is safe
to remove.
- if ((e instanceof EOFException || e.getCause() instanceof EOFException)
- && (source.isRecovered() || queue.size() > 1)
- && this.eofAutoRecovery) {
+ if ((e instanceof EOFException || e.getCause() instanceof EOFException) &&
+ (source.isRecovered() || queue.size() > 1) && this.eofAutoRecovery) {
Path head = queue.peek();
try {
if (fs.getFileStatus(head).getLen() == 0) {
@@ -297,16 +281,18 @@ class ReplicationSourceWALReader extends Thread {
LOG.warn("Forcing removal of 0 length log in queue: {}", head);
logQueue.remove(walGroupId);
currentPosition = 0;
- // After we removed the WAL from the queue, we should
- // try shipping the existing batch of entries and set the wal
position
- // and path to the wal just dequeued to correctly remove logs from
the zk
- batch.setLastWalPath(head);
- batch.setLastWalPosition(currentPosition);
- addBatchToShippingQueue(batch);
+ if (batch != null) {
+ // After we removed the WAL from the queue, we should try shipping
the existing batch of
+ // entries
+ addBatchToShippingQueue(batch);
+ }
return true;
}
} catch (IOException ioe) {
LOG.warn("Couldn't get file length information about log " +
queue.peek(), ioe);
+ } catch (InterruptedException ie) {
+ LOG.trace("Interrupted while adding WAL batch to ship queue");
+ Thread.currentThread().interrupt();
}
}
return false;
@@ -316,10 +302,8 @@ class ReplicationSourceWALReader extends Thread {
* Update the batch try to ship and return true if shipped
* @param batch Batch of entries to ship
* @throws InterruptedException throws interrupted exception
- * @throws IOException throws io exception from stream
*/
- private void addBatchToShippingQueue(WALEntryBatch batch)
- throws InterruptedException, IOException {
+ private void addBatchToShippingQueue(WALEntryBatch batch) throws
InterruptedException {
// need to propagate the batch even it has no entries since it may carry
the last
// sequence id information for serial replication.
LOG.debug("Read {} WAL entries eligible for replication",
batch.getNbEntries());
@@ -348,7 +332,7 @@ class ReplicationSourceWALReader extends Thread {
return true;
}
- protected final WALEntryBatch createBatch(WALEntryStream entryStream) {
+ private WALEntryBatch createBatch(WALEntryStream entryStream) {
return new WALEntryBatch(replicationBatchCountCapacity,
entryStream.getCurrentPath());
}
diff --git
a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/SerialReplicationSourceWALReader.java
b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/SerialReplicationSourceWALReader.java
index 254dc4a..1de4c99 100644
---
a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/SerialReplicationSourceWALReader.java
+++
b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/SerialReplicationSourceWALReader.java
@@ -50,27 +50,10 @@ public class SerialReplicationSourceWALReader extends
ReplicationSourceWALReader
}
@Override
- protected WALEntryBatch readWALEntries(WALEntryStream entryStream,
WALEntryBatch batch)
- throws IOException, InterruptedException {
+ protected void readWALEntries(WALEntryStream entryStream, WALEntryBatch
batch)
+ throws IOException, InterruptedException {
Path currentPath = entryStream.getCurrentPath();
- if (!entryStream.hasNext()) {
- // check whether we have switched a file
- if (currentPath != null && switched(entryStream, currentPath)) {
- return WALEntryBatch.endOfFile(currentPath);
- } else {
- return null;
- }
- }
- if (currentPath != null) {
- if (switched(entryStream, currentPath)) {
- return WALEntryBatch.endOfFile(currentPath);
- }
- } else {
- // when reading from the entry stream first time we will enter here
- currentPath = entryStream.getCurrentPath();
- }
long positionBefore = entryStream.getPosition();
- batch = createBatch(entryStream);
for (;;) {
Entry entry = entryStream.peek();
boolean doFiltering = true;
@@ -122,7 +105,6 @@ public class SerialReplicationSourceWALReader extends
ReplicationSourceWALReader
break;
}
}
- return batch;
}
private void removeEntryFromStream(WALEntryStream entryStream, WALEntryBatch
batch)
diff --git
a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationEmptyWALRecovery.java
b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationEmptyWALRecovery.java
index 511161c..da7f988 100644
---
a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationEmptyWALRecovery.java
+++
b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationEmptyWALRecovery.java
@@ -47,14 +47,14 @@ import org.junit.ClassRule;
import org.junit.Test;
import org.junit.experimental.categories.Category;
-@Category
- ({ ReplicationTests.class, LargeTests.class }) public class
TestReplicationEmptyWALRecovery
- extends TestReplicationBase {
+@Category({ ReplicationTests.class, LargeTests.class })
+public class TestReplicationEmptyWALRecovery extends TestReplicationBase {
MultiVersionConcurrencyControl mvcc = new MultiVersionConcurrencyControl();
static final RegionInfo info =
RegionInfoBuilder.newBuilder(tableName).build();
NavigableMap<byte[], Integer> scopes = new TreeMap<>(Bytes.BYTES_COMPARATOR);
- @ClassRule public static final HBaseClassTestRule CLASS_RULE =
+ @ClassRule
+ public static final HBaseClassTestRule CLASS_RULE =
HBaseClassTestRule.forClass(TestReplicationEmptyWALRecovery.class);
@Before
@@ -151,10 +151,9 @@ import org.junit.experimental.categories.Category;
}
/**
- * Test empty WAL along with non empty WALs in the same batch. This test is
to make sure
- * when we see the empty and handle the EOF exception, we are able to
existing the previous
- * batch of entries without loosing it. This test also tests the number of
batches shipped
- *
+ * Test empty WAL along with non empty WALs in the same batch. This test is
to make sure when we
+ * see the empty WAL and handle the EOF exception, we are able to ship the
previous batch of entries
+ * without losing it. This test also tests the number of batches shipped
* @throws Exception throws any exception
*/
@Test
@@ -174,7 +173,6 @@ import org.junit.experimental.categories.Category;
Path currentWalPath = AbstractFSWALProvider.getCurrentFileName(wal);
appendEntriesToWal(numOfEntriesToReplicate, wal);
- wal.rollWriter();
String walGroupId =
AbstractFSWALProvider.getWALPrefixFromWALName(currentWalPath.getName());
Path emptyWalPath = new Path(UTIL1.getDefaultRootDirPath(), walGroupId +
"." + ts);
UTIL1.getTestFileSystem().create(emptyWalPath).close();
@@ -183,10 +181,10 @@ import org.junit.experimental.categories.Category;
injectEmptyWAL(numRs, emptyWalPaths);
// There should be three WALs in queue
- // 1. empty WAL
- // 2. non empty WAL
+ // 1. non empty WAL
+ // 2. empty WAL
// 3. live WAL
- //verifyNumberOfLogsInQueue(3, numRs);
+ verifyNumberOfLogsInQueue(3, numRs);
hbaseAdmin.enableReplicationPeer(PEER_ID2);
// ReplicationSource should advance past the empty wal, or else the test
will fail
waitForLogAdvance(numRs);
@@ -209,10 +207,9 @@ import org.junit.experimental.categories.Category;
}
/**
- * Test empty WAL along with non empty WALs in the same batch. This test is
to make sure
- * when we see the empty WAL and handle the EOF exception, we are able to
proceed
- * with next batch and replicate it properly without missing data.
- *
+ * Test empty WAL along with non empty WALs in the same batch. This test is
to make sure when we
+ * see the empty WAL and handle the EOF exception, we are able to proceed
with next batch and
+ * replicate it properly without missing data.
* @throws Exception throws any exception
*/
@Test
@@ -265,9 +262,8 @@ import org.junit.experimental.categories.Category;
}
/**
- * This test make sure we replicate all the enties from the non empty WALs
which
- * are surrounding the empty WALs
- *
+ * This test makes sure we replicate all the entries from the non empty WALs
which are surrounding
+ * the empty WALs
* @throws Exception throws exception
*/
@Test