Author: larsh
Date: Fri Aug  2 01:40:53 2013
New Revision: 1509512

URL: http://svn.apache.org/r1509512
Log:
HBASE-9029 Backport HBASE-8706 Some improvement in snapshot to 0.94 (Jerry He, original patch by Matteo)

Modified:
    hbase/branches/0.94/src/main/java/org/apache/hadoop/hbase/master/snapshot/SnapshotManager.java
    hbase/branches/0.94/src/main/java/org/apache/hadoop/hbase/master/snapshot/TakeSnapshotHandler.java
    hbase/branches/0.94/src/main/java/org/apache/hadoop/hbase/procedure/ProcedureCoordinator.java
    hbase/branches/0.94/src/main/java/org/apache/hadoop/hbase/procedure/ProcedureMember.java
    hbase/branches/0.94/src/main/java/org/apache/hadoop/hbase/procedure/Subprocedure.java
    hbase/branches/0.94/src/main/java/org/apache/hadoop/hbase/regionserver/snapshot/RegionServerSnapshotManager.java
    hbase/branches/0.94/src/test/java/org/apache/hadoop/hbase/procedure/TestProcedureCoordinator.java
    hbase/branches/0.94/src/test/java/org/apache/hadoop/hbase/procedure/TestProcedureMember.java
    hbase/branches/0.94/src/test/java/org/apache/hadoop/hbase/procedure/TestZKProcedure.java

Modified: hbase/branches/0.94/src/main/java/org/apache/hadoop/hbase/master/snapshot/SnapshotManager.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.94/src/main/java/org/apache/hadoop/hbase/master/snapshot/SnapshotManager.java?rev=1509512&r1=1509511&r2=1509512&view=diff
==============================================================================
--- hbase/branches/0.94/src/main/java/org/apache/hadoop/hbase/master/snapshot/SnapshotManager.java (original)
+++ hbase/branches/0.94/src/main/java/org/apache/hadoop/hbase/master/snapshot/SnapshotManager.java Fri Aug  2 01:40:53 2013
@@ -113,7 +113,7 @@ public class SnapshotManager implements 
   private static final String SNAPSHOT_WAKE_MILLIS_KEY = "hbase.snapshot.master.wakeMillis";
 
   /** By default, check to see if the snapshot is complete (ms) */
-  private static final int SNAPSHOT_TIMEOUT_MILLIS_DEFAULT = 5000;
+  private static final int SNAPSHOT_TIMEOUT_MILLIS_DEFAULT = 60000;
 
   /**
    * Conf key for # of ms elapsed before injecting a snapshot timeout error when waiting for
@@ -131,7 +131,6 @@ public class SnapshotManager implements 
   private static final int SNAPSHOT_POOL_THREADS_DEFAULT = 1;
 
   private boolean stopped;
-  private final long wakeFrequency;
   private final MasterServices master;  // Needed by TableEventHandlers
   private final MasterMetrics metricsMaster;
   private final ProcedureCoordinator coordinator;
@@ -168,16 +167,16 @@ public class SnapshotManager implements 
 
     // get the configuration for the coordinator
     Configuration conf = master.getConfiguration();
-    this.wakeFrequency = conf.getInt(SNAPSHOT_WAKE_MILLIS_KEY, SNAPSHOT_WAKE_MILLIS_DEFAULT);
-    long keepAliveTime = conf.getLong(SNAPSHOT_TIMEOUT_MILLIS_KEY, SNAPSHOT_TIMEOUT_MILLIS_DEFAULT);
+    long wakeFrequency = conf.getInt(SNAPSHOT_WAKE_MILLIS_KEY, SNAPSHOT_WAKE_MILLIS_DEFAULT);
+    long timeoutMillis = conf.getLong(SNAPSHOT_TIMEOUT_MILLIS_KEY, SNAPSHOT_TIMEOUT_MILLIS_DEFAULT);
     int opThreads = conf.getInt(SNAPSHOT_POOL_THREADS_KEY, SNAPSHOT_POOL_THREADS_DEFAULT);
 
     // setup the default procedure coordinator
     String name = master.getServerName().toString();
-    ThreadPoolExecutor tpool = ProcedureCoordinator.defaultPool(name, keepAliveTime, opThreads, wakeFrequency);
+    ThreadPoolExecutor tpool = ProcedureCoordinator.defaultPool(name, opThreads);
     ProcedureCoordinatorRpcs comms = new ZKProcedureCoordinatorRpcs(
         master.getZooKeeper(), SnapshotManager.ONLINE_SNAPSHOT_CONTROLLER_DESCRIPTION, name);
-    this.coordinator = new ProcedureCoordinator(comms, tpool);
+    this.coordinator = new ProcedureCoordinator(comms, tpool, timeoutMillis, wakeFrequency);
     this.executorService = master.getExecutorService();
     resetTempDir();
   }
@@ -197,8 +196,6 @@ public class SnapshotManager implements 
     this.rootDir = master.getMasterFileSystem().getRootDir();
     checkSnapshotSupport(master.getConfiguration(), master.getMasterFileSystem());
 
-    this.wakeFrequency = master.getConfiguration().getInt(SNAPSHOT_WAKE_MILLIS_KEY,
-      SNAPSHOT_WAKE_MILLIS_DEFAULT);
     this.coordinator = coordinator;
     this.executorService = pool;
     resetTempDir();
@@ -870,6 +867,12 @@ public class SnapshotManager implements 
     for (SnapshotSentinel restoreHandler: this.restoreHandlers.values()) {
       restoreHandler.cancel(why);
     }
+
+    try {
+      coordinator.close();
+    } catch (IOException e) {
+      LOG.error("stop ProcedureCoordinator error", e);
+    }
   }
 
   @Override

Modified: hbase/branches/0.94/src/main/java/org/apache/hadoop/hbase/master/snapshot/TakeSnapshotHandler.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.94/src/main/java/org/apache/hadoop/hbase/master/snapshot/TakeSnapshotHandler.java?rev=1509512&r1=1509511&r2=1509512&view=diff
==============================================================================
--- hbase/branches/0.94/src/main/java/org/apache/hadoop/hbase/master/snapshot/TakeSnapshotHandler.java (original)
+++ hbase/branches/0.94/src/main/java/org/apache/hadoop/hbase/master/snapshot/TakeSnapshotHandler.java Fri Aug  2 01:40:53 2013
@@ -97,7 +97,7 @@ public abstract class TakeSnapshotHandle
     this.rootDir = this.master.getMasterFileSystem().getRootDir();
     this.snapshotDir = SnapshotDescriptionUtils.getCompletedSnapshotDir(snapshot, rootDir);
     this.workingDir = SnapshotDescriptionUtils.getWorkingSnapshotDir(snapshot, rootDir);
-    this.monitor =  new ForeignExceptionDispatcher();
+    this.monitor = new ForeignExceptionDispatcher(snapshot.getName());
 
     // prepare the verify
     this.verifier = new MasterSnapshotVerifier(masterServices, snapshot, rootDir);
@@ -147,6 +147,7 @@ public abstract class TakeSnapshotHandle
 
       // run the snapshot
       snapshotRegions(regionsAndLocations);
+      monitor.rethrowException();
 
       // extract each pair to separate lists
       Set<String> serverNames = new HashSet<String>();

Modified: hbase/branches/0.94/src/main/java/org/apache/hadoop/hbase/procedure/ProcedureCoordinator.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.94/src/main/java/org/apache/hadoop/hbase/procedure/ProcedureCoordinator.java?rev=1509512&r1=1509511&r2=1509512&view=diff
==============================================================================
--- hbase/branches/0.94/src/main/java/org/apache/hadoop/hbase/procedure/ProcedureCoordinator.java (original)
+++ hbase/branches/0.94/src/main/java/org/apache/hadoop/hbase/procedure/ProcedureCoordinator.java Fri Aug  2 01:40:53 2013
@@ -51,11 +51,14 @@ import com.google.common.collect.MapMake
 public class ProcedureCoordinator {
   private static final Log LOG = LogFactory.getLog(ProcedureCoordinator.class);
 
+  final static long KEEP_ALIVE_MILLIS_DEFAULT = 5000;
   final static long TIMEOUT_MILLIS_DEFAULT = 60000;
   final static long WAKE_MILLIS_DEFAULT = 500;
 
   private final ProcedureCoordinatorRpcs rpcs;
   private final ExecutorService pool;
+  private final long wakeTimeMillis;
+  private final long timeoutMillis;
 
   // Running procedure table.  Maps procedure name to running procedure reference
   private final ConcurrentMap<String, Procedure> procedures =
@@ -71,6 +74,23 @@ public class ProcedureCoordinator {
    * @param pool Used for executing procedures.
    */
   public ProcedureCoordinator(ProcedureCoordinatorRpcs rpcs, ThreadPoolExecutor pool) {
+    this(rpcs, pool, TIMEOUT_MILLIS_DEFAULT, WAKE_MILLIS_DEFAULT);
+  }
+
+  /**
+   * Create and start a ProcedureCoordinator.
+   *
+   * The rpc object registers the ProcedureCoordinator and starts any threads in
+   * this constructor.
+   *
+   * @param rpcs
+   * @param pool Used for executing procedures.
+   * @param timeoutMillis
+   */
+  public ProcedureCoordinator(ProcedureCoordinatorRpcs rpcs, ThreadPoolExecutor pool,
+      long timeoutMillis, long wakeTimeMillis) {
+    this.timeoutMillis = timeoutMillis;
+    this.wakeTimeMillis = wakeTimeMillis;
     this.rpcs = rpcs;
     this.pool = pool;
     this.rpcs.start(this);
@@ -78,10 +98,24 @@ public class ProcedureCoordinator {
 
   /**
    * Default thread pool for the procedure
+   *
+   * @param coordName
+   * @param opThreads the maximum number of threads to allow in the pool
    */
-  public static ThreadPoolExecutor defaultPool(String coordName, long keepAliveTime, int opThreads,
-      long wakeFrequency) {
-    return new ThreadPoolExecutor(1, opThreads, keepAliveTime, TimeUnit.SECONDS,
+  public static ThreadPoolExecutor defaultPool(String coordName, int opThreads) {
+    return defaultPool(coordName, opThreads, KEEP_ALIVE_MILLIS_DEFAULT);
+  }
+
+  /**
+   * Default thread pool for the procedure
+   *
+   * @param coordName
+   * @param opThreads the maximum number of threads to allow in the pool
+   * @param keepAliveMillis the maximum time (ms) that excess idle threads will wait for new tasks
+   */
+  public static ThreadPoolExecutor defaultPool(String coordName, int opThreads,
+      long keepAliveMillis) {
+    return new ThreadPoolExecutor(1, opThreads, keepAliveMillis, TimeUnit.MILLISECONDS,
         new SynchronousQueue<Runnable>(),
         new DaemonThreadFactory("(" + coordName + ")-proc-coordinator-pool"));
   }
@@ -194,7 +228,7 @@ public class ProcedureCoordinator {
   Procedure createProcedure(ForeignExceptionDispatcher fed, String procName, byte[] procArgs,
       List<String> expectedMembers) {
     // build the procedure
-    return new Procedure(this, fed, WAKE_MILLIS_DEFAULT, TIMEOUT_MILLIS_DEFAULT,
+    return new Procedure(this, fed, wakeTimeMillis, timeoutMillis,
         procName, procArgs, expectedMembers);
   }
 

Modified: hbase/branches/0.94/src/main/java/org/apache/hadoop/hbase/procedure/ProcedureMember.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.94/src/main/java/org/apache/hadoop/hbase/procedure/ProcedureMember.java?rev=1509512&r1=1509511&r2=1509512&view=diff
==============================================================================
--- hbase/branches/0.94/src/main/java/org/apache/hadoop/hbase/procedure/ProcedureMember.java (original)
+++ hbase/branches/0.94/src/main/java/org/apache/hadoop/hbase/procedure/ProcedureMember.java Fri Aug  2 01:40:53 2013
@@ -51,6 +51,8 @@ import com.google.common.collect.MapMake
 public class ProcedureMember implements Closeable {
   private static final Log LOG = LogFactory.getLog(ProcedureMember.class);
 
+  final static long KEEP_ALIVE_MILLIS_DEFAULT = 5000;
+
   private final SubprocedureFactory builder;
   private final ProcedureMemberRpcs rpcs;
 
@@ -72,9 +74,26 @@ public class ProcedureMember implements 
     this.builder = factory;
   }
 
-  public static ThreadPoolExecutor defaultPool(long wakeFrequency, long keepAlive,
-      int procThreads, String memberName) {
-    return new ThreadPoolExecutor(1, procThreads, keepAlive, TimeUnit.SECONDS,
+  /**
+   * Default thread pool for the procedure
+   *
+   * @param memberName
+   * @param procThreads the maximum number of threads to allow in the pool
+   */
+  public static ThreadPoolExecutor defaultPool(String memberName, int procThreads) {
+    return defaultPool(memberName, procThreads, KEEP_ALIVE_MILLIS_DEFAULT);
+  }
+
+  /**
+   * Default thread pool for the procedure
+   *
+   * @param memberName
+   * @param procThreads the maximum number of threads to allow in the pool
+   * @param keepAliveMillis the maximum time (ms) that excess idle threads will wait for new tasks
+   */
+  public static ThreadPoolExecutor defaultPool(String memberName, int procThreads,
+      long keepAliveMillis) {
+    return new ThreadPoolExecutor(1, procThreads, keepAliveMillis, TimeUnit.MILLISECONDS,
         new SynchronousQueue<Runnable>(),
         new DaemonThreadFactory("member: '" + memberName + "' subprocedure-pool"));
   }
@@ -85,7 +104,7 @@ public class ProcedureMember implements 
    * @return reference to the Procedure member's rpcs object
    */
   ProcedureMemberRpcs getRpcs() {
-     return rpcs;
+    return rpcs;
   }
 
 

Modified: hbase/branches/0.94/src/main/java/org/apache/hadoop/hbase/procedure/Subprocedure.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.94/src/main/java/org/apache/hadoop/hbase/procedure/Subprocedure.java?rev=1509512&r1=1509511&r2=1509512&view=diff
==============================================================================
--- hbase/branches/0.94/src/main/java/org/apache/hadoop/hbase/procedure/Subprocedure.java (original)
+++ hbase/branches/0.94/src/main/java/org/apache/hadoop/hbase/procedure/Subprocedure.java Fri Aug  2 01:40:53 2013
@@ -198,7 +198,6 @@ abstract public class Subprocedure imple
       } else {
         msg = "Subprocedure '" + barrierName + "' failed!";
       }
-      LOG.error(msg , e);
       cancel(msg, e);
 
       LOG.debug("Subprocedure '" + barrierName + "' running cleanup.");

Modified: hbase/branches/0.94/src/main/java/org/apache/hadoop/hbase/regionserver/snapshot/RegionServerSnapshotManager.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.94/src/main/java/org/apache/hadoop/hbase/regionserver/snapshot/RegionServerSnapshotManager.java?rev=1509512&r1=1509511&r2=1509512&view=diff
==============================================================================
--- hbase/branches/0.94/src/main/java/org/apache/hadoop/hbase/regionserver/snapshot/RegionServerSnapshotManager.java (original)
+++ hbase/branches/0.94/src/main/java/org/apache/hadoop/hbase/regionserver/snapshot/RegionServerSnapshotManager.java Fri Aug  2 01:40:53 2013
@@ -124,13 +124,12 @@ public class RegionServerSnapshotManager
 
     // read in the snapshot request configuration properties
     Configuration conf = rss.getConfiguration();
-    long wakeMillis = conf.getLong(SNAPSHOT_REQUEST_WAKE_MILLIS_KEY, SNAPSHOT_REQUEST_WAKE_MILLIS_DEFAULT);
     long keepAlive = conf.getLong(SNAPSHOT_TIMEOUT_MILLIS_KEY, SNAPSHOT_TIMEOUT_MILLIS_DEFAULT);
     int opThreads = conf.getInt(SNAPSHOT_REQUEST_THREADS_KEY, SNAPSHOT_REQUEST_THREADS_DEFAULT);
 
     // create the actual snapshot procedure member
-    ThreadPoolExecutor pool = ProcedureMember.defaultPool(wakeMillis, keepAlive, opThreads, 
-      rss.getServerName().toString());
+    ThreadPoolExecutor pool = ProcedureMember.defaultPool(rss.getServerName().toString(),
+      opThreads, keepAlive);
     this.member = new ProcedureMember(memberRpcs, pool, new SnapshotSubprocedureBuilder());
   }
 
@@ -192,7 +191,7 @@ public class RegionServerSnapshotManager
 
     LOG.debug("Launching subprocedure for snapshot " + snapshot.getName() + " from table "
         + snapshot.getTable());
-    ForeignExceptionDispatcher exnDispatcher = new ForeignExceptionDispatcher();
+    ForeignExceptionDispatcher exnDispatcher = new ForeignExceptionDispatcher(snapshot.getName());
     Configuration conf = rss.getConfiguration();
     long timeoutMillis = conf.getLong(SNAPSHOT_TIMEOUT_MILLIS_KEY,
         SNAPSHOT_TIMEOUT_MILLIS_DEFAULT);
@@ -356,7 +355,6 @@ public class RegionServerSnapshotManager
       }
 
       // evict remaining tasks and futures from taskPool.
-      LOG.debug(taskPool);
       while (!futures.isEmpty()) {
         // block to remove cancelled futures;
         LOG.warn("Removing cancelled elements from taskPool");

Modified: hbase/branches/0.94/src/test/java/org/apache/hadoop/hbase/procedure/TestProcedureCoordinator.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.94/src/test/java/org/apache/hadoop/hbase/procedure/TestProcedureCoordinator.java?rev=1509512&r1=1509511&r2=1509512&view=diff
==============================================================================
--- hbase/branches/0.94/src/test/java/org/apache/hadoop/hbase/procedure/TestProcedureCoordinator.java (original)
+++ hbase/branches/0.94/src/test/java/org/apache/hadoop/hbase/procedure/TestProcedureCoordinator.java Fri Aug  2 01:40:53 2013
@@ -86,7 +86,7 @@ public class TestProcedureCoordinator {
   }
 
   private ProcedureCoordinator buildNewCoordinator() {
-    ThreadPoolExecutor pool = ProcedureCoordinator.defaultPool(nodeName, POOL_KEEP_ALIVE, 1, WAKE_FREQUENCY);
+    ThreadPoolExecutor pool = ProcedureCoordinator.defaultPool(nodeName, 1, POOL_KEEP_ALIVE);
     return spy(new ProcedureCoordinator(controller, pool));
   }
 

Modified: hbase/branches/0.94/src/test/java/org/apache/hadoop/hbase/procedure/TestProcedureMember.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.94/src/test/java/org/apache/hadoop/hbase/procedure/TestProcedureMember.java?rev=1509512&r1=1509511&r2=1509512&view=diff
==============================================================================
--- hbase/branches/0.94/src/test/java/org/apache/hadoop/hbase/procedure/TestProcedureMember.java (original)
+++ hbase/branches/0.94/src/test/java/org/apache/hadoop/hbase/procedure/TestProcedureMember.java Fri Aug  2 01:40:53 2013
@@ -88,7 +88,7 @@ public class TestProcedureMember {
    */
   private ProcedureMember buildCohortMember() {
     String name = "node";
-    ThreadPoolExecutor pool = ProcedureMember.defaultPool(WAKE_FREQUENCY, POOL_KEEP_ALIVE, 1, name);
+    ThreadPoolExecutor pool = ProcedureMember.defaultPool(name, 1, POOL_KEEP_ALIVE);
     return new ProcedureMember(mockMemberComms, pool, mockBuilder);
   }
 
@@ -98,7 +98,7 @@ public class TestProcedureMember {
   private void buildCohortMemberPair() throws IOException {
     dispatcher = new ForeignExceptionDispatcher();
     String name = "node";
-    ThreadPoolExecutor pool = ProcedureMember.defaultPool(WAKE_FREQUENCY, POOL_KEEP_ALIVE, 1, name);
+    ThreadPoolExecutor pool = ProcedureMember.defaultPool(name, 1, POOL_KEEP_ALIVE);
     member = new ProcedureMember(mockMemberComms, pool, mockBuilder);
     when(mockMemberComms.getMemberName()).thenReturn("membername"); // needed for generating exception
     Subprocedure subproc = new EmptySubprocedure(member, dispatcher);

Modified: hbase/branches/0.94/src/test/java/org/apache/hadoop/hbase/procedure/TestZKProcedure.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.94/src/test/java/org/apache/hadoop/hbase/procedure/TestZKProcedure.java?rev=1509512&r1=1509511&r2=1509512&view=diff
==============================================================================
--- hbase/branches/0.94/src/test/java/org/apache/hadoop/hbase/procedure/TestZKProcedure.java (original)
+++ hbase/branches/0.94/src/test/java/org/apache/hadoop/hbase/procedure/TestZKProcedure.java Fri Aug  2 01:40:53 2013
@@ -127,7 +127,7 @@ public class TestZKProcedure {
     // start running the controller
     ZKProcedureCoordinatorRpcs coordinatorComms = new ZKProcedureCoordinatorRpcs(
         coordZkw, opDescription, COORDINATOR_NODE_NAME);
-    ThreadPoolExecutor pool = ProcedureCoordinator.defaultPool(COORDINATOR_NODE_NAME, KEEP_ALIVE, POOL_SIZE, WAKE_FREQUENCY);
+    ThreadPoolExecutor pool = ProcedureCoordinator.defaultPool(COORDINATOR_NODE_NAME, POOL_SIZE, KEEP_ALIVE);
     ProcedureCoordinator coordinator = new ProcedureCoordinator(coordinatorComms, pool) {
       @Override
       public Procedure createProcedure(ForeignExceptionDispatcher fed, String procName, byte[] procArgs,
@@ -145,7 +145,7 @@ public class TestZKProcedure {
     for (String member : members) {
       ZooKeeperWatcher watcher = newZooKeeperWatcher();
       ZKProcedureMemberRpcs comms = new ZKProcedureMemberRpcs(watcher, opDescription);
-      ThreadPoolExecutor pool2 = ProcedureMember.defaultPool(WAKE_FREQUENCY, KEEP_ALIVE, 1, member);
+      ThreadPoolExecutor pool2 = ProcedureMember.defaultPool(member, 1, KEEP_ALIVE);
       ProcedureMember procMember = new ProcedureMember(comms, pool2, subprocFactory);
       procMembers.add(new Pair<ProcedureMember, ZKProcedureMemberRpcs>(procMember, comms));
       comms.start(member, procMember);
@@ -209,7 +209,7 @@ public class TestZKProcedure {
     ZooKeeperWatcher coordinatorWatcher = newZooKeeperWatcher();
     ZKProcedureCoordinatorRpcs coordinatorController = new ZKProcedureCoordinatorRpcs(
         coordinatorWatcher, opDescription, COORDINATOR_NODE_NAME);
-    ThreadPoolExecutor pool = ProcedureCoordinator.defaultPool(COORDINATOR_NODE_NAME, KEEP_ALIVE, POOL_SIZE, WAKE_FREQUENCY);
+    ThreadPoolExecutor pool = ProcedureCoordinator.defaultPool(COORDINATOR_NODE_NAME, POOL_SIZE, KEEP_ALIVE);
     ProcedureCoordinator coordinator = spy(new ProcedureCoordinator(coordinatorController, pool));
 
     // start a member for each node
@@ -219,7 +219,7 @@ public class TestZKProcedure {
     for (String member : expected) {
       ZooKeeperWatcher watcher = newZooKeeperWatcher();
       ZKProcedureMemberRpcs controller = new ZKProcedureMemberRpcs(watcher, opDescription);
-      ThreadPoolExecutor pool2 = ProcedureMember.defaultPool(WAKE_FREQUENCY, KEEP_ALIVE, 1, member);
+      ThreadPoolExecutor pool2 = ProcedureMember.defaultPool(member, 1, KEEP_ALIVE);
       ProcedureMember mem = new ProcedureMember(controller, pool2, subprocFactory);
       members.add(new Pair<ProcedureMember, ZKProcedureMemberRpcs>(mem, controller));
       controller.start(member, mem);


Reply via email to