This is an automated email from the ASF dual-hosted git repository.

mmiller pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/accumulo.git


The following commit(s) were added to refs/heads/master by this push:
     new 9198818  Make replication services start when configured (#868)
9198818 is described below

commit 91988183d0649e9fd09901e2c5905e9f7a678368
Author: Mike Miller <mmil...@apache.org>
AuthorDate: Mon Jan 7 16:53:44 2019 -0500

    Make replication services start when configured (#868)
    
    * Replication services in tserver and master will now only start when replication.name is set
---
 .../java/org/apache/accumulo/master/Master.java    | 103 +++++++++++++--------
 .../org/apache/accumulo/tserver/TabletServer.java  |  66 +++++++------
 .../replication/MultiTserverReplicationIT.java     |   3 +
 3 files changed, 102 insertions(+), 70 deletions(-)

diff --git a/server/master/src/main/java/org/apache/accumulo/master/Master.java b/server/master/src/main/java/org/apache/accumulo/master/Master.java
index 1bffe6b..cc1d800 100644
--- a/server/master/src/main/java/org/apache/accumulo/master/Master.java
+++ b/server/master/src/main/java/org/apache/accumulo/master/Master.java
@@ -20,6 +20,7 @@ import static java.nio.charset.StandardCharsets.UTF_8;
 import static org.apache.accumulo.fate.util.UtilWaitThread.sleepUninterruptibly;
 
 import java.io.IOException;
+import java.net.UnknownHostException;
 import java.util.ArrayList;
 import java.util.Collection;
 import java.util.Collections;
@@ -39,6 +40,7 @@ import java.util.concurrent.Executors;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.atomic.AtomicReference;
 
 import org.apache.accumulo.core.Constants;
 import org.apache.accumulo.core.client.AccumuloClient;
@@ -1261,22 +1263,6 @@ public class Master
     clientService = sa.server;
     log.info("Started Master client service at {}", sa.address);
 
-    // Start the replication coordinator which assigns tservers to service replication requests
-    MasterReplicationCoordinator impl = new MasterReplicationCoordinator(this);
-    ReplicationCoordinator.Iface haReplicationProxy = HighlyAvailableServiceWrapper.service(impl,
-        this);
-    // @formatter:off
-    ReplicationCoordinator.Processor<ReplicationCoordinator.Iface> replicationCoordinatorProcessor =
-      new ReplicationCoordinator.Processor<>(TraceWrap.service(haReplicationProxy));
-    // @formatter:on
-    ServerAddress replAddress = TServerUtils.startServer(context, hostname,
-        Property.MASTER_REPLICATION_COORDINATOR_PORT, replicationCoordinatorProcessor,
-        "Master Replication Coordinator", "Replication Coordinator", null,
-        Property.MASTER_REPLICATION_COORDINATOR_MINTHREADS,
-        Property.MASTER_REPLICATION_COORDINATOR_THREADCHECK, Property.GENERAL_MAX_MESSAGE_SIZE);
-
-    log.info("Started replication coordinator service at " + replAddress.address);
-
     // block until we can obtain the ZK lock for the master
     getMasterLock(zroot + Constants.ZMASTER_LOCK);
 
@@ -1391,27 +1377,20 @@ public class Master
       sleepUninterruptibly(100, TimeUnit.MILLISECONDS);
     }
 
-    // Start the daemon to scan the replication table and make units of work
-    replicationWorkDriver = new ReplicationDriver(this);
-    replicationWorkDriver.start();
-
-    // Start the daemon to assign work to tservers to replicate to our peers
-    replicationWorkAssigner = new WorkDriver(this);
-    replicationWorkAssigner.start();
-
-    // Advertise that port we used so peers don't have to be told what it is
-    context.getZooReaderWriter().putPersistentData(
-        getZooKeeperRoot() + Constants.ZMASTER_REPLICATION_COORDINATOR_ADDR,
-        replAddress.address.toString().getBytes(UTF_8), NodeExistsPolicy.OVERWRITE);
-
-    // Register replication metrics
-    MasterMetricsFactory factory = new MasterMetricsFactory(getConfiguration(), this);
-    Metrics replicationMetrics = factory.createReplicationMetrics();
-    try {
-      replicationMetrics.register();
-    } catch (Exception e) {
-      log.error("Failed to register replication metrics", e);
-    }
+    // if the replication name is ever set, then start replication services
+    final AtomicReference<TServer> replServer = new AtomicReference<>();
+    SimpleTimer.getInstance(getConfiguration()).schedule(() -> {
+      try {
+        if (replServer.get() == null) {
+          if (!getConfiguration().get(Property.REPLICATION_NAME).isEmpty()) {
+            log.info(Property.REPLICATION_NAME.getKey() + " was set, starting repl services.");
+            replServer.set(setupReplication());
+          }
+        }
+      } catch (UnknownHostException | KeeperException | InterruptedException e) {
+        log.error("Error occurred starting replication services. ", e);
+      }
+    }, 0, 5000);
 
     // The master is fully initialized. Clients are allowed to connect now.
     masterInitialized.set(true);
@@ -1427,9 +1406,12 @@ public class Master
 
     final long deadline = System.currentTimeMillis() + MAX_CLEANUP_WAIT_TIME;
     statusThread.join(remaining(deadline));
-    replicationWorkAssigner.join(remaining(deadline));
-    replicationWorkDriver.join(remaining(deadline));
-    replAddress.server.stop();
+    if (replicationWorkAssigner != null)
+      replicationWorkAssigner.join(remaining(deadline));
+    if (replicationWorkDriver != null)
+      replicationWorkDriver.join(remaining(deadline));
+    TServerUtils.stopTServer(replServer.get());
+
     // Signal that we want it to stop, and wait for it to do so.
     if (authenticationTokenKeyManager != null) {
       authenticationTokenKeyManager.gracefulStop();
@@ -1444,6 +1426,47 @@ public class Master
     log.info("exiting");
   }
 
+  private TServer setupReplication()
+      throws UnknownHostException, KeeperException, InterruptedException {
+    // Start the replication coordinator which assigns tservers to service replication requests
+    MasterReplicationCoordinator impl = new MasterReplicationCoordinator(this);
+    ReplicationCoordinator.Iface haReplicationProxy = HighlyAvailableServiceWrapper.service(impl,
+        this);
+    // @formatter:off
+    ReplicationCoordinator.Processor<ReplicationCoordinator.Iface> replicationCoordinatorProcessor =
+            new ReplicationCoordinator.Processor<>(TraceWrap.service(haReplicationProxy));
+    // @formatter:on
+    ServerAddress replAddress = TServerUtils.startServer(context, hostname,
+        Property.MASTER_REPLICATION_COORDINATOR_PORT, replicationCoordinatorProcessor,
+        "Master Replication Coordinator", "Replication Coordinator", null,
+        Property.MASTER_REPLICATION_COORDINATOR_MINTHREADS,
+        Property.MASTER_REPLICATION_COORDINATOR_THREADCHECK, Property.GENERAL_MAX_MESSAGE_SIZE);
+
+    log.info("Started replication coordinator service at " + replAddress.address);
+    // Start the daemon to scan the replication table and make units of work
+    replicationWorkDriver = new ReplicationDriver(this);
+    replicationWorkDriver.start();
+
+    // Start the daemon to assign work to tservers to replicate to our peers
+    replicationWorkAssigner = new WorkDriver(this);
+    replicationWorkAssigner.start();
+
+    // Advertise that port we used so peers don't have to be told what it is
+    context.getZooReaderWriter().putPersistentData(
+        getZooKeeperRoot() + Constants.ZMASTER_REPLICATION_COORDINATOR_ADDR,
+        replAddress.address.toString().getBytes(UTF_8), NodeExistsPolicy.OVERWRITE);
+
+    // Register replication metrics
+    MasterMetricsFactory factory = new MasterMetricsFactory(getConfiguration(), this);
+    Metrics replicationMetrics = factory.createReplicationMetrics();
+    try {
+      replicationMetrics.register();
+    } catch (Exception e) {
+      log.error("Failed to register replication metrics", e);
+    }
+    return replAddress.server;
+  }
+
   private long remaining(long deadline) {
     return Math.max(1, deadline - System.currentTimeMillis());
   }
diff --git a/server/tserver/src/main/java/org/apache/accumulo/tserver/TabletServer.java b/server/tserver/src/main/java/org/apache/accumulo/tserver/TabletServer.java
index db69705..a7ddcac 100644
--- a/server/tserver/src/main/java/org/apache/accumulo/tserver/TabletServer.java
+++ b/server/tserver/src/main/java/org/apache/accumulo/tserver/TabletServer.java
@@ -341,7 +341,7 @@ public class TabletServer implements Runnable {
   private ZooLock tabletServerLock;
 
   private TServer server;
-  private TServer replServer;
+  private volatile TServer replServer;
 
   private DistributedWorkQueue bulkFailedCopyQ;
 
@@ -2664,7 +2664,7 @@ public class TabletServer implements Runnable {
     return address;
   }
 
-  private HostAndPort startReplicationService() throws UnknownHostException {
+  private void startReplicationService() throws UnknownHostException {
     final ReplicationServicerHandler handler = new ReplicationServicerHandler(this);
     ReplicationServicer.Iface rpcProxy = TraceWrap.service(handler);
     ReplicationServicer.Iface repl = TCredentialsUpdatingWrapper.service(rpcProxy,
@@ -2695,8 +2695,6 @@ public class TabletServer implements Runnable {
       log.error("Could not advertise replication service port", e);
       throw new RuntimeException(e);
     }
-
-    return sp.address;
   }
 
   public ZooLock getLock() {
@@ -2841,35 +2839,16 @@ public class TabletServer implements Runnable {
       log.error("Error setting watches for recoveries");
       throw new RuntimeException(ex);
     }
-
-    // Start the thrift service listening for incoming replication requests
-    try {
-      startReplicationService();
-    } catch (UnknownHostException e) {
-      throw new RuntimeException("Failed to start replication service", e);
-    }
-
-    // Start the pool to handle outgoing replications
-    final ThreadPoolExecutor replicationThreadPool = new SimpleThreadPool(
-        getConfiguration().getCount(Property.REPLICATION_WORKER_THREADS), "replication task");
-    replWorker.setExecutor(replicationThreadPool);
-    replWorker.run();
-
-    // Check the configuration value for the size of the pool and, if changed, resize the pool,
-    // every 5 seconds);
     final AccumuloConfiguration aconf = getConfiguration();
-    Runnable replicationWorkThreadPoolResizer = new Runnable() {
-      @Override
-      public void run() {
-        int maxPoolSize = aconf.getCount(Property.REPLICATION_WORKER_THREADS);
-        if (replicationThreadPool.getMaximumPoolSize() != maxPoolSize) {
-          log.info("Resizing thread pool for sending replication work from {} to {}",
-              replicationThreadPool.getMaximumPoolSize(), maxPoolSize);
-          replicationThreadPool.setMaximumPoolSize(maxPoolSize);
+    // if the replication name is ever set, then start replication services
+    SimpleTimer.getInstance(aconf).schedule(() -> {
+      if (this.replServer == null) {
+        if (!getConfiguration().get(Property.REPLICATION_NAME).isEmpty()) {
+          log.info(Property.REPLICATION_NAME.getKey() + " was set, starting repl services.");
+          setupReplication(aconf);
         }
       }
-    };
-    SimpleTimer.getInstance(aconf).schedule(replicationWorkThreadPoolResizer, 10000, 30000);
+    }, 0, 5000);
 
     final long CLEANUP_BULK_LOADED_CACHE_MILLIS = 15 * 60 * 1000;
     SimpleTimer.getInstance(aconf).schedule(new BulkImportCacheCleaner(this),
@@ -2954,6 +2933,7 @@ public class TabletServer implements Runnable {
     }
     log.debug("Stopping Replication Server");
     TServerUtils.stopTServer(this.replServer);
+
     log.debug("Stopping Thrift Servers");
     TServerUtils.stopTServer(server);
 
@@ -2975,6 +2955,32 @@ public class TabletServer implements Runnable {
     }
   }
 
+  private void setupReplication(AccumuloConfiguration aconf) {
+    // Start the thrift service listening for incoming replication requests
+    try {
+      startReplicationService();
+    } catch (UnknownHostException e) {
+      throw new RuntimeException("Failed to start replication service", e);
+    }
+
+    // Start the pool to handle outgoing replications
+    final ThreadPoolExecutor replicationThreadPool = new SimpleThreadPool(
+        getConfiguration().getCount(Property.REPLICATION_WORKER_THREADS), "replication task");
+    replWorker.setExecutor(replicationThreadPool);
+    replWorker.run();
+
+    // Check the configuration value for the size of the pool and, if changed, resize the pool
+    Runnable replicationWorkThreadPoolResizer = () -> {
+      int maxPoolSize = aconf.getCount(Property.REPLICATION_WORKER_THREADS);
+      if (replicationThreadPool.getMaximumPoolSize() != maxPoolSize) {
+        log.info("Resizing thread pool for sending replication work from {} to {}",
+            replicationThreadPool.getMaximumPoolSize(), maxPoolSize);
+        replicationThreadPool.setMaximumPoolSize(maxPoolSize);
+      }
+    };
+    SimpleTimer.getInstance(aconf).schedule(replicationWorkThreadPoolResizer, 10000, 30000);
+  }
+
   private static Pair<Text,KeyExtent> verifyRootTablet(ServerContext context,
       TServerInstance instance) throws AccumuloException {
     ZooTabletStateStore store = new ZooTabletStateStore(context);
diff --git a/test/src/main/java/org/apache/accumulo/test/replication/MultiTserverReplicationIT.java b/test/src/main/java/org/apache/accumulo/test/replication/MultiTserverReplicationIT.java
index 9a2e6be..30291ab 100644
--- a/test/src/main/java/org/apache/accumulo/test/replication/MultiTserverReplicationIT.java
+++ b/test/src/main/java/org/apache/accumulo/test/replication/MultiTserverReplicationIT.java
@@ -28,6 +28,7 @@ import org.apache.accumulo.core.Constants;
 import org.apache.accumulo.core.client.AccumuloClient;
 import org.apache.accumulo.core.client.Scanner;
 import org.apache.accumulo.core.clientImpl.ClientContext;
+import org.apache.accumulo.core.conf.Property;
 import org.apache.accumulo.core.replication.ReplicationConstants;
 import org.apache.accumulo.core.security.Authorizations;
 import org.apache.accumulo.core.util.HostAndPort;
@@ -47,6 +48,8 @@ public class MultiTserverReplicationIT extends ConfigurableMacBase {
 
   @Override
   public void configure(MiniAccumuloConfigImpl cfg, Configuration hadoopCoreSite) {
+    // set the name to kick off the replication services
+    cfg.setProperty(Property.REPLICATION_NAME.getKey(), "test");
     cfg.setNumTservers(2);
   }
 

Reply via email to