Repository: hbase Updated Branches: refs/heads/master a2e05b9f8 -> 44a27c5cd
HBASE-11920 Add CP hooks for ReplicationEndPoint Project: http://git-wip-us.apache.org/repos/asf/hbase/repo Commit: http://git-wip-us.apache.org/repos/asf/hbase/commit/44a27c5c Tree: http://git-wip-us.apache.org/repos/asf/hbase/tree/44a27c5c Diff: http://git-wip-us.apache.org/repos/asf/hbase/diff/44a27c5c Branch: refs/heads/master Commit: 44a27c5cd76f44e435671c69d1d8f60c42a2b420 Parents: a2e05b9 Author: Ramkrishna <[email protected]> Authored: Thu Sep 25 22:11:28 2014 +0530 Committer: Ramkrishna <[email protected]> Committed: Thu Sep 25 22:11:28 2014 +0530 ---------------------------------------------------------------------- .../BaseMasterAndRegionObserver.java | 14 +++---- .../coprocessor/BaseRegionServerObserver.java | 7 ++++ .../hbase/coprocessor/RegionServerObserver.java | 10 +++++ .../RegionServerCoprocessorHost.java | 34 +++++++++++++++++ .../regionserver/ReplicationSourceManager.java | 40 +++++++++++++------- .../hbase/security/access/AccessController.java | 11 +++++- 6 files changed, 94 insertions(+), 22 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/hbase/blob/44a27c5c/hbase-server/src/main/java/org/apache/hadoop/hbase/coprocessor/BaseMasterAndRegionObserver.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/coprocessor/BaseMasterAndRegionObserver.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/coprocessor/BaseMasterAndRegionObserver.java index 768481d..a6b9d84 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/coprocessor/BaseMasterAndRegionObserver.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/coprocessor/BaseMasterAndRegionObserver.java @@ -19,23 +19,23 @@ package org.apache.hadoop.hbase.coprocessor; -import org.apache.hadoop.hbase.classification.InterfaceAudience; -import org.apache.hadoop.hbase.classification.InterfaceStability; +import 
java.io.IOException; +import java.util.List; + +import org.apache.hadoop.hbase.CoprocessorEnvironment; import org.apache.hadoop.hbase.HBaseInterfaceAudience; -import org.apache.hadoop.hbase.TableName; import org.apache.hadoop.hbase.HColumnDescriptor; import org.apache.hadoop.hbase.HRegionInfo; -import org.apache.hadoop.hbase.CoprocessorEnvironment; import org.apache.hadoop.hbase.HTableDescriptor; import org.apache.hadoop.hbase.NamespaceDescriptor; import org.apache.hadoop.hbase.ServerName; +import org.apache.hadoop.hbase.TableName; +import org.apache.hadoop.hbase.classification.InterfaceAudience; +import org.apache.hadoop.hbase.classification.InterfaceStability; import org.apache.hadoop.hbase.master.RegionPlan; import org.apache.hadoop.hbase.protobuf.generated.HBaseProtos.SnapshotDescription; import org.apache.hadoop.hbase.protobuf.generated.QuotaProtos.Quotas; -import java.io.IOException; -import java.util.List; - @InterfaceAudience.LimitedPrivate(HBaseInterfaceAudience.COPROC) @InterfaceStability.Evolving public abstract class BaseMasterAndRegionObserver extends BaseRegionObserver http://git-wip-us.apache.org/repos/asf/hbase/blob/44a27c5c/hbase-server/src/main/java/org/apache/hadoop/hbase/coprocessor/BaseRegionServerObserver.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/coprocessor/BaseRegionServerObserver.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/coprocessor/BaseRegionServerObserver.java index 5bc23d3..c21cdf8 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/coprocessor/BaseRegionServerObserver.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/coprocessor/BaseRegionServerObserver.java @@ -24,6 +24,7 @@ import org.apache.hadoop.hbase.CoprocessorEnvironment; import org.apache.hadoop.hbase.HBaseInterfaceAudience; import org.apache.hadoop.hbase.client.Mutation; import org.apache.hadoop.hbase.regionserver.HRegion; +import 
org.apache.hadoop.hbase.replication.ReplicationEndpoint; /** * An abstract class that implements RegionServerObserver. @@ -76,4 +77,10 @@ public class BaseRegionServerObserver implements RegionServerObserver { public void postRollWALWriterRequest(ObserverContext<RegionServerCoprocessorEnvironment> ctx) throws IOException { } + @Override + public ReplicationEndpoint postCreateReplicationEndPoint( + ObserverContext<RegionServerCoprocessorEnvironment> ctx, ReplicationEndpoint endpoint) { + return endpoint; + } + } http://git-wip-us.apache.org/repos/asf/hbase/blob/44a27c5c/hbase-server/src/main/java/org/apache/hadoop/hbase/coprocessor/RegionServerObserver.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/coprocessor/RegionServerObserver.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/coprocessor/RegionServerObserver.java index 8a76d46..5c07fd2 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/coprocessor/RegionServerObserver.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/coprocessor/RegionServerObserver.java @@ -25,6 +25,7 @@ import org.apache.hadoop.hbase.Coprocessor; import org.apache.hadoop.hbase.MetaMutationAnnotation; import org.apache.hadoop.hbase.client.Mutation; import org.apache.hadoop.hbase.regionserver.HRegion; +import org.apache.hadoop.hbase.replication.ReplicationEndpoint; public interface RegionServerObserver extends Coprocessor { @@ -121,4 +122,13 @@ public interface RegionServerObserver extends Coprocessor { void postRollWALWriterRequest(final ObserverContext<RegionServerCoprocessorEnvironment> ctx) throws IOException; + /** + * This will be called after the replication endpoint is instantiated. + * @param ctx + * @param endpoint - the base endpoint for replication + * @return the endpoint to use during replication. 
+ */ + ReplicationEndpoint postCreateReplicationEndPoint( + ObserverContext<RegionServerCoprocessorEnvironment> ctx, ReplicationEndpoint endpoint); + } http://git-wip-us.apache.org/repos/asf/hbase/blob/44a27c5c/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RegionServerCoprocessorHost.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RegionServerCoprocessorHost.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RegionServerCoprocessorHost.java index 54552c6..ec44560 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RegionServerCoprocessorHost.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RegionServerCoprocessorHost.java @@ -34,6 +34,7 @@ import org.apache.hadoop.hbase.coprocessor.CoprocessorHost; import org.apache.hadoop.hbase.coprocessor.ObserverContext; import org.apache.hadoop.hbase.coprocessor.RegionServerCoprocessorEnvironment; import org.apache.hadoop.hbase.coprocessor.RegionServerObserver; +import org.apache.hadoop.hbase.replication.ReplicationEndpoint; @InterfaceAudience.LimitedPrivate(HBaseInterfaceAudience.COPROC) @InterfaceStability.Evolving @@ -156,6 +157,27 @@ public class RegionServerCoprocessorHost extends }); } + public ReplicationEndpoint postCreateReplicationEndPoint(final ReplicationEndpoint endpoint) + throws IOException { + return execOperationWithResult(endpoint, coprocessors.isEmpty() ? 
null + : new CoprocessOperationWithResult<ReplicationEndpoint>() { + @Override + public void call(RegionServerObserver oserver, + ObserverContext<RegionServerCoprocessorEnvironment> ctx) throws IOException { + setResult(oserver.postCreateReplicationEndPoint(ctx, getResult())); + } + }); + } + + private <T> T execOperationWithResult(final T defaultValue, + final CoprocessOperationWithResult<T> ctx) throws IOException { + if (ctx == null) + return defaultValue; + ctx.setResult(defaultValue); + execOperation(ctx); + return ctx.getResult(); + } + private static abstract class CoprocessorOperation extends ObserverContext<RegionServerCoprocessorEnvironment> { public CoprocessorOperation() { @@ -168,6 +190,18 @@ public class RegionServerCoprocessorHost extends } } + private static abstract class CoprocessOperationWithResult<T> extends CoprocessorOperation { + private T result = null; + + public void setResult(final T result) { + this.result = result; + } + + public T getResult() { + return this.result; + } + } + private boolean execOperation(final CoprocessorOperation ctx) throws IOException { if (ctx == null) return false; http://git-wip-us.apache.org/repos/asf/hbase/blob/44a27c5c/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceManager.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceManager.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceManager.java index a2f1667..cb0f6ce 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceManager.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceManager.java @@ -39,11 +39,13 @@ import java.util.concurrent.TimeUnit; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; 
-import org.apache.hadoop.hbase.classification.InterfaceAudience; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; -import org.apache.hadoop.hbase.Stoppable; +import org.apache.hadoop.hbase.Server; +import org.apache.hadoop.hbase.classification.InterfaceAudience; +import org.apache.hadoop.hbase.regionserver.HRegionServer; +import org.apache.hadoop.hbase.regionserver.RegionServerCoprocessorHost; import org.apache.hadoop.hbase.replication.ReplicationEndpoint; import org.apache.hadoop.hbase.replication.ReplicationException; import org.apache.hadoop.hbase.replication.ReplicationListener; @@ -84,7 +86,7 @@ public class ReplicationSourceManager implements ReplicationListener { // UUID for this cluster private final UUID clusterId; // All about stopping - private final Stoppable stopper; + private final Server server; // All logs we are currently tracking private final Map<String, SortedSet<String>> hlogsById; // Logs for recovered sources we are currently tracking @@ -111,7 +113,7 @@ public class ReplicationSourceManager implements ReplicationListener { * @param replicationPeers * @param replicationTracker * @param conf the configuration to use - * @param stopper the stopper object for this region server + * @param server the server for this region server * @param fs the file system to use * @param logDir the directory that contains all hlog directories of live RSs * @param oldLogDir the directory where old logs are archived @@ -119,7 +121,7 @@ public class ReplicationSourceManager implements ReplicationListener { */ public ReplicationSourceManager(final ReplicationQueues replicationQueues, final ReplicationPeers replicationPeers, final ReplicationTracker replicationTracker, - final Configuration conf, final Stoppable stopper, final FileSystem fs, final Path logDir, + final Configuration conf, final Server server, final FileSystem fs, final Path logDir, final Path oldLogDir, final UUID clusterId) { 
//CopyOnWriteArrayList is thread-safe. //Generally, reading is more than modifying. @@ -127,7 +129,7 @@ public class ReplicationSourceManager implements ReplicationListener { this.replicationQueues = replicationQueues; this.replicationPeers = replicationPeers; this.replicationTracker = replicationTracker; - this.stopper = stopper; + this.server = server; this.hlogsById = new HashMap<String, SortedSet<String>>(); this.hlogsByIdRecoveredQueues = new ConcurrentHashMap<String, SortedSet<String>>(); this.oldsources = new CopyOnWriteArrayList<ReplicationSourceInterface>(); @@ -243,7 +245,7 @@ public class ReplicationSourceManager implements ReplicationListener { ReplicationPeer peer = replicationPeers.getPeer(id); ReplicationSourceInterface src = getReplicationSource(this.conf, this.fs, this, this.replicationQueues, - this.replicationPeers, stopper, id, this.clusterId, peerConfig, peer); + this.replicationPeers, server, id, this.clusterId, peerConfig, peer); synchronized (this.hlogsById) { this.sources.add(src); this.hlogsById.put(id, new TreeSet<String>()); @@ -257,7 +259,7 @@ public class ReplicationSourceManager implements ReplicationListener { String message = "Cannot add log to queue when creating a new source, queueId=" + src.getPeerClusterZnode() + ", filename=" + name; - stopper.stop(message); + server.stop(message); throw e; } src.enqueueLog(this.latestPath); @@ -359,7 +361,7 @@ public class ReplicationSourceManager implements ReplicationListener { * @param conf the configuration to use * @param fs the file system to use * @param manager the manager to use - * @param stopper the stopper object for this region server + * @param server the server object for this region server * @param peerId the id of the peer cluster * @return the created source * @throws IOException @@ -367,9 +369,13 @@ public class ReplicationSourceManager implements ReplicationListener { protected ReplicationSourceInterface getReplicationSource(final Configuration conf, final FileSystem fs, 
final ReplicationSourceManager manager, final ReplicationQueues replicationQueues, final ReplicationPeers replicationPeers, - final Stoppable stopper, final String peerId, final UUID clusterId, + final Server server, final String peerId, final UUID clusterId, final ReplicationPeerConfig peerConfig, final ReplicationPeer replicationPeer) throws IOException { + RegionServerCoprocessorHost rsServerHost = null; + if (server instanceof HRegionServer) { + rsServerHost = ((HRegionServer) server).getRegionServerCoprocessorHost(); + } ReplicationSourceInterface src; try { @SuppressWarnings("rawtypes") @@ -392,6 +398,14 @@ public class ReplicationSourceManager implements ReplicationListener { @SuppressWarnings("rawtypes") Class c = Class.forName(replicationEndpointImpl); replicationEndpoint = (ReplicationEndpoint) c.newInstance(); + if(rsServerHost != null) { + ReplicationEndpoint newReplicationEndPoint = rsServerHost + .postCreateReplicationEndPoint(replicationEndpoint); + if(newReplicationEndPoint != null) { + // Use the endpoint returned by the coprocessor hook in place of the configured endpoint + replicationEndpoint = newReplicationEndPoint; + } + } } catch (Exception e) { LOG.warn("Passed replication endpoint implementation throws errors", e); throw new IOException(e); @@ -399,7 +413,7 @@ public class ReplicationSourceManager implements ReplicationListener { MetricsSource metrics = new MetricsSource(peerId); // init replication source - src.init(conf, fs, manager, replicationQueues, replicationPeers, stopper, peerId, + src.init(conf, fs, manager, replicationQueues, replicationPeers, server, peerId, clusterId, replicationEndpoint, metrics); // init replication endpoint @@ -542,7 +556,7 @@ public class ReplicationSourceManager implements ReplicationListener { Thread.currentThread().interrupt(); } // We try to lock that rs' queue directory - if (stopper.isStopped()) { + if (server.isStopped()) { LOG.info("Not transferring queue since we are shutting down"); return; } @@ -578,7 
+592,7 @@ public class ReplicationSourceManager implements ReplicationListener { ReplicationSourceInterface src = getReplicationSource(conf, fs, ReplicationSourceManager.this, this.rq, this.rp, - stopper, peerId, this.clusterId, peerConfig, peer); + server, peerId, this.clusterId, peerConfig, peer); if (!this.rp.getPeerIds().contains((src.getPeerClusterId()))) { src.terminate("Recovered queue doesn't belong to any current peer"); break; http://git-wip-us.apache.org/repos/asf/hbase/blob/44a27c5c/hbase-server/src/main/java/org/apache/hadoop/hbase/security/access/AccessController.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/security/access/AccessController.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/security/access/AccessController.java index 95cd72a..96c912a 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/security/access/AccessController.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/security/access/AccessController.java @@ -30,7 +30,7 @@ import java.util.TreeSet; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; -import org.apache.hadoop.hbase.classification.InterfaceAudience; +import org.apache.hadoop.classification.InterfaceAudience; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.hbase.Cell; import org.apache.hadoop.hbase.CellScanner; @@ -45,13 +45,13 @@ import org.apache.hadoop.hbase.HRegionInfo; import org.apache.hadoop.hbase.HTableDescriptor; import org.apache.hadoop.hbase.KeyValue; import org.apache.hadoop.hbase.KeyValue.Type; +import org.apache.hadoop.hbase.MetaTableAccessor; import org.apache.hadoop.hbase.NamespaceDescriptor; import org.apache.hadoop.hbase.ServerName; import org.apache.hadoop.hbase.TableName; import org.apache.hadoop.hbase.TableNotDisabledException; import org.apache.hadoop.hbase.TableNotFoundException; import org.apache.hadoop.hbase.Tag; -import 
org.apache.hadoop.hbase.MetaTableAccessor; import org.apache.hadoop.hbase.client.Append; import org.apache.hadoop.hbase.client.Delete; import org.apache.hadoop.hbase.client.Durability; @@ -91,6 +91,7 @@ import org.apache.hadoop.hbase.regionserver.RegionScanner; import org.apache.hadoop.hbase.regionserver.ScanType; import org.apache.hadoop.hbase.regionserver.Store; import org.apache.hadoop.hbase.regionserver.wal.WALEdit; +import org.apache.hadoop.hbase.replication.ReplicationEndpoint; import org.apache.hadoop.hbase.security.AccessDeniedException; import org.apache.hadoop.hbase.security.User; import org.apache.hadoop.hbase.security.UserProvider; @@ -2252,4 +2253,10 @@ public class AccessController extends BaseMasterAndRegionObserver final String namespace, final Quotas quotas) throws IOException { requirePermission("setNamespaceQuota", Action.ADMIN); } + + @Override + public ReplicationEndpoint postCreateReplicationEndPoint( + ObserverContext<RegionServerCoprocessorEnvironment> ctx, ReplicationEndpoint endpoint) { + return endpoint; + } }
