This is an automated email from the ASF dual-hosted git repository.

wchevreuil pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hbase.git


The following commit(s) were added to refs/heads/master by this push:
     new cb78b10  HBASE-23683 Make HBaseInterClusterReplicationEndpoint more extensible (#1027)
cb78b10 is described below

commit cb78b103a7a24ea49e6bc7716d31a9e8bf15ea3a
Author: Wellington Ramos Chevreuil <[email protected]>
AuthorDate: Wed Jan 15 11:48:29 2020 +0000

    HBASE-23683 Make HBaseInterClusterReplicationEndpoint more extensible (#1027)
    
    Signed-off-by: Bharath Vissapragada <[email protected]>
    Signed-off-by: Josh Elser <[email protected]>
    Signed-off-by: binlijin <[email protected]>
---
 .../HBaseInterClusterReplicationEndpoint.java      | 24 +++++++++++++++++++---
 1 file changed, 21 insertions(+), 3 deletions(-)

diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/HBaseInterClusterReplicationEndpoint.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/HBaseInterClusterReplicationEndpoint.java
index ccdcee1..6f1f8b3 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/HBaseInterClusterReplicationEndpoint.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/HBaseInterClusterReplicationEndpoint.java
@@ -113,6 +113,25 @@ public class HBaseInterClusterReplicationEndpoint extends HBaseReplicationEndpoi
   private boolean dropOnDeletedTables;
   private boolean isSerial = false;
 
+  /*
+   * Some implementations of HBaseInterClusterReplicationEndpoint may require instantiate different
+   * Connection implementations, or initialize it in a different way, so defining createConnection
+   * as protected for possible overridings.
+   */
+  protected AsyncClusterConnection createConnection(Configuration conf) throws IOException {
+    return ClusterConnectionFactory.createAsyncClusterConnection(conf,
+      null, User.getCurrent());
+  }
+
+  /*
+   * Some implementations of HBaseInterClusterReplicationEndpoint may require instantiate different
+   * ReplicationSinkManager implementations, or initialize it in a different way,
+   * so defining createReplicationSinkManager as protected for possible overridings.
+   */
+  protected ReplicationSinkManager createReplicationSinkManager(AsyncClusterConnection conn) {
+    return new ReplicationSinkManager(conn, this, this.conf);
+  }
+
   @Override
   public void init(Context context) throws IOException {
     super.init(context);
@@ -131,13 +150,12 @@ public class HBaseInterClusterReplicationEndpoint extends HBaseReplicationEndpoi
     // TODO: This connection is replication specific or we should make it particular to
     // replication and make replication specific settings such as compression or codec to use
     // passing Cells.
-    this.conn =
-      ClusterConnectionFactory.createAsyncClusterConnection(conf, null, User.getCurrent());
+    this.conn = createConnection(this.conf);
     this.sleepForRetries =
         this.conf.getLong("replication.source.sleepforretries", 1000);
     this.metrics = context.getMetrics();
     // ReplicationQueueInfo parses the peerId out of the znode for us
-    this.replicationSinkMgr = new ReplicationSinkManager(conn, this, this.conf);
+    this.replicationSinkMgr = createReplicationSinkManager(conn);
     // per sink thread pool
     this.maxThreads = this.conf.getInt(HConstants.REPLICATION_SOURCE_MAXTHREADS_KEY,
       HConstants.REPLICATION_SOURCE_MAXTHREADS_DEFAULT);

Reply via email to