Author: larsh
Date: Fri Aug 2 01:40:53 2013
New Revision: 1509512
URL: http://svn.apache.org/r1509512
Log:
HBASE-9029 Backport HBASE-8706 Some improvement in snapshot to 0.94 (Jerry He,
original patch by Matteo)
Modified:
hbase/branches/0.94/src/main/java/org/apache/hadoop/hbase/master/snapshot/SnapshotManager.java
hbase/branches/0.94/src/main/java/org/apache/hadoop/hbase/master/snapshot/TakeSnapshotHandler.java
hbase/branches/0.94/src/main/java/org/apache/hadoop/hbase/procedure/ProcedureCoordinator.java
hbase/branches/0.94/src/main/java/org/apache/hadoop/hbase/procedure/ProcedureMember.java
hbase/branches/0.94/src/main/java/org/apache/hadoop/hbase/procedure/Subprocedure.java
hbase/branches/0.94/src/main/java/org/apache/hadoop/hbase/regionserver/snapshot/RegionServerSnapshotManager.java
hbase/branches/0.94/src/test/java/org/apache/hadoop/hbase/procedure/TestProcedureCoordinator.java
hbase/branches/0.94/src/test/java/org/apache/hadoop/hbase/procedure/TestProcedureMember.java
hbase/branches/0.94/src/test/java/org/apache/hadoop/hbase/procedure/TestZKProcedure.java
Modified: hbase/branches/0.94/src/main/java/org/apache/hadoop/hbase/master/snapshot/SnapshotManager.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.94/src/main/java/org/apache/hadoop/hbase/master/snapshot/SnapshotManager.java?rev=1509512&r1=1509511&r2=1509512&view=diff
==============================================================================
--- hbase/branches/0.94/src/main/java/org/apache/hadoop/hbase/master/snapshot/SnapshotManager.java (original)
+++ hbase/branches/0.94/src/main/java/org/apache/hadoop/hbase/master/snapshot/SnapshotManager.java Fri Aug 2 01:40:53 2013
@@ -113,7 +113,7 @@ public class SnapshotManager implements
private static final String SNAPSHOT_WAKE_MILLIS_KEY =
"hbase.snapshot.master.wakeMillis";
/** By default, check to see if the snapshot is complete (ms) */
- private static final int SNAPSHOT_TIMEOUT_MILLIS_DEFAULT = 5000;
+ private static final int SNAPSHOT_TIMEOUT_MILLIS_DEFAULT = 60000;
/**
* Conf key for # of ms elapsed before injecting a snapshot timeout error
when waiting for
@@ -131,7 +131,6 @@ public class SnapshotManager implements
private static final int SNAPSHOT_POOL_THREADS_DEFAULT = 1;
private boolean stopped;
- private final long wakeFrequency;
private final MasterServices master; // Needed by TableEventHandlers
private final MasterMetrics metricsMaster;
private final ProcedureCoordinator coordinator;
@@ -168,16 +167,16 @@ public class SnapshotManager implements
// get the configuration for the coordinator
Configuration conf = master.getConfiguration();
- this.wakeFrequency = conf.getInt(SNAPSHOT_WAKE_MILLIS_KEY,
SNAPSHOT_WAKE_MILLIS_DEFAULT);
- long keepAliveTime = conf.getLong(SNAPSHOT_TIMEOUT_MILLIS_KEY,
SNAPSHOT_TIMEOUT_MILLIS_DEFAULT);
+ long wakeFrequency = conf.getInt(SNAPSHOT_WAKE_MILLIS_KEY,
SNAPSHOT_WAKE_MILLIS_DEFAULT);
+ long timeoutMillis = conf.getLong(SNAPSHOT_TIMEOUT_MILLIS_KEY,
SNAPSHOT_TIMEOUT_MILLIS_DEFAULT);
int opThreads = conf.getInt(SNAPSHOT_POOL_THREADS_KEY,
SNAPSHOT_POOL_THREADS_DEFAULT);
// setup the default procedure coordinator
String name = master.getServerName().toString();
- ThreadPoolExecutor tpool = ProcedureCoordinator.defaultPool(name,
keepAliveTime, opThreads, wakeFrequency);
+ ThreadPoolExecutor tpool = ProcedureCoordinator.defaultPool(name,
opThreads);
ProcedureCoordinatorRpcs comms = new ZKProcedureCoordinatorRpcs(
master.getZooKeeper(),
SnapshotManager.ONLINE_SNAPSHOT_CONTROLLER_DESCRIPTION, name);
- this.coordinator = new ProcedureCoordinator(comms, tpool);
+ this.coordinator = new ProcedureCoordinator(comms, tpool, timeoutMillis,
wakeFrequency);
this.executorService = master.getExecutorService();
resetTempDir();
}
@@ -197,8 +196,6 @@ public class SnapshotManager implements
this.rootDir = master.getMasterFileSystem().getRootDir();
checkSnapshotSupport(master.getConfiguration(),
master.getMasterFileSystem());
- this.wakeFrequency =
master.getConfiguration().getInt(SNAPSHOT_WAKE_MILLIS_KEY,
- SNAPSHOT_WAKE_MILLIS_DEFAULT);
this.coordinator = coordinator;
this.executorService = pool;
resetTempDir();
@@ -870,6 +867,12 @@ public class SnapshotManager implements
for (SnapshotSentinel restoreHandler: this.restoreHandlers.values()) {
restoreHandler.cancel(why);
}
+
+ try {
+ coordinator.close();
+ } catch (IOException e) {
+ LOG.error("stop ProcedureCoordinator error", e);
+ }
}
@Override
Modified: hbase/branches/0.94/src/main/java/org/apache/hadoop/hbase/master/snapshot/TakeSnapshotHandler.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.94/src/main/java/org/apache/hadoop/hbase/master/snapshot/TakeSnapshotHandler.java?rev=1509512&r1=1509511&r2=1509512&view=diff
==============================================================================
--- hbase/branches/0.94/src/main/java/org/apache/hadoop/hbase/master/snapshot/TakeSnapshotHandler.java (original)
+++ hbase/branches/0.94/src/main/java/org/apache/hadoop/hbase/master/snapshot/TakeSnapshotHandler.java Fri Aug 2 01:40:53 2013
@@ -97,7 +97,7 @@ public abstract class TakeSnapshotHandle
this.rootDir = this.master.getMasterFileSystem().getRootDir();
this.snapshotDir =
SnapshotDescriptionUtils.getCompletedSnapshotDir(snapshot, rootDir);
this.workingDir = SnapshotDescriptionUtils.getWorkingSnapshotDir(snapshot,
rootDir);
- this.monitor = new ForeignExceptionDispatcher();
+ this.monitor = new ForeignExceptionDispatcher(snapshot.getName());
// prepare the verify
this.verifier = new MasterSnapshotVerifier(masterServices, snapshot,
rootDir);
@@ -147,6 +147,7 @@ public abstract class TakeSnapshotHandle
// run the snapshot
snapshotRegions(regionsAndLocations);
+ monitor.rethrowException();
// extract each pair to separate lists
Set<String> serverNames = new HashSet<String>();
Modified: hbase/branches/0.94/src/main/java/org/apache/hadoop/hbase/procedure/ProcedureCoordinator.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.94/src/main/java/org/apache/hadoop/hbase/procedure/ProcedureCoordinator.java?rev=1509512&r1=1509511&r2=1509512&view=diff
==============================================================================
--- hbase/branches/0.94/src/main/java/org/apache/hadoop/hbase/procedure/ProcedureCoordinator.java (original)
+++ hbase/branches/0.94/src/main/java/org/apache/hadoop/hbase/procedure/ProcedureCoordinator.java Fri Aug 2 01:40:53 2013
@@ -51,11 +51,14 @@ import com.google.common.collect.MapMake
public class ProcedureCoordinator {
private static final Log LOG = LogFactory.getLog(ProcedureCoordinator.class);
+ final static long KEEP_ALIVE_MILLIS_DEFAULT = 5000;
final static long TIMEOUT_MILLIS_DEFAULT = 60000;
final static long WAKE_MILLIS_DEFAULT = 500;
private final ProcedureCoordinatorRpcs rpcs;
private final ExecutorService pool;
+ private final long wakeTimeMillis;
+ private final long timeoutMillis;
// Running procedure table. Maps procedure name to running procedure
reference
private final ConcurrentMap<String, Procedure> procedures =
@@ -71,6 +74,23 @@ public class ProcedureCoordinator {
* @param pool Used for executing procedures.
*/
public ProcedureCoordinator(ProcedureCoordinatorRpcs rpcs,
ThreadPoolExecutor pool) {
+ this(rpcs, pool, TIMEOUT_MILLIS_DEFAULT, WAKE_MILLIS_DEFAULT);
+ }
+
+ /**
+ * Create and start a ProcedureCoordinator.
+ *
+ * The rpc object registers the ProcedureCoordinator and starts any threads
in
+ * this constructor.
+ *
+ * @param rpcs
+ * @param pool Used for executing procedures.
+ * @param timeoutMillis
+ */
+ public ProcedureCoordinator(ProcedureCoordinatorRpcs rpcs,
ThreadPoolExecutor pool,
+ long timeoutMillis, long wakeTimeMillis) {
+ this.timeoutMillis = timeoutMillis;
+ this.wakeTimeMillis = wakeTimeMillis;
this.rpcs = rpcs;
this.pool = pool;
this.rpcs.start(this);
@@ -78,10 +98,24 @@ public class ProcedureCoordinator {
/**
* Default thread pool for the procedure
+ *
+ * @param coordName
+ * @param opThreads the maximum number of threads to allow in the pool
*/
- public static ThreadPoolExecutor defaultPool(String coordName, long
keepAliveTime, int opThreads,
- long wakeFrequency) {
- return new ThreadPoolExecutor(1, opThreads, keepAliveTime,
TimeUnit.SECONDS,
+ public static ThreadPoolExecutor defaultPool(String coordName, int
opThreads) {
+ return defaultPool(coordName, opThreads, KEEP_ALIVE_MILLIS_DEFAULT);
+ }
+
+ /**
+ * Default thread pool for the procedure
+ *
+ * @param coordName
+ * @param opThreads the maximum number of threads to allow in the pool
+ * @param keepAliveMillis the maximum time (ms) that excess idle threads
will wait for new tasks
+ */
+ public static ThreadPoolExecutor defaultPool(String coordName, int opThreads,
+ long keepAliveMillis) {
+ return new ThreadPoolExecutor(1, opThreads, keepAliveMillis,
TimeUnit.MILLISECONDS,
new SynchronousQueue<Runnable>(),
new DaemonThreadFactory("(" + coordName + ")-proc-coordinator-pool"));
}
@@ -194,7 +228,7 @@ public class ProcedureCoordinator {
Procedure createProcedure(ForeignExceptionDispatcher fed, String procName,
byte[] procArgs,
List<String> expectedMembers) {
// build the procedure
- return new Procedure(this, fed, WAKE_MILLIS_DEFAULT,
TIMEOUT_MILLIS_DEFAULT,
+ return new Procedure(this, fed, wakeTimeMillis, timeoutMillis,
procName, procArgs, expectedMembers);
}
Modified: hbase/branches/0.94/src/main/java/org/apache/hadoop/hbase/procedure/ProcedureMember.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.94/src/main/java/org/apache/hadoop/hbase/procedure/ProcedureMember.java?rev=1509512&r1=1509511&r2=1509512&view=diff
==============================================================================
--- hbase/branches/0.94/src/main/java/org/apache/hadoop/hbase/procedure/ProcedureMember.java (original)
+++ hbase/branches/0.94/src/main/java/org/apache/hadoop/hbase/procedure/ProcedureMember.java Fri Aug 2 01:40:53 2013
@@ -51,6 +51,8 @@ import com.google.common.collect.MapMake
public class ProcedureMember implements Closeable {
private static final Log LOG = LogFactory.getLog(ProcedureMember.class);
+ final static long KEEP_ALIVE_MILLIS_DEFAULT = 5000;
+
private final SubprocedureFactory builder;
private final ProcedureMemberRpcs rpcs;
@@ -72,9 +74,26 @@ public class ProcedureMember implements
this.builder = factory;
}
- public static ThreadPoolExecutor defaultPool(long wakeFrequency, long
keepAlive,
- int procThreads, String memberName) {
- return new ThreadPoolExecutor(1, procThreads, keepAlive, TimeUnit.SECONDS,
+ /**
+ * Default thread pool for the procedure
+ *
+ * @param memberName
+ * @param procThreads the maximum number of threads to allow in the pool
+ */
+ public static ThreadPoolExecutor defaultPool(String memberName, int
procThreads) {
+ return defaultPool(memberName, procThreads, KEEP_ALIVE_MILLIS_DEFAULT);
+ }
+
+ /**
+ * Default thread pool for the procedure
+ *
+ * @param memberName
+ * @param procThreads the maximum number of threads to allow in the pool
+ * @param keepAliveMillis the maximum time (ms) that excess idle threads
will wait for new tasks
+ */
+ public static ThreadPoolExecutor defaultPool(String memberName, int
procThreads,
+ long keepAliveMillis) {
+ return new ThreadPoolExecutor(1, procThreads, keepAliveMillis,
TimeUnit.MILLISECONDS,
new SynchronousQueue<Runnable>(),
new DaemonThreadFactory("member: '" + memberName + "'
subprocedure-pool"));
}
@@ -85,7 +104,7 @@ public class ProcedureMember implements
* @return reference to the Procedure member's rpcs object
*/
ProcedureMemberRpcs getRpcs() {
- return rpcs;
+ return rpcs;
}
Modified: hbase/branches/0.94/src/main/java/org/apache/hadoop/hbase/procedure/Subprocedure.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.94/src/main/java/org/apache/hadoop/hbase/procedure/Subprocedure.java?rev=1509512&r1=1509511&r2=1509512&view=diff
==============================================================================
--- hbase/branches/0.94/src/main/java/org/apache/hadoop/hbase/procedure/Subprocedure.java (original)
+++ hbase/branches/0.94/src/main/java/org/apache/hadoop/hbase/procedure/Subprocedure.java Fri Aug 2 01:40:53 2013
@@ -198,7 +198,6 @@ abstract public class Subprocedure imple
} else {
msg = "Subprocedure '" + barrierName + "' failed!";
}
- LOG.error(msg , e);
cancel(msg, e);
LOG.debug("Subprocedure '" + barrierName + "' running cleanup.");
Modified: hbase/branches/0.94/src/main/java/org/apache/hadoop/hbase/regionserver/snapshot/RegionServerSnapshotManager.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.94/src/main/java/org/apache/hadoop/hbase/regionserver/snapshot/RegionServerSnapshotManager.java?rev=1509512&r1=1509511&r2=1509512&view=diff
==============================================================================
--- hbase/branches/0.94/src/main/java/org/apache/hadoop/hbase/regionserver/snapshot/RegionServerSnapshotManager.java (original)
+++ hbase/branches/0.94/src/main/java/org/apache/hadoop/hbase/regionserver/snapshot/RegionServerSnapshotManager.java Fri Aug 2 01:40:53 2013
@@ -124,13 +124,12 @@ public class RegionServerSnapshotManager
// read in the snapshot request configuration properties
Configuration conf = rss.getConfiguration();
- long wakeMillis = conf.getLong(SNAPSHOT_REQUEST_WAKE_MILLIS_KEY,
SNAPSHOT_REQUEST_WAKE_MILLIS_DEFAULT);
long keepAlive = conf.getLong(SNAPSHOT_TIMEOUT_MILLIS_KEY,
SNAPSHOT_TIMEOUT_MILLIS_DEFAULT);
int opThreads = conf.getInt(SNAPSHOT_REQUEST_THREADS_KEY,
SNAPSHOT_REQUEST_THREADS_DEFAULT);
// create the actual snapshot procedure member
- ThreadPoolExecutor pool = ProcedureMember.defaultPool(wakeMillis,
keepAlive, opThreads,
- rss.getServerName().toString());
+ ThreadPoolExecutor pool =
ProcedureMember.defaultPool(rss.getServerName().toString(),
+ opThreads, keepAlive);
this.member = new ProcedureMember(memberRpcs, pool, new
SnapshotSubprocedureBuilder());
}
@@ -192,7 +191,7 @@ public class RegionServerSnapshotManager
LOG.debug("Launching subprocedure for snapshot " + snapshot.getName() + "
from table "
+ snapshot.getTable());
- ForeignExceptionDispatcher exnDispatcher = new
ForeignExceptionDispatcher();
+ ForeignExceptionDispatcher exnDispatcher = new
ForeignExceptionDispatcher(snapshot.getName());
Configuration conf = rss.getConfiguration();
long timeoutMillis = conf.getLong(SNAPSHOT_TIMEOUT_MILLIS_KEY,
SNAPSHOT_TIMEOUT_MILLIS_DEFAULT);
@@ -356,7 +355,6 @@ public class RegionServerSnapshotManager
}
// evict remaining tasks and futures from taskPool.
- LOG.debug(taskPool);
while (!futures.isEmpty()) {
// block to remove cancelled futures;
LOG.warn("Removing cancelled elements from taskPool");
Modified: hbase/branches/0.94/src/test/java/org/apache/hadoop/hbase/procedure/TestProcedureCoordinator.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.94/src/test/java/org/apache/hadoop/hbase/procedure/TestProcedureCoordinator.java?rev=1509512&r1=1509511&r2=1509512&view=diff
==============================================================================
--- hbase/branches/0.94/src/test/java/org/apache/hadoop/hbase/procedure/TestProcedureCoordinator.java (original)
+++ hbase/branches/0.94/src/test/java/org/apache/hadoop/hbase/procedure/TestProcedureCoordinator.java Fri Aug 2 01:40:53 2013
@@ -86,7 +86,7 @@ public class TestProcedureCoordinator {
}
private ProcedureCoordinator buildNewCoordinator() {
- ThreadPoolExecutor pool = ProcedureCoordinator.defaultPool(nodeName,
POOL_KEEP_ALIVE, 1, WAKE_FREQUENCY);
+ ThreadPoolExecutor pool = ProcedureCoordinator.defaultPool(nodeName, 1,
POOL_KEEP_ALIVE);
return spy(new ProcedureCoordinator(controller, pool));
}
Modified: hbase/branches/0.94/src/test/java/org/apache/hadoop/hbase/procedure/TestProcedureMember.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.94/src/test/java/org/apache/hadoop/hbase/procedure/TestProcedureMember.java?rev=1509512&r1=1509511&r2=1509512&view=diff
==============================================================================
--- hbase/branches/0.94/src/test/java/org/apache/hadoop/hbase/procedure/TestProcedureMember.java (original)
+++ hbase/branches/0.94/src/test/java/org/apache/hadoop/hbase/procedure/TestProcedureMember.java Fri Aug 2 01:40:53 2013
@@ -88,7 +88,7 @@ public class TestProcedureMember {
*/
private ProcedureMember buildCohortMember() {
String name = "node";
- ThreadPoolExecutor pool = ProcedureMember.defaultPool(WAKE_FREQUENCY,
POOL_KEEP_ALIVE, 1, name);
+ ThreadPoolExecutor pool = ProcedureMember.defaultPool(name, 1,
POOL_KEEP_ALIVE);
return new ProcedureMember(mockMemberComms, pool, mockBuilder);
}
@@ -98,7 +98,7 @@ public class TestProcedureMember {
private void buildCohortMemberPair() throws IOException {
dispatcher = new ForeignExceptionDispatcher();
String name = "node";
- ThreadPoolExecutor pool = ProcedureMember.defaultPool(WAKE_FREQUENCY,
POOL_KEEP_ALIVE, 1, name);
+ ThreadPoolExecutor pool = ProcedureMember.defaultPool(name, 1,
POOL_KEEP_ALIVE);
member = new ProcedureMember(mockMemberComms, pool, mockBuilder);
when(mockMemberComms.getMemberName()).thenReturn("membername"); // needed
for generating exception
Subprocedure subproc = new EmptySubprocedure(member, dispatcher);
Modified: hbase/branches/0.94/src/test/java/org/apache/hadoop/hbase/procedure/TestZKProcedure.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.94/src/test/java/org/apache/hadoop/hbase/procedure/TestZKProcedure.java?rev=1509512&r1=1509511&r2=1509512&view=diff
==============================================================================
--- hbase/branches/0.94/src/test/java/org/apache/hadoop/hbase/procedure/TestZKProcedure.java (original)
+++ hbase/branches/0.94/src/test/java/org/apache/hadoop/hbase/procedure/TestZKProcedure.java Fri Aug 2 01:40:53 2013
@@ -127,7 +127,7 @@ public class TestZKProcedure {
// start running the controller
ZKProcedureCoordinatorRpcs coordinatorComms = new
ZKProcedureCoordinatorRpcs(
coordZkw, opDescription, COORDINATOR_NODE_NAME);
- ThreadPoolExecutor pool =
ProcedureCoordinator.defaultPool(COORDINATOR_NODE_NAME, KEEP_ALIVE, POOL_SIZE,
WAKE_FREQUENCY);
+ ThreadPoolExecutor pool =
ProcedureCoordinator.defaultPool(COORDINATOR_NODE_NAME, POOL_SIZE, KEEP_ALIVE);
ProcedureCoordinator coordinator = new
ProcedureCoordinator(coordinatorComms, pool) {
@Override
public Procedure createProcedure(ForeignExceptionDispatcher fed, String
procName, byte[] procArgs,
@@ -145,7 +145,7 @@ public class TestZKProcedure {
for (String member : members) {
ZooKeeperWatcher watcher = newZooKeeperWatcher();
ZKProcedureMemberRpcs comms = new ZKProcedureMemberRpcs(watcher,
opDescription);
- ThreadPoolExecutor pool2 = ProcedureMember.defaultPool(WAKE_FREQUENCY,
KEEP_ALIVE, 1, member);
+ ThreadPoolExecutor pool2 = ProcedureMember.defaultPool(member, 1,
KEEP_ALIVE);
ProcedureMember procMember = new ProcedureMember(comms, pool2,
subprocFactory);
procMembers.add(new Pair<ProcedureMember,
ZKProcedureMemberRpcs>(procMember, comms));
comms.start(member, procMember);
@@ -209,7 +209,7 @@ public class TestZKProcedure {
ZooKeeperWatcher coordinatorWatcher = newZooKeeperWatcher();
ZKProcedureCoordinatorRpcs coordinatorController = new
ZKProcedureCoordinatorRpcs(
coordinatorWatcher, opDescription, COORDINATOR_NODE_NAME);
- ThreadPoolExecutor pool =
ProcedureCoordinator.defaultPool(COORDINATOR_NODE_NAME, KEEP_ALIVE, POOL_SIZE,
WAKE_FREQUENCY);
+ ThreadPoolExecutor pool =
ProcedureCoordinator.defaultPool(COORDINATOR_NODE_NAME, POOL_SIZE, KEEP_ALIVE);
ProcedureCoordinator coordinator = spy(new
ProcedureCoordinator(coordinatorController, pool));
// start a member for each node
@@ -219,7 +219,7 @@ public class TestZKProcedure {
for (String member : expected) {
ZooKeeperWatcher watcher = newZooKeeperWatcher();
ZKProcedureMemberRpcs controller = new ZKProcedureMemberRpcs(watcher,
opDescription);
- ThreadPoolExecutor pool2 = ProcedureMember.defaultPool(WAKE_FREQUENCY,
KEEP_ALIVE, 1, member);
+ ThreadPoolExecutor pool2 = ProcedureMember.defaultPool(member, 1,
KEEP_ALIVE);
ProcedureMember mem = new ProcedureMember(controller, pool2,
subprocFactory);
members.add(new Pair<ProcedureMember, ZKProcedureMemberRpcs>(mem,
controller));
controller.start(member, mem);