This is an automated email from the ASF dual-hosted git repository.
zhangduo pushed a commit to branch branch-3
in repository https://gitbox.apache.org/repos/asf/hbase.git
The following commit(s) were added to refs/heads/branch-3 by this push:
new 0f82a447585 HBASE-28155 RecoveredReplicationSource quit when there are
still unfinished groups (#5466)
0f82a447585 is described below
commit 0f82a447585b33d93dbc00a6f5da3dcdd1a5c571
Author: Duo Zhang <[email protected]>
AuthorDate: Fri Oct 20 11:58:28 2023 +0800
HBASE-28155 RecoveredReplicationSource quit when there are still unfinished
groups (#5466)
Signed-off-by: Guanghao Zhang <[email protected]>
(cherry picked from commit dde504ce489fd3fd55166a872768a077400ba2ab)
---
.../regionserver/RecoveredReplicationSource.java | 16 ++++++++++++
.../regionserver/ReplicationSource.java | 29 ++++++++++++++++------
2 files changed, 37 insertions(+), 8 deletions(-)
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/RecoveredReplicationSource.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/RecoveredReplicationSource.java
index e9062472221..e47df36e3aa 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/RecoveredReplicationSource.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/RecoveredReplicationSource.java
@@ -26,6 +26,22 @@ import org.apache.yetus.audience.InterfaceAudience;
@InterfaceAudience.Private
public class RecoveredReplicationSource extends ReplicationSource {
+ @Override
+ protected void startShippers() {
+ for (String walGroupId : logQueue.getQueues().keySet()) {
+ workerThreads.put(walGroupId, createNewShipper(walGroupId));
+ }
+ // start shippers after initializing the workerThreads, as in the below postFinish logic, if
+ // workerThreads is empty, we will mark the RecoveredReplicationSource as finished. So if we
+ // start the worker on the fly, it is possible that a shipper has already finished its work and
+ // called postFinish, and find out the workerThreads is empty and then mark the
+ // RecoveredReplicationSource as finish, while the next shipper has not been added to
+ // workerThreads yet. See HBASE-28155 for more details.
+ for (ReplicationSourceShipper shipper : workerThreads.values()) {
+ startShipper(shipper);
+ }
+ }
+
@Override
protected RecoveredReplicationSourceShipper createNewShipper(String
walGroupId,
ReplicationSourceWALReader walReader) {
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSource.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSource.java
index 4c864e5e450..094fa4aaa78 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSource.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSource.java
@@ -360,6 +360,19 @@ public class ReplicationSource implements
ReplicationSourceInterface {
}
}
+ protected final ReplicationSourceShipper createNewShipper(String walGroupId)
{
+ ReplicationSourceWALReader walReader =
+ createNewWALReader(walGroupId, getStartOffset(walGroupId));
+ ReplicationSourceShipper worker = createNewShipper(walGroupId, walReader);
+ Threads.setDaemonThreadRunning(walReader, Thread.currentThread().getName()
+ + ".replicationSource.wal-reader." + walGroupId + "," + queueId,
this::retryRefreshing);
+ return worker;
+ }
+
+ protected final void startShipper(ReplicationSourceShipper worker) {
+ worker.startup(this::retryRefreshing);
+ }
+
private void tryStartNewShipper(String walGroupId) {
workerThreads.compute(walGroupId, (key, value) -> {
if (value != null) {
@@ -367,12 +380,8 @@ public class ReplicationSource implements
ReplicationSourceInterface {
return value;
} else {
LOG.debug("{} starting shipping worker for walGroupId={}",
logPeerId(), walGroupId);
- ReplicationSourceWALReader walReader =
- createNewWALReader(walGroupId, getStartOffset(walGroupId));
- ReplicationSourceShipper worker = createNewShipper(walGroupId,
walReader);
- Threads.setDaemonThreadRunning(walReader,
Thread.currentThread().getName()
- + ".replicationSource.wal-reader." + walGroupId + "," + queueId,
this::retryRefreshing);
- worker.startup(this::retryRefreshing);
+ ReplicationSourceShipper worker = createNewShipper(walGroupId);
+ startShipper(worker);
return worker;
}
});
@@ -522,7 +531,7 @@ public class ReplicationSource implements
ReplicationSourceInterface {
* @param sleepMultiplier by how many times the default sleeping time is augmented
* @return True if <code>sleepMultiplier</code> is &lt; <code>maxRetriesMultiplier</code>
*/
- protected boolean sleepForRetries(String msg, int sleepMultiplier) {
+ private boolean sleepForRetries(String msg, int sleepMultiplier) {
try {
if (LOG.isTraceEnabled()) {
LOG.trace("{} {}, sleeping {} times {}", logPeerId(), msg,
sleepForRetries,
@@ -605,10 +614,14 @@ public class ReplicationSource implements
ReplicationSourceInterface {
queueId, logQueue.getNumQueues(), clusterId, peerClusterId);
initializeWALEntryFilter(peerClusterId);
// Start workers
+ startShippers();
+ setSourceStartupStatus(false);
+ }
+
+ protected void startShippers() {
for (String walGroupId : logQueue.getQueues().keySet()) {
tryStartNewShipper(walGroupId);
}
- setSourceStartupStatus(false);
}
private synchronized void setSourceStartupStatus(boolean initializing) {