Repository: hbase
Updated Branches:
  refs/heads/branch-1 ae6597542 -> 8a7920ecd


HBASE-11920 Add CP hooks for ReplicationEndPoint


Project: http://git-wip-us.apache.org/repos/asf/hbase/repo
Commit: http://git-wip-us.apache.org/repos/asf/hbase/commit/8a7920ec
Tree: http://git-wip-us.apache.org/repos/asf/hbase/tree/8a7920ec
Diff: http://git-wip-us.apache.org/repos/asf/hbase/diff/8a7920ec

Branch: refs/heads/branch-1
Commit: 8a7920ecdbe4b70878a363355818f4803ba88c22
Parents: ae65975
Author: Ramkrishna <[email protected]>
Authored: Sun Sep 28 15:58:57 2014 +0530
Committer: Ramkrishna <[email protected]>
Committed: Sun Sep 28 15:58:57 2014 +0530

----------------------------------------------------------------------
 .../coprocessor/BaseRegionServerObserver.java   |  7 ++++
 .../hbase/coprocessor/RegionServerObserver.java | 10 +++++
 .../RegionServerCoprocessorHost.java            | 34 +++++++++++++++++
 .../regionserver/ReplicationSourceManager.java  | 40 +++++++++++++-------
 .../hbase/security/access/AccessController.java | 11 +++++-
 5 files changed, 87 insertions(+), 15 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hbase/blob/8a7920ec/hbase-server/src/main/java/org/apache/hadoop/hbase/coprocessor/BaseRegionServerObserver.java
----------------------------------------------------------------------
diff --git 
a/hbase-server/src/main/java/org/apache/hadoop/hbase/coprocessor/BaseRegionServerObserver.java
 
b/hbase-server/src/main/java/org/apache/hadoop/hbase/coprocessor/BaseRegionServerObserver.java
index 5bc23d3..c21cdf8 100644
--- 
a/hbase-server/src/main/java/org/apache/hadoop/hbase/coprocessor/BaseRegionServerObserver.java
+++ 
b/hbase-server/src/main/java/org/apache/hadoop/hbase/coprocessor/BaseRegionServerObserver.java
@@ -24,6 +24,7 @@ import org.apache.hadoop.hbase.CoprocessorEnvironment;
 import org.apache.hadoop.hbase.HBaseInterfaceAudience;
 import org.apache.hadoop.hbase.client.Mutation;
 import org.apache.hadoop.hbase.regionserver.HRegion;
+import org.apache.hadoop.hbase.replication.ReplicationEndpoint;
 
 /**
  * An abstract class that implements RegionServerObserver.
@@ -76,4 +77,10 @@ public class BaseRegionServerObserver implements 
RegionServerObserver {
   public void 
postRollWALWriterRequest(ObserverContext<RegionServerCoprocessorEnvironment> 
ctx)
       throws IOException { }
 
+  @Override
+  public ReplicationEndpoint postCreateReplicationEndPoint(
+      ObserverContext<RegionServerCoprocessorEnvironment> ctx, 
ReplicationEndpoint endpoint) {
+    return endpoint;
+  }
+
 }

http://git-wip-us.apache.org/repos/asf/hbase/blob/8a7920ec/hbase-server/src/main/java/org/apache/hadoop/hbase/coprocessor/RegionServerObserver.java
----------------------------------------------------------------------
diff --git 
a/hbase-server/src/main/java/org/apache/hadoop/hbase/coprocessor/RegionServerObserver.java
 
b/hbase-server/src/main/java/org/apache/hadoop/hbase/coprocessor/RegionServerObserver.java
index 8a76d46..5c07fd2 100644
--- 
a/hbase-server/src/main/java/org/apache/hadoop/hbase/coprocessor/RegionServerObserver.java
+++ 
b/hbase-server/src/main/java/org/apache/hadoop/hbase/coprocessor/RegionServerObserver.java
@@ -25,6 +25,7 @@ import org.apache.hadoop.hbase.Coprocessor;
 import org.apache.hadoop.hbase.MetaMutationAnnotation;
 import org.apache.hadoop.hbase.client.Mutation;
 import org.apache.hadoop.hbase.regionserver.HRegion;
+import org.apache.hadoop.hbase.replication.ReplicationEndpoint;
 
 public interface RegionServerObserver extends Coprocessor {
 
@@ -121,4 +122,13 @@ public interface RegionServerObserver extends Coprocessor {
   void postRollWALWriterRequest(final 
ObserverContext<RegionServerCoprocessorEnvironment> ctx)
       throws IOException;
 
+  /**
+   * This will be called after the replication endpoint is instantiated.
+   * @param ctx the observer context for the region server coprocessor environment
+   * @param endpoint - the base endpoint for replication
+   * @return the endpoint to use during replication.
+   */
+  ReplicationEndpoint postCreateReplicationEndPoint(
+      ObserverContext<RegionServerCoprocessorEnvironment> ctx, 
ReplicationEndpoint endpoint);
+
 }

http://git-wip-us.apache.org/repos/asf/hbase/blob/8a7920ec/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RegionServerCoprocessorHost.java
----------------------------------------------------------------------
diff --git 
a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RegionServerCoprocessorHost.java
 
b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RegionServerCoprocessorHost.java
index 54552c6..ec44560 100644
--- 
a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RegionServerCoprocessorHost.java
+++ 
b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RegionServerCoprocessorHost.java
@@ -34,6 +34,7 @@ import org.apache.hadoop.hbase.coprocessor.CoprocessorHost;
 import org.apache.hadoop.hbase.coprocessor.ObserverContext;
 import org.apache.hadoop.hbase.coprocessor.RegionServerCoprocessorEnvironment;
 import org.apache.hadoop.hbase.coprocessor.RegionServerObserver;
+import org.apache.hadoop.hbase.replication.ReplicationEndpoint;
 
 @InterfaceAudience.LimitedPrivate(HBaseInterfaceAudience.COPROC)
 @InterfaceStability.Evolving
@@ -156,6 +157,27 @@ public class RegionServerCoprocessorHost extends
     });
   }
 
+  public ReplicationEndpoint postCreateReplicationEndPoint(final 
ReplicationEndpoint endpoint)
+      throws IOException {
+    return execOperationWithResult(endpoint, coprocessors.isEmpty() ? null
+        : new CoprocessOperationWithResult<ReplicationEndpoint>() {
+          @Override
+          public void call(RegionServerObserver oserver,
+              ObserverContext<RegionServerCoprocessorEnvironment> ctx) throws 
IOException {
+            setResult(oserver.postCreateReplicationEndPoint(ctx, getResult()));
+          }
+        });
+  }
+
+  private <T> T execOperationWithResult(final T defaultValue,
+      final CoprocessOperationWithResult<T> ctx) throws IOException {
+    if (ctx == null)
+      return defaultValue;
+    ctx.setResult(defaultValue);
+    execOperation(ctx);
+    return ctx.getResult();
+  }
+
   private static abstract class CoprocessorOperation
       extends ObserverContext<RegionServerCoprocessorEnvironment> {
     public CoprocessorOperation() {
@@ -168,6 +190,18 @@ public class RegionServerCoprocessorHost extends
     }
   }
 
+  private static abstract class CoprocessOperationWithResult<T> extends 
CoprocessorOperation {
+    private T result = null;
+
+    public void setResult(final T result) {
+      this.result = result;
+    }
+
+    public T getResult() {
+      return this.result;
+    }
+  }
+
   private boolean execOperation(final CoprocessorOperation ctx) throws 
IOException {
     if (ctx == null) return false;
 

http://git-wip-us.apache.org/repos/asf/hbase/blob/8a7920ec/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceManager.java
----------------------------------------------------------------------
diff --git 
a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceManager.java
 
b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceManager.java
index a2f1667..cb0f6ce 100644
--- 
a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceManager.java
+++ 
b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceManager.java
@@ -39,11 +39,13 @@ import java.util.concurrent.TimeUnit;
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
-import org.apache.hadoop.hbase.classification.InterfaceAudience;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.hbase.Stoppable;
+import org.apache.hadoop.hbase.Server;
+import org.apache.hadoop.hbase.classification.InterfaceAudience;
+import org.apache.hadoop.hbase.regionserver.HRegionServer;
+import org.apache.hadoop.hbase.regionserver.RegionServerCoprocessorHost;
 import org.apache.hadoop.hbase.replication.ReplicationEndpoint;
 import org.apache.hadoop.hbase.replication.ReplicationException;
 import org.apache.hadoop.hbase.replication.ReplicationListener;
@@ -84,7 +86,7 @@ public class ReplicationSourceManager implements 
ReplicationListener {
   // UUID for this cluster
   private final UUID clusterId;
   // All about stopping
-  private final Stoppable stopper;
+  private final Server server;
   // All logs we are currently tracking
   private final Map<String, SortedSet<String>> hlogsById;
   // Logs for recovered sources we are currently tracking
@@ -111,7 +113,7 @@ public class ReplicationSourceManager implements 
ReplicationListener {
    * @param replicationPeers
    * @param replicationTracker
    * @param conf the configuration to use
-   * @param stopper the stopper object for this region server
+   * @param server the server object for this region server
    * @param fs the file system to use
    * @param logDir the directory that contains all hlog directories of live RSs
    * @param oldLogDir the directory where old logs are archived
@@ -119,7 +121,7 @@ public class ReplicationSourceManager implements 
ReplicationListener {
    */
   public ReplicationSourceManager(final ReplicationQueues replicationQueues,
       final ReplicationPeers replicationPeers, final ReplicationTracker 
replicationTracker,
-      final Configuration conf, final Stoppable stopper, final FileSystem fs, 
final Path logDir,
+      final Configuration conf, final Server server, final FileSystem fs, 
final Path logDir,
       final Path oldLogDir, final UUID clusterId) {
     //CopyOnWriteArrayList is thread-safe.
     //Generally, reading is more than modifying.
@@ -127,7 +129,7 @@ public class ReplicationSourceManager implements 
ReplicationListener {
     this.replicationQueues = replicationQueues;
     this.replicationPeers = replicationPeers;
     this.replicationTracker = replicationTracker;
-    this.stopper = stopper;
+    this.server = server;
     this.hlogsById = new HashMap<String, SortedSet<String>>();
     this.hlogsByIdRecoveredQueues = new ConcurrentHashMap<String, 
SortedSet<String>>();
     this.oldsources = new CopyOnWriteArrayList<ReplicationSourceInterface>();
@@ -243,7 +245,7 @@ public class ReplicationSourceManager implements 
ReplicationListener {
     ReplicationPeer peer = replicationPeers.getPeer(id);
     ReplicationSourceInterface src =
         getReplicationSource(this.conf, this.fs, this, this.replicationQueues,
-          this.replicationPeers, stopper, id, this.clusterId, peerConfig, 
peer);
+          this.replicationPeers, server, id, this.clusterId, peerConfig, peer);
     synchronized (this.hlogsById) {
       this.sources.add(src);
       this.hlogsById.put(id, new TreeSet<String>());
@@ -257,7 +259,7 @@ public class ReplicationSourceManager implements 
ReplicationListener {
           String message =
               "Cannot add log to queue when creating a new source, queueId="
                   + src.getPeerClusterZnode() + ", filename=" + name;
-          stopper.stop(message);
+          server.stop(message);
           throw e;
         }
         src.enqueueLog(this.latestPath);
@@ -359,7 +361,7 @@ public class ReplicationSourceManager implements 
ReplicationListener {
    * @param conf the configuration to use
    * @param fs the file system to use
    * @param manager the manager to use
-   * @param stopper the stopper object for this region server
+   * @param server the server object for this region server
    * @param peerId the id of the peer cluster
    * @return the created source
    * @throws IOException
@@ -367,9 +369,13 @@ public class ReplicationSourceManager implements 
ReplicationListener {
   protected ReplicationSourceInterface getReplicationSource(final 
Configuration conf,
       final FileSystem fs, final ReplicationSourceManager manager,
       final ReplicationQueues replicationQueues, final ReplicationPeers 
replicationPeers,
-      final Stoppable stopper, final String peerId, final UUID clusterId,
+      final Server server, final String peerId, final UUID clusterId,
       final ReplicationPeerConfig peerConfig, final ReplicationPeer 
replicationPeer)
           throws IOException {
+    RegionServerCoprocessorHost rsServerHost = null;
+    if (server instanceof HRegionServer) {
+      rsServerHost = ((HRegionServer) server).getRegionServerCoprocessorHost();
+    }
     ReplicationSourceInterface src;
     try {
       @SuppressWarnings("rawtypes")
@@ -392,6 +398,14 @@ public class ReplicationSourceManager implements 
ReplicationListener {
       @SuppressWarnings("rawtypes")
       Class c = Class.forName(replicationEndpointImpl);
       replicationEndpoint = (ReplicationEndpoint) c.newInstance();
+      if(rsServerHost != null) {
+        ReplicationEndpoint newReplicationEndPoint = rsServerHost
+            .postCreateReplicationEndPoint(replicationEndpoint);
+        if(newReplicationEndPoint != null) {
+          // Replace the configured endpoint with the one returned by the 
coprocessor hook
+          replicationEndpoint = newReplicationEndPoint;
+        }
+      }
     } catch (Exception e) {
       LOG.warn("Passed replication endpoint implementation throws errors", e);
       throw new IOException(e);
@@ -399,7 +413,7 @@ public class ReplicationSourceManager implements 
ReplicationListener {
 
     MetricsSource metrics = new MetricsSource(peerId);
     // init replication source
-    src.init(conf, fs, manager, replicationQueues, replicationPeers, stopper, 
peerId,
+    src.init(conf, fs, manager, replicationQueues, replicationPeers, server, 
peerId,
       clusterId, replicationEndpoint, metrics);
 
     // init replication endpoint
@@ -542,7 +556,7 @@ public class ReplicationSourceManager implements 
ReplicationListener {
         Thread.currentThread().interrupt();
       }
       // We try to lock that rs' queue directory
-      if (stopper.isStopped()) {
+      if (server.isStopped()) {
         LOG.info("Not transferring queue since we are shutting down");
         return;
       }
@@ -578,7 +592,7 @@ public class ReplicationSourceManager implements 
ReplicationListener {
 
           ReplicationSourceInterface src =
               getReplicationSource(conf, fs, ReplicationSourceManager.this, 
this.rq, this.rp,
-                stopper, peerId, this.clusterId, peerConfig, peer);
+                server, peerId, this.clusterId, peerConfig, peer);
           if (!this.rp.getPeerIds().contains((src.getPeerClusterId()))) {
             src.terminate("Recovered queue doesn't belong to any current 
peer");
             break;

http://git-wip-us.apache.org/repos/asf/hbase/blob/8a7920ec/hbase-server/src/main/java/org/apache/hadoop/hbase/security/access/AccessController.java
----------------------------------------------------------------------
diff --git 
a/hbase-server/src/main/java/org/apache/hadoop/hbase/security/access/AccessController.java
 
b/hbase-server/src/main/java/org/apache/hadoop/hbase/security/access/AccessController.java
index f1c3cd6..975651e 100644
--- 
a/hbase-server/src/main/java/org/apache/hadoop/hbase/security/access/AccessController.java
+++ 
b/hbase-server/src/main/java/org/apache/hadoop/hbase/security/access/AccessController.java
@@ -30,7 +30,7 @@ import java.util.TreeSet;
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
-import org.apache.hadoop.hbase.classification.InterfaceAudience;
+import org.apache.hadoop.classification.InterfaceAudience;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hbase.Cell;
 import org.apache.hadoop.hbase.CellScanner;
@@ -45,13 +45,13 @@ import org.apache.hadoop.hbase.HRegionInfo;
 import org.apache.hadoop.hbase.HTableDescriptor;
 import org.apache.hadoop.hbase.KeyValue;
 import org.apache.hadoop.hbase.KeyValue.Type;
+import org.apache.hadoop.hbase.MetaTableAccessor;
 import org.apache.hadoop.hbase.NamespaceDescriptor;
 import org.apache.hadoop.hbase.ServerName;
 import org.apache.hadoop.hbase.TableName;
 import org.apache.hadoop.hbase.TableNotDisabledException;
 import org.apache.hadoop.hbase.TableNotFoundException;
 import org.apache.hadoop.hbase.Tag;
-import org.apache.hadoop.hbase.MetaTableAccessor;
 import org.apache.hadoop.hbase.client.Append;
 import org.apache.hadoop.hbase.client.Delete;
 import org.apache.hadoop.hbase.client.Durability;
@@ -90,6 +90,7 @@ import org.apache.hadoop.hbase.regionserver.RegionScanner;
 import org.apache.hadoop.hbase.regionserver.ScanType;
 import org.apache.hadoop.hbase.regionserver.Store;
 import org.apache.hadoop.hbase.regionserver.wal.WALEdit;
+import org.apache.hadoop.hbase.replication.ReplicationEndpoint;
 import org.apache.hadoop.hbase.security.AccessDeniedException;
 import org.apache.hadoop.hbase.security.User;
 import org.apache.hadoop.hbase.security.UserProvider;
@@ -2210,4 +2211,10 @@ public class AccessController extends 
BaseMasterAndRegionObserver
   @Override
   public void 
postRollWALWriterRequest(ObserverContext<RegionServerCoprocessorEnvironment> 
ctx)
       throws IOException { }
+  
+  @Override
+  public ReplicationEndpoint postCreateReplicationEndPoint(
+      ObserverContext<RegionServerCoprocessorEnvironment> ctx, 
ReplicationEndpoint endpoint) {
+    return endpoint;
+  }
 }

Reply via email to