This is an automated email from the ASF dual-hosted git repository.
wchevreuil pushed a commit to branch branch-2
in repository https://gitbox.apache.org/repos/asf/hbase.git
The following commit(s) were added to refs/heads/branch-2 by this push:
new 62e3409 HBASE-23683 Make HBaseInterClusterReplicationEndpoint more
extensible… (#1047)
62e3409 is described below
commit 62e340901fa60afeb164a1ff22e6092483b0ac48
Author: Wellington Ramos Chevreuil <[email protected]>
AuthorDate: Wed Jan 22 09:19:14 2020 +0000
HBASE-23683 Make HBaseInterClusterReplicationEndpoint more extensible…
(#1047)
Signed-off-by: Bharath Vissapragada <[email protected]>
Signed-off-by: binlijin <[email protected]>
---
.../HBaseInterClusterReplicationEndpoint.java | 29 ++++++++++++++++++++--
1 file changed, 27 insertions(+), 2 deletions(-)
diff --git
a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/HBaseInterClusterReplicationEndpoint.java
b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/HBaseInterClusterReplicationEndpoint.java
index db54338..83918c9 100644
---
a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/HBaseInterClusterReplicationEndpoint.java
+++
b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/HBaseInterClusterReplicationEndpoint.java
@@ -64,8 +64,10 @@ import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import
org.apache.hbase.thirdparty.com.google.common.annotations.VisibleForTesting;
+import org.apache.hbase.thirdparty.com.google.common.base.Preconditions;
import
org.apache.hbase.thirdparty.com.google.common.util.concurrent.ThreadFactoryBuilder;
+
import
org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.AdminService.BlockingInterface;
/**
@@ -114,6 +116,25 @@ public class HBaseInterClusterReplicationEndpoint extends
HBaseReplicationEndpoi
private boolean dropOnDeletedTables;
private boolean isSerial = false;
+ /*
+ * Some implementations of HBaseInterClusterReplicationEndpoint may require
instantiating
+ * different Connection implementations, or initialize it in a different way,
+ * so defining createConnection as protected for possible overridings.
+ */
+ protected Connection createConnection(Configuration conf) throws IOException
{
+ return ConnectionFactory.createConnection(conf);
+ }
+
+ /*
+ * Some implementations of HBaseInterClusterReplicationEndpoint may require
instantiating
+ * different ReplicationSinkManager implementations, or initialize it in a
different way,
+ * so defining createReplicationSinkManager as protected for possible
overridings.
+ */
+ protected ReplicationSinkManager createReplicationSinkManager(Connection
conn) {
+ return new ReplicationSinkManager((ClusterConnection) conn,
this.ctx.getPeerId(),
+ this, this.conf);
+ }
+
@Override
public void init(Context context) throws IOException {
super.init(context);
@@ -133,12 +154,16 @@ public class HBaseInterClusterReplicationEndpoint extends
HBaseReplicationEndpoi
// TODO: This connection is replication specific or we should make it
particular to
// replication and make replication specific settings such as compression
or codec to use
// passing Cells.
- this.conn = (ClusterConnection)
ConnectionFactory.createConnection(this.conf);
+ Connection connection = createConnection(this.conf);
+ //Since createConnection method may be overridden by extending classes, we
need to make sure
+ //it's indeed returning a ClusterConnection instance.
+ Preconditions.checkState(connection instanceof ClusterConnection);
+ this.conn = (ClusterConnection) connection;
this.sleepForRetries =
this.conf.getLong("replication.source.sleepforretries", 1000);
this.metrics = context.getMetrics();
// ReplicationQueueInfo parses the peerId out of the znode for us
- this.replicationSinkMgr = new ReplicationSinkManager(conn,
ctx.getPeerId(), this, this.conf);
+ this.replicationSinkMgr = createReplicationSinkManager(conn);
// per sink thread pool
this.maxThreads =
this.conf.getInt(HConstants.REPLICATION_SOURCE_MAXTHREADS_KEY,
HConstants.REPLICATION_SOURCE_MAXTHREADS_DEFAULT);