[ https://issues.apache.org/jira/browse/HBASE-11302?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14020274#comment-14020274 ]
Hudson commented on HBASE-11302:
--------------------------------
ABORTED: Integrated in HBase-TRUNK #5180 (See
[https://builds.apache.org/job/HBase-TRUNK/5180/])
HBASE-11302 ReplicationSourceManager#sources is not thread safe (Qianxi Zhang)
(tedyu: rev b5e660fff4afcefcde0ec635d7830b4f00d65627)
* hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceManager.java
> ReplicationSourceManager#sources is not thread safe
> ---------------------------------------------------
>
> Key: HBASE-11302
> URL: https://issues.apache.org/jira/browse/HBASE-11302
> Project: HBase
> Issue Type: Bug
> Components: Replication
> Affects Versions: 0.99.0, 0.98.2
> Reporter: Qianxi Zhang
> Assignee: Qianxi Zhang
> Fix For: 0.99.0
>
> Attachments: HBase-11302-0.99.diff
>
>
> In ReplicationSourceManager, sources is used to record the peers. An entry can
> be removed from it in the removePeer method while it is being read in the
> preLogRoll method, so access to it is not thread safe.
> ReplicationSourceManager#296
> {code}
> void preLogRoll(Path newLog) throws IOException {
>   synchronized (this.hlogsById) {
>     String name = newLog.getName();
>     for (ReplicationSourceInterface source : this.sources) {
>       try {
>         this.replicationQueues.addLog(source.getPeerClusterZnode(), name);
>       } catch (ReplicationException e) {
>         throw new IOException("Cannot add log to replication queue with id="
>             + source.getPeerClusterZnode() + ", filename=" + name, e);
>       }
>     }
>     for (SortedSet<String> hlogs : this.hlogsById.values()) {
>       if (this.sources.isEmpty()) {
>         // If there's no slaves, don't need to keep the old hlogs since
>         // we only consider the last one when a new slave comes in
>         hlogs.clear();
>       }
>       hlogs.add(name);
>     }
>   }
>   this.latestPath = newLog;
> }
> {code}
> ReplicationSourceManager#392
> {code}
> public void removePeer(String id) {
>   LOG.info("Closing the following queue " + id + ", currently have "
>       + sources.size() + " and another "
>       + oldsources.size() + " that were recovered");
>   String terminateMessage = "Replication stream was removed by a user";
>   ReplicationSourceInterface srcToRemove = null;
>   List<ReplicationSourceInterface> oldSourcesToDelete =
>       new ArrayList<ReplicationSourceInterface>();
>   // First close all the recovered sources for this peer
>   for (ReplicationSourceInterface src : oldsources) {
>     if (id.equals(src.getPeerClusterId())) {
>       oldSourcesToDelete.add(src);
>     }
>   }
>   for (ReplicationSourceInterface src : oldSourcesToDelete) {
>     src.terminate(terminateMessage);
>     closeRecoveredQueue((src));
>   }
>   LOG.info("Number of deleted recovered sources for " + id + ": "
>       + oldSourcesToDelete.size());
>   // Now look for the one on this cluster
>   for (ReplicationSourceInterface src : this.sources) {
>     if (id.equals(src.getPeerClusterId())) {
>       srcToRemove = src;
>       break;
>     }
>   }
>   if (srcToRemove == null) {
>     LOG.error("The queue we wanted to close is missing " + id);
>     return;
>   }
>   srcToRemove.terminate(terminateMessage);
>   this.sources.remove(srcToRemove);
>   deleteSource(id, true);
> }
> {code}
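
For illustration only: in the quoted code, preLogRoll iterates over sources while holding the hlogsById lock, but removePeer modifies sources without taking any lock, so the two can interleave. One common way to make such a read-mostly list safe is a copy-on-write list, where each iteration works on a snapshot. The sketch below is a simplified stand-in and not the attached patch; the class SourceRegistry, its methods, and the use of plain strings as peer ids are hypothetical.

```java
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;

// Hypothetical, simplified stand-in for the "sources" field discussed above.
// Peer ids are plain strings here; the real field holds ReplicationSourceInterface.
final class SourceRegistry {
    // CopyOnWriteArrayList iterators read a snapshot of the list, so a
    // concurrent remove() cannot cause ConcurrentModificationException.
    private final List<String> sources = new CopyOnWriteArrayList<>();

    void addSource(String peerId) {
        sources.add(peerId);
    }

    // Analogue of removePeer: mutates the list, possibly from another thread.
    boolean removePeer(String peerId) {
        return sources.remove(peerId);
    }

    // Analogue of the loop in preLogRoll: reads every source once.
    int visitAll() {
        int visited = 0;
        for (String ignored : sources) {
            visited++;
        }
        return visited;
    }

    int size() {
        return sources.size();
    }

    public static void main(String[] args) throws InterruptedException {
        SourceRegistry registry = new SourceRegistry();
        for (int i = 0; i < 1000; i++) {
            registry.addSource("peer-" + i);
        }
        // One thread removes peers while the main thread keeps iterating.
        Thread remover = new Thread(() -> {
            for (int i = 0; i < 1000; i++) {
                registry.removePeer("peer-" + i);
            }
        });
        remover.start();
        for (int i = 0; i < 100; i++) {
            registry.visitAll();
        }
        remover.join();
        System.out.println("remaining sources: " + registry.size());
    }
}
```

The trade-off is that every add or remove copies the backing array, which is usually acceptable when peers change rarely and the list is read on every log roll. Guarding all reads and writes with one shared lock would be an alternative.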