nlu90 closed pull request #2901: Refactor CheckpointManager to move initialization code out of constructor. URL: https://github.com/apache/incubator-heron/pull/2901
This is a PR merged from a forked repository. As GitHub hides the original diff on merge, it is displayed below for the sake of provenance: As this is a foreign pull request (from a fork), the diff is supplied below (as it won't show otherwise due to GitHub magic): diff --git a/heron/ckptmgr/src/java/org/apache/heron/ckptmgr/CheckpointManager.java b/heron/ckptmgr/src/java/org/apache/heron/ckptmgr/CheckpointManager.java index fa9c435f9c..5c590ffca5 100644 --- a/heron/ckptmgr/src/java/org/apache/heron/ckptmgr/CheckpointManager.java +++ b/heron/ckptmgr/src/java/org/apache/heron/ckptmgr/CheckpointManager.java @@ -50,8 +50,8 @@ private static final String CHECKPOINT_MANAGER_HOST = "127.0.0.1"; // The looper drives CheckpointManagerServer - private final NIOLooper checkpointManagerServerLoop; - private final CheckpointManagerServer checkpointManagerServer; + private NIOLooper checkpointManagerServerLoop; + private CheckpointManagerServer checkpointManagerServer; // Print usage options private static void usage(Options options) { @@ -133,14 +133,21 @@ private static Options constructHelpOptions() { return options; } - public CheckpointManager( - String topologyName, String topologyId, String checkpointMgrId, - String serverHost, int serverPort, - SystemConfig systemConfig, CheckpointManagerConfig checkpointManagerConfig) - throws IOException, CheckpointManagerException { + public CheckpointManager() { + } - this.checkpointManagerServerLoop = new NIOLooper(); + public void init( + String topologyName, + String topologyId, + String checkpointMgrId, + String serverHost, + int serverPort, + SystemConfig systemConfig, + CheckpointManagerConfig checkpointManagerConfig) + throws IOException, CheckpointManagerException { + LOG.info("Initializing CheckpointManager"); + checkpointManagerServerLoop = new NIOLooper(); HeronSocketOptions serverSocketOptions = new HeronSocketOptions( checkpointManagerConfig.getWriteBatchSize(), @@ -152,9 +159,29 @@ public CheckpointManager( 
checkpointManagerConfig.getMaximumPacketSize()); // Setup the IStatefulStorage - // TODO(mfu): This should be done in an executor driven by another thread, kind of async + IStatefulStorage statefulStorage = setupStatefulStorage(topologyName, checkpointManagerConfig); + + // Start the server + this.checkpointManagerServer = new CheckpointManagerServer( + topologyName, topologyId, checkpointMgrId, statefulStorage, + checkpointManagerServerLoop, serverHost, serverPort, serverSocketOptions); + } + + public void startAndLoop() { + // The CheckpointManagerServer would run in the main thread + // We do it in the final step since it would await the main thread + LOG.info("Starting CheckpointManager Server"); + checkpointManagerServer.start(); + checkpointManagerServerLoop.loop(); + } + + private static IStatefulStorage setupStatefulStorage( + String topologyName, + CheckpointManagerConfig checkpointManagerConfig) throws CheckpointManagerException { + IStatefulStorage statefulStorage; String classname = checkpointManagerConfig.getStorageClassname(); + try { statefulStorage = (IStatefulStorage) Class.forName(classname).newInstance(); } catch (InstantiationException e) { @@ -172,18 +199,7 @@ public CheckpointManager( throw new CheckpointManagerException(classname + " init threw exception", e); } - // Start the server - this.checkpointManagerServer = new CheckpointManagerServer( - topologyName, topologyId, checkpointMgrId, statefulStorage, - checkpointManagerServerLoop, serverHost, serverPort, serverSocketOptions); - } - - public void startAndLoop() { - // The CheckpointManagerServer would run in the main thread - // We do it in the final step since it would await the main thread - LOG.info("Starting CheckpointManager Server"); - checkpointManagerServer.start(); - checkpointManagerServerLoop.loop(); + return statefulStorage; } public static void main(String[] args) throws IOException, @@ -241,9 +257,9 @@ public static void main(String[] args) throws IOException, 
LOG.info("System Config: " + systemConfig); - CheckpointManager checkpointManager = - new CheckpointManager(topologyName, topologyId, ckptmgrId, - CHECKPOINT_MANAGER_HOST, port, systemConfig, ckptmgrConfig); + CheckpointManager checkpointManager = new CheckpointManager(); + checkpointManager.init(topologyName, topologyId, ckptmgrId, + CHECKPOINT_MANAGER_HOST, port, systemConfig, ckptmgrConfig); checkpointManager.startAndLoop(); LOG.info("Loops terminated. Exiting."); ---------------------------------------------------------------- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: users@infra.apache.org With regards, Apache Git Services