This is an automated email from the ASF dual-hosted git repository.
wchevreuil pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hbase.git
The following commit(s) were added to refs/heads/master by this push:
new cb78b10 HBASE-23683 Make HBaseInterClusterReplicationEndpoint more extensible (#1027)
cb78b10 is described below
commit cb78b103a7a24ea49e6bc7716d31a9e8bf15ea3a
Author: Wellington Ramos Chevreuil <[email protected]>
AuthorDate: Wed Jan 15 11:48:29 2020 +0000
HBASE-23683 Make HBaseInterClusterReplicationEndpoint more extensible (#1027)
Signed-off-by: Bharath Vissapragada <[email protected]>
Signed-off-by: Josh Elser <[email protected]>
Signed-off-by: binlijin <[email protected]>
---
.../HBaseInterClusterReplicationEndpoint.java | 24 +++++++++++++++++++---
1 file changed, 21 insertions(+), 3 deletions(-)
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/HBaseInterClusterReplicationEndpoint.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/HBaseInterClusterReplicationEndpoint.java
index ccdcee1..6f1f8b3 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/HBaseInterClusterReplicationEndpoint.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/HBaseInterClusterReplicationEndpoint.java
@@ -113,6 +113,25 @@ public class HBaseInterClusterReplicationEndpoint extends HBaseReplicationEndpoi
private boolean dropOnDeletedTables;
private boolean isSerial = false;
+ /*
+ * Some implementations of HBaseInterClusterReplicationEndpoint may require instantiate different
+ * Connection implementations, or initialize it in a different way, so defining createConnection
+ * as protected for possible overridings.
+ */
+ protected AsyncClusterConnection createConnection(Configuration conf) throws IOException {
+ return ClusterConnectionFactory.createAsyncClusterConnection(conf,
+ null, User.getCurrent());
+ }
+
+ /*
+ * Some implementations of HBaseInterClusterReplicationEndpoint may require instantiate different
+ * ReplicationSinkManager implementations, or initialize it in a different way,
+ * so defining createReplicationSinkManager as protected for possible overridings.
+ */
+ protected ReplicationSinkManager createReplicationSinkManager(AsyncClusterConnection conn) {
+ return new ReplicationSinkManager(conn, this, this.conf);
+ }
+
@Override
public void init(Context context) throws IOException {
super.init(context);
@@ -131,13 +150,12 @@ public class HBaseInterClusterReplicationEndpoint extends HBaseReplicationEndpoi
// TODO: This connection is replication specific or we should make it particular to
// replication and make replication specific settings such as compression or codec to use
// passing Cells.
- this.conn =
- ClusterConnectionFactory.createAsyncClusterConnection(conf, null, User.getCurrent());
+ this.conn = createConnection(this.conf);
this.sleepForRetries =
this.conf.getLong("replication.source.sleepforretries", 1000);
this.metrics = context.getMetrics();
// ReplicationQueueInfo parses the peerId out of the znode for us
- this.replicationSinkMgr = new ReplicationSinkManager(conn, this, this.conf);
+ this.replicationSinkMgr = createReplicationSinkManager(conn);
// per sink thread pool
this.maxThreads =
this.conf.getInt(HConstants.REPLICATION_SOURCE_MAXTHREADS_KEY,
HConstants.REPLICATION_SOURCE_MAXTHREADS_DEFAULT);