This is an automated email from the ASF dual-hosted git repository.
nlu90 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-heron.git
The following commit(s) were added to refs/heads/master by this push:
new 6ef20ad Refactor CheckpointManager to move initialization code out of
constructor (#2901)
6ef20ad is described below
commit 6ef20ad34899b94d36d73231f4c0a3a73098c7c8
Author: Ning Wang <[email protected]>
AuthorDate: Thu May 17 13:53:39 2018 -0700
Refactor CheckpointManager to move initialization code out of constructor
(#2901)
---
.../apache/heron/ckptmgr/CheckpointManager.java | 64 ++++++++++++++--------
1 file changed, 40 insertions(+), 24 deletions(-)
diff --git
a/heron/ckptmgr/src/java/org/apache/heron/ckptmgr/CheckpointManager.java
b/heron/ckptmgr/src/java/org/apache/heron/ckptmgr/CheckpointManager.java
index fa9c435..5c590ff 100644
--- a/heron/ckptmgr/src/java/org/apache/heron/ckptmgr/CheckpointManager.java
+++ b/heron/ckptmgr/src/java/org/apache/heron/ckptmgr/CheckpointManager.java
@@ -50,8 +50,8 @@ public class CheckpointManager {
private static final String CHECKPOINT_MANAGER_HOST = "127.0.0.1";
// The looper drives CheckpointManagerServer
- private final NIOLooper checkpointManagerServerLoop;
- private final CheckpointManagerServer checkpointManagerServer;
+ private NIOLooper checkpointManagerServerLoop;
+ private CheckpointManagerServer checkpointManagerServer;
// Print usage options
private static void usage(Options options) {
@@ -133,14 +133,21 @@ public class CheckpointManager {
return options;
}
- public CheckpointManager(
- String topologyName, String topologyId, String checkpointMgrId,
- String serverHost, int serverPort,
- SystemConfig systemConfig, CheckpointManagerConfig
checkpointManagerConfig)
- throws IOException, CheckpointManagerException {
+ public CheckpointManager() {
+ }
- this.checkpointManagerServerLoop = new NIOLooper();
+ public void init(
+ String topologyName,
+ String topologyId,
+ String checkpointMgrId,
+ String serverHost,
+ int serverPort,
+ SystemConfig systemConfig,
+ CheckpointManagerConfig checkpointManagerConfig)
+ throws IOException, CheckpointManagerException {
+ LOG.info("Initializing CheckpointManager");
+ checkpointManagerServerLoop = new NIOLooper();
HeronSocketOptions serverSocketOptions =
new HeronSocketOptions(
checkpointManagerConfig.getWriteBatchSize(),
@@ -152,9 +159,29 @@ public class CheckpointManager {
checkpointManagerConfig.getMaximumPacketSize());
// Setup the IStatefulStorage
- // TODO(mfu): This should be done in an executor driven by another thread,
kind of async
+ IStatefulStorage statefulStorage = setupStatefulStorage(topologyName,
checkpointManagerConfig);
+
+ // Start the server
+ this.checkpointManagerServer = new CheckpointManagerServer(
+ topologyName, topologyId, checkpointMgrId, statefulStorage,
+ checkpointManagerServerLoop, serverHost, serverPort,
serverSocketOptions);
+ }
+
+ public void startAndLoop() {
+ // The CheckpointManagerServer would run in the main thread
+ // We do it in the final step since it would await the main thread
+ LOG.info("Starting CheckpointManager Server");
+ checkpointManagerServer.start();
+ checkpointManagerServerLoop.loop();
+ }
+
+ private static IStatefulStorage setupStatefulStorage(
+ String topologyName,
+ CheckpointManagerConfig checkpointManagerConfig) throws
CheckpointManagerException {
+
IStatefulStorage statefulStorage;
String classname = checkpointManagerConfig.getStorageClassname();
+
try {
statefulStorage = (IStatefulStorage)
Class.forName(classname).newInstance();
} catch (InstantiationException e) {
@@ -172,18 +199,7 @@ public class CheckpointManager {
throw new CheckpointManagerException(classname + " init threw
exception", e);
}
- // Start the server
- this.checkpointManagerServer = new CheckpointManagerServer(
- topologyName, topologyId, checkpointMgrId, statefulStorage,
- checkpointManagerServerLoop, serverHost, serverPort,
serverSocketOptions);
- }
-
- public void startAndLoop() {
- // The CheckpointManagerServer would run in the main thread
- // We do it in the final step since it would await the main thread
- LOG.info("Starting CheckpointManager Server");
- checkpointManagerServer.start();
- checkpointManagerServerLoop.loop();
+ return statefulStorage;
}
public static void main(String[] args) throws IOException,
@@ -241,9 +257,9 @@ public class CheckpointManager {
LOG.info("System Config: " + systemConfig);
- CheckpointManager checkpointManager =
- new CheckpointManager(topologyName, topologyId, ckptmgrId,
- CHECKPOINT_MANAGER_HOST, port, systemConfig, ckptmgrConfig);
+ CheckpointManager checkpointManager = new CheckpointManager();
+ checkpointManager.init(topologyName, topologyId, ckptmgrId,
+ CHECKPOINT_MANAGER_HOST, port, systemConfig, ckptmgrConfig);
checkpointManager.startAndLoop();
LOG.info("Loops terminated. Exiting.");
--
To stop receiving notification emails like this one, please contact
[email protected].