Repository: hbase Updated Branches: refs/heads/master d12eb7a4a -> 6e7baa07f
HBASE-15982 Interface ReplicationEndpoint extends Guava's Service Breaking change to our ReplicationEndpoint and BaseReplicationEndpoint. ReplicationEndpoint implemented the Guava 12.0 Service interface. An abstract subclass, BaseReplicationEndpoint, provided default implementations and other facilities by extending Guava's AbstractService class. Both of these HBase classes were marked LimitedPrivate for REPLICATION, so these classes were semi-public, which made Guava 12.0 part of our API. Having Guava in our API was a mistake. It anchors us, and every implementation of the Interface, to Guava 12.0. This is untenable given that Guava changes and that the Service Interface in particular has had extensive revamp and improvement done. We can't hold to the Guava Interface. It changed. We can't stay on Guava 12.0; implementors and others on our CLASSPATH won't abide being stuck on an old Guava. So this patch makes breaking changes. The unhitching of our Interface from Guava could only be done in a breaking manner. It undoes the LimitedPrivate on BaseReplicationEndpoint (BRE) while keeping it for the ReplicationEndpoint (RE) Interface. It means consumers will have to copy/paste the AbstractService-based BRE into their own codebase, also supplying their own Guava; HBase no longer 'supplies' this (our Guava usage has been internalized, relocated). This patch then adds into RE the basic methods RE needs of the old Guava Service, rather than returning a Service to start/stop only to go back to the RE instance to do actual work. A few method names had to be changed (so that we could make implementations with Guava Service internally and not have RE method names and types clash). Semantics remained the same otherwise. For example, startAsync and stopAsync in Guava are start and stop in RE. 
Project: http://git-wip-us.apache.org/repos/asf/hbase/repo Commit: http://git-wip-us.apache.org/repos/asf/hbase/commit/6e7baa07 Tree: http://git-wip-us.apache.org/repos/asf/hbase/tree/6e7baa07 Diff: http://git-wip-us.apache.org/repos/asf/hbase/diff/6e7baa07 Branch: refs/heads/master Commit: 6e7baa07f0b1f5841379545acaf23d36f50de2c2 Parents: d12eb7a Author: Michael Stack <st...@apache.org> Authored: Tue Aug 8 21:55:47 2017 +0800 Committer: Michael Stack <st...@apache.org> Committed: Thu Aug 24 08:05:27 2017 -0700 ---------------------------------------------------------------------- .../replication/BaseReplicationEndpoint.java | 16 ++-- .../replication/HBaseReplicationEndpoint.java | 10 +++ .../hbase/replication/ReplicationEndpoint.java | 88 +++++++++++++++++++- .../regionserver/ReplicationSource.java | 38 ++++----- .../VisibilityReplicationEndpoint.java | 40 ++++----- .../TestReplicationAdminWithClusters.java | 10 +++ .../replication/TestReplicationEndpoint.java | 10 +++ .../replication/TestReplicationSource.java | 2 +- 8 files changed, 160 insertions(+), 54 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/hbase/blob/6e7baa07/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/BaseReplicationEndpoint.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/BaseReplicationEndpoint.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/BaseReplicationEndpoint.java index ae4e7cc..5b9cef7 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/BaseReplicationEndpoint.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/BaseReplicationEndpoint.java @@ -24,15 +24,16 @@ import java.util.ArrayList; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import 
org.apache.hadoop.hbase.classification.InterfaceAudience; -import org.apache.hadoop.hbase.HBaseInterfaceAudience; import org.apache.hadoop.hbase.shaded.com.google.common.collect.Lists; import org.apache.hadoop.hbase.shaded.com.google.common.util.concurrent.AbstractService; + /** - * A Base implementation for {@link ReplicationEndpoint}s. Users should consider extending this - * class rather than implementing {@link ReplicationEndpoint} directly for better backwards - * compatibility. + * A Base implementation for {@link ReplicationEndpoint}s. For internal use. Uses our internal + * Guava. */ -@InterfaceAudience.LimitedPrivate(HBaseInterfaceAudience.REPLICATION) +// This class has been made InterfaceAudience.Private in 2.0.0. It used to be +// LimitedPrivate. See HBASE-15982. +@InterfaceAudience.Private public abstract class BaseReplicationEndpoint extends AbstractService implements ReplicationEndpoint { @@ -109,4 +110,9 @@ public abstract class BaseReplicationEndpoint extends AbstractService public boolean canReplicateToSameCluster() { return false; } + + @Override + public boolean isStarting() { + return state() == State.STARTING; + } } http://git-wip-us.apache.org/repos/asf/hbase/blob/6e7baa07/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/HBaseReplicationEndpoint.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/HBaseReplicationEndpoint.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/HBaseReplicationEndpoint.java index 1bc18a9..42667e1 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/HBaseReplicationEndpoint.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/HBaseReplicationEndpoint.java @@ -79,6 +79,16 @@ public abstract class HBaseReplicationEndpoint extends BaseReplicationEndpoint } @Override + public void start() { + startAsync(); + } + + @Override + public void stop() { 
+ stopAsync(); + } + + @Override protected void doStart() { try { reloadZkWatcher(); http://git-wip-us.apache.org/repos/asf/hbase/blob/6e7baa07/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/ReplicationEndpoint.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/ReplicationEndpoint.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/ReplicationEndpoint.java index 6bf696b..f23276c 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/ReplicationEndpoint.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/ReplicationEndpoint.java @@ -21,6 +21,8 @@ package org.apache.hadoop.hbase.replication; import java.io.IOException; import java.util.List; import java.util.UUID; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.TimeoutException; import org.apache.hadoop.hbase.Abortable; import org.apache.hadoop.hbase.classification.InterfaceAudience; @@ -31,8 +33,6 @@ import org.apache.hadoop.hbase.TableDescriptors; import org.apache.hadoop.hbase.wal.WAL.Entry; import org.apache.hadoop.hbase.replication.regionserver.MetricsSource; -import org.apache.hadoop.hbase.shaded.com.google.common.util.concurrent.Service; - /** * ReplicationEndpoint is a plugin which implements replication * to other HBase clusters, or other systems. ReplicationEndpoint implementation @@ -47,7 +47,9 @@ import org.apache.hadoop.hbase.shaded.com.google.common.util.concurrent.Service; * and persisting of the WAL entries in the other cluster. */ @InterfaceAudience.LimitedPrivate(HBaseInterfaceAudience.REPLICATION) -public interface ReplicationEndpoint extends Service, ReplicationPeerConfigListener { +public interface ReplicationEndpoint extends ReplicationPeerConfigListener { + // TODO: This class needs doc. Has a Context and a ReplicationContext. Then has #start, #stop. + // How they relate? 
Do we #start before #init(Context)? We fail fast if you don't? @InterfaceAudience.LimitedPrivate(HBaseInterfaceAudience.REPLICATION) class Context { @@ -176,4 +178,82 @@ public interface ReplicationEndpoint extends Service, ReplicationPeerConfigListe * parameters can be obtained. */ boolean replicate(ReplicateContext replicateContext); -} + + + // The below methods are inspired by Guava Service. See + // https://github.com/google/guava/wiki/ServiceExplained for overview of Guava Service. + // Below we implement a subset only with different names on some methods so we can implement + // the below internally using Guava (without exposing our implementation to + // ReplicationEndpoint implementors. + + /** + * Returns {@code true} if this service is RUNNING. + */ + boolean isRunning(); + + /** + * @return Return {@code true} is this service is STARTING (but not yet RUNNING). + */ + boolean isStarting(); + + /** + * Initiates service startup and returns immediately. A stopped service may not be restarted. + * Equivalent of startAsync call in Guava Service. + * @throws IllegalStateException if the service is not new, if it has been run already. + */ + void start(); + + /** + * Waits for the {@link ReplicationEndpoint} to be up and running. + * + * @throws IllegalStateException if the service reaches a state from which it is not possible to + * enter the (internal) running state. e.g. if the state is terminated when this method is + * called then this will throw an IllegalStateException. + */ + void awaitRunning(); + + /** + * Waits for the {@link ReplicationEndpoint} to to be up and running for no more + * than the given time. + * + * @param timeout the maximum time to wait + * @param unit the time unit of the timeout argument + * @throws TimeoutException if the service has not reached the given state within the deadline + * @throws IllegalStateException if the service reaches a state from which it is not possible to + * enter the (internal) running state. e.g. 
if the state is terminated when this method is + * called then this will throw an IllegalStateException. + */ + void awaitRunning(long timeout, TimeUnit unit) throws TimeoutException; + + /** + * If the service is starting or running, this initiates service shutdown and returns immediately. + * If the service has already been stopped, this method returns immediately without taking action. + * Equivalent of stopAsync call in Guava Service. + */ + void stop(); + + /** + * Waits for the {@link ReplicationEndpoint} to reach the terminated (internal) state. + * + * @throws IllegalStateException if the service FAILED. + */ + void awaitTerminated(); + + /** + * Waits for the {@link ReplicationEndpoint} to reach a terminal state for no + * more than the given time. + * + * @param timeout the maximum time to wait + * @param unit the time unit of the timeout argument + * @throws TimeoutException if the service has not reached the given state within the deadline + * @throws IllegalStateException if the service FAILED. + */ + void awaitTerminated(long timeout, TimeUnit unit) throws TimeoutException; + + /** + * Returns the {@link Throwable} that caused this service to fail. + * + * @throws IllegalStateException if this service's state isn't FAILED. 
+ */ + Throwable failureCause(); +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/hbase/blob/6e7baa07/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSource.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSource.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSource.java index 1d3e4fb..f3a37dd 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSource.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSource.java @@ -19,8 +19,6 @@ package org.apache.hadoop.hbase.replication.regionserver; import org.apache.hadoop.hbase.shaded.com.google.common.collect.Lists; -import org.apache.hadoop.hbase.shaded.com.google.common.util.concurrent.ListenableFuture; -import org.apache.hadoop.hbase.shaded.com.google.common.util.concurrent.Service; import java.io.IOException; import java.util.ArrayList; @@ -130,6 +128,11 @@ public class ReplicationSource extends Thread implements ReplicationSourceInterf private AtomicLong totalBufferUsed; + public static final String WAIT_ON_ENDPOINT_SECONDS = + "hbase.replication.wait.on.endpoint.seconds"; + public static final int DEFAULT_WAIT_ON_ENDPOINT_SECONDS = 30; + private int waitOnEndpointSeconds = -1; + /** * Instantiation method used by region servers * @@ -152,6 +155,8 @@ public class ReplicationSource extends Thread implements ReplicationSourceInterf throws IOException { this.stopper = stopper; this.conf = HBaseConfiguration.create(conf); + this.waitOnEndpointSeconds = + this.conf.getInt(WAIT_ON_ENDPOINT_SECONDS, DEFAULT_WAIT_ON_ENDPOINT_SECONDS); decorateConf(); this.sleepForRetries = this.conf.getLong("replication.source.sleepforretries", 1000); // 1 second @@ -245,17 +250,11 @@ public class ReplicationSource 
extends Thread implements ReplicationSourceInterf this.sourceRunning = true; try { // start the endpoint, connect to the cluster - Service service = replicationEndpoint.startAsync(); - final int waitTime = 10; - service.awaitRunning(waitTime, TimeUnit.SECONDS); - if (!service.isRunning()) { - LOG.warn("ReplicationEndpoint was not started after waiting " + waitTime + - " + seconds. Exiting"); - uninitialize(); - return; - } + this.replicationEndpoint.start(); + this.replicationEndpoint.awaitRunning(this.waitOnEndpointSeconds, TimeUnit.SECONDS); } catch (Exception ex) { LOG.warn("Error starting ReplicationEndpoint, exiting", ex); + uninitialize(); throw new RuntimeException(ex); } @@ -383,14 +382,12 @@ public class ReplicationSource extends Thread implements ReplicationSourceInterf private void uninitialize() { LOG.debug("Source exiting " + this.peerId); metrics.clear(); - if (replicationEndpoint.state() == Service.State.STARTING - || replicationEndpoint.state() == Service.State.RUNNING) { - replicationEndpoint.stopAsync(); - final int waitTime = 10; + if (this.replicationEndpoint.isRunning() || this.replicationEndpoint.isStarting()) { + this.replicationEndpoint.stop(); try { - replicationEndpoint.awaitTerminated(waitTime, TimeUnit.SECONDS); + this.replicationEndpoint.awaitTerminated(this.waitOnEndpointSeconds, TimeUnit.SECONDS); } catch (TimeoutException e) { - LOG.warn("Failed termination after " + waitTime + " seconds."); + LOG.warn("Failed termination after " + this.waitOnEndpointSeconds + " seconds."); } } } @@ -463,18 +460,17 @@ public class ReplicationSource extends Thread implements ReplicationSourceInterf worker.entryReader.interrupt(); worker.interrupt(); } - Service service = null; if (this.replicationEndpoint != null) { - service = this.replicationEndpoint.stopAsync(); + this.replicationEndpoint.stop(); } if (join) { for (ReplicationSourceShipper worker : workers) { Threads.shutdown(worker, this.sleepForRetries); LOG.info("ReplicationSourceWorker " + 
worker.getName() + " terminated"); } - if (service != null) { + if (this.replicationEndpoint != null) { try { - service.awaitTerminated(sleepForRetries * maxRetriesMultiplier, TimeUnit.MILLISECONDS); + this.replicationEndpoint.awaitTerminated(sleepForRetries * maxRetriesMultiplier, TimeUnit.MILLISECONDS); } catch (TimeoutException te) { LOG.warn("Got exception while waiting for endpoint to shutdown for replication source :" + this.peerClusterZnode, http://git-wip-us.apache.org/repos/asf/hbase/blob/6e7baa07/hbase-server/src/main/java/org/apache/hadoop/hbase/security/visibility/VisibilityReplicationEndpoint.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/security/visibility/VisibilityReplicationEndpoint.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/security/visibility/VisibilityReplicationEndpoint.java index 51655a1..1ce2b3d 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/security/visibility/VisibilityReplicationEndpoint.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/security/visibility/VisibilityReplicationEndpoint.java @@ -46,8 +46,9 @@ import org.apache.hadoop.hbase.shaded.com.google.common.util.concurrent.Service; public class VisibilityReplicationEndpoint implements ReplicationEndpoint { private static final Log LOG = LogFactory.getLog(VisibilityReplicationEndpoint.class); - private ReplicationEndpoint delegator; - private VisibilityLabelService visibilityLabelsService; + + private final ReplicationEndpoint delegator; + private final VisibilityLabelService visibilityLabelsService; public VisibilityReplicationEndpoint(ReplicationEndpoint endpoint, VisibilityLabelService visibilityLabelsService) { @@ -62,7 +63,7 @@ public class VisibilityReplicationEndpoint implements ReplicationEndpoint { @Override public void peerConfigUpdated(ReplicationPeerConfig rpc){ - + delegator.peerConfigUpdated(rpc); } @Override @@ -138,23 +139,16 @@ 
public class VisibilityReplicationEndpoint implements ReplicationEndpoint { } @Override - public Service startAsync() { - return this.delegator.startAsync(); - } - - @Override public boolean isRunning() { - return delegator.isRunning(); + return this.delegator.isRunning(); } @Override - public State state() { - return delegator.state(); - } + public boolean isStarting() {return this.delegator.isStarting();} @Override - public Service stopAsync() { - return this.delegator.stopAsync(); + public void start() { + this.delegator.start(); } @Override @@ -163,8 +157,13 @@ public class VisibilityReplicationEndpoint implements ReplicationEndpoint { } @Override - public void awaitRunning(long l, TimeUnit timeUnit) throws TimeoutException { - this.delegator.awaitRunning(l, timeUnit); + public void awaitRunning(long timeout, TimeUnit unit) throws TimeoutException { + this.delegator.awaitRunning(timeout, unit); + } + + @Override + public void stop() { + this.delegator.stop(); } @Override @@ -173,17 +172,12 @@ public class VisibilityReplicationEndpoint implements ReplicationEndpoint { } @Override - public void awaitTerminated(long l, TimeUnit timeUnit) throws TimeoutException { - this.delegator.awaitTerminated(l, timeUnit); + public void awaitTerminated(long timeout, TimeUnit unit) throws TimeoutException { + this.delegator.awaitTerminated(timeout, unit); } @Override public Throwable failureCause() { return this.delegator.failureCause(); } - - @Override - public void addListener(Listener listener, Executor executor) { - this.delegator.addListener(listener, executor); - } } \ No newline at end of file http://git-wip-us.apache.org/repos/asf/hbase/blob/6e7baa07/hbase-server/src/test/java/org/apache/hadoop/hbase/client/replication/TestReplicationAdminWithClusters.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/replication/TestReplicationAdminWithClusters.java 
b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/replication/TestReplicationAdminWithClusters.java index 3b5522b..2610313 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/replication/TestReplicationAdminWithClusters.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/replication/TestReplicationAdminWithClusters.java @@ -250,6 +250,16 @@ public class TestReplicationAdminWithClusters extends TestReplicationBase { } @Override + public void start() { + startAsync(); + } + + @Override + public void stop() { + stopAsync(); + } + + @Override protected void doStart() { notifyStarted(); } http://git-wip-us.apache.org/repos/asf/hbase/blob/6e7baa07/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationEndpoint.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationEndpoint.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationEndpoint.java index c63a69b..a0562bf 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationEndpoint.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationEndpoint.java @@ -396,6 +396,16 @@ public class TestReplicationEndpoint extends TestReplicationBase { } @Override + public void start() { + startAsync(); + } + + @Override + public void stop() { + stopAsync(); + } + + @Override protected void doStart() { startedCount.incrementAndGet(); notifyStarted(); http://git-wip-us.apache.org/repos/asf/hbase/blob/6e7baa07/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationSource.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationSource.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationSource.java index 
7ea698c..c3b7eaf 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationSource.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationSource.java @@ -158,7 +158,7 @@ public class TestReplicationSource { // completes } }; - replicationEndpoint.startAsync(); + replicationEndpoint.start(); ReplicationPeers mockPeers = Mockito.mock(ReplicationPeers.class); ReplicationPeer mockPeer = Mockito.mock(ReplicationPeer.class); Mockito.when(mockPeer.getPeerBandwidth()).thenReturn(0L);