saintstack commented on a change in pull request #2413:
URL: https://github.com/apache/hbase/pull/2413#discussion_r490419898



##########
File path: 
hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceManager.java
##########
@@ -320,53 +336,89 @@ public void removePeer(String peerId) {
   }
 
   /**
-   * Factory method to create a replication source
-   * @param queueId the id of the replication queue
+   * Factory method to create and initialize a replication source
+   * @param peerId the id of the replication queue
    * @return the created source
    */
-  private ReplicationSourceInterface createSource(String queueId, 
ReplicationPeer replicationPeer)
+  private ReplicationSourceInterface createSource(String peerId, 
ReplicationPeer replicationPeer)
       throws IOException {
-    ReplicationSourceInterface src = ReplicationSourceFactory.create(conf, 
queueId);
+    ReplicationSourceInterface rs = ReplicationSourceFactory.create(conf, 
peerId, this.walFactory);
+    rs.init(conf, fs, this, queueStorage, replicationPeer, server, peerId, 
clusterId,
+      new MetricsSource(peerId));
+    return addWALActionsListener(rs);
+  }
 
-    MetricsSource metrics = new MetricsSource(queueId);
-    // init replication source
-    src.init(conf, fs, this, queueStorage, replicationPeer, server, queueId, 
clusterId,
-      walFileLengthProvider, metrics);
-    return src;
+  /**
+   * Add a listener on the WALProvider used by the passed ReplicationSource. 
Only add the listener
+   * ONCE. addWALActionsListener is not idempotent and it would be tough to 
make it so; hence a bit
+   * of gymnastics are needed here to ensure we register the listener once 
only per walProvider.
+   * @return Returns the <code>rs</code> passed as a parameter.
+   */
+  private ReplicationSourceInterface 
addWALActionsListener(ReplicationSourceInterface rs) {
+    WALProvider walProvider = rs.getWALProvider();
+    if (walProvider != null) {
+      synchronized (this.walProvidersWithListenersInstalled) {
+        if (!this.walProvidersWithListenersInstalled.contains(walProvider)) {
+          walProvider.addWALActionsListener(new 
ReplicationSourceWALActionListener(conf, this));
+          this.walProvidersWithListenersInstalled.add(walProvider);
+        }
+      }
+    }
+    return rs;
   }
 
+
   /**
-   * Add a normal source for the given peer on this region server. Meanwhile, 
add new replication
-   * queue to storage. For the newly added peer, we only need to enqueue the 
latest log of each wal
-   * group and do replication
+   * Add a replication source for the given peer on this region server. Peer 
should have been
+   * added to {@link #replicationPeers} before calling this method because we 
expect to find
+   * the peerIds {@link ReplicationPeer} in {@link #replicationPeers}.
    * @param peerId the id of the replication peer
-   * @return the source that was created
    */
   @VisibleForTesting
   ReplicationSourceInterface addSource(String peerId) throws IOException {
-    ReplicationPeer peer = replicationPeers.getPeer(peerId);
-    ReplicationSourceInterface src = createSource(peerId, peer);
-    // synchronized on latestPaths to avoid missing the new log
+    return addSource(replicationPeers.getPeer(peerId));
+  }
+
+  /**
+   * Adds a replication source. Creates source if not present otherwise 
fetches current source
+   * from {@link #sources}.
+   * @see #addSource(String) for method to add peer AND persist to replication 
store.
+   */
+  private ReplicationSourceInterface addSource(ReplicationPeer peer) throws 
IOException {
+    String id = peer.getId();
+    ReplicationSourceInterface src;
+    // Synchronized on latestPaths to avoid missing the new WAL.
     synchronized (this.latestPaths) {
-      this.sources.put(peerId, src);
+      src = this.sources.get(id);

Review comment:
       Better hygiene. Explicit handling of a clash on peerId rather than just 
a silent overwrite of an existing source. Let me know if you want me to restore 
the old behavior.
   




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
[email protected]


Reply via email to