This is an automated email from the ASF dual-hosted git repository.

nlu90 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-heron.git


The following commit(s) were added to refs/heads/master by this push:
     new 6ef20ad  Refactor CheckpointManager to move initialization code out of constructor (#2901)
6ef20ad is described below

commit 6ef20ad34899b94d36d73231f4c0a3a73098c7c8
Author: Ning Wang <[email protected]>
AuthorDate: Thu May 17 13:53:39 2018 -0700

    Refactor CheckpointManager to move initialization code out of constructor (#2901)
---
 .../apache/heron/ckptmgr/CheckpointManager.java    | 64 ++++++++++++++--------
 1 file changed, 40 insertions(+), 24 deletions(-)

diff --git a/heron/ckptmgr/src/java/org/apache/heron/ckptmgr/CheckpointManager.java b/heron/ckptmgr/src/java/org/apache/heron/ckptmgr/CheckpointManager.java
index fa9c435..5c590ff 100644
--- a/heron/ckptmgr/src/java/org/apache/heron/ckptmgr/CheckpointManager.java
+++ b/heron/ckptmgr/src/java/org/apache/heron/ckptmgr/CheckpointManager.java
@@ -50,8 +50,8 @@ public class CheckpointManager {
   private static final String CHECKPOINT_MANAGER_HOST = "127.0.0.1";
 
   // The looper drives CheckpointManagerServer
-  private final NIOLooper checkpointManagerServerLoop;
-  private final CheckpointManagerServer checkpointManagerServer;
+  private NIOLooper checkpointManagerServerLoop;
+  private CheckpointManagerServer checkpointManagerServer;
 
   // Print usage options
   private static void usage(Options options) {
@@ -133,14 +133,21 @@ public class CheckpointManager {
     return options;
   }
 
-  public CheckpointManager(
-      String topologyName, String topologyId, String checkpointMgrId,
-      String serverHost, int serverPort,
-      SystemConfig systemConfig, CheckpointManagerConfig 
checkpointManagerConfig)
-      throws IOException, CheckpointManagerException {
+  public CheckpointManager() {
+  }
 
-    this.checkpointManagerServerLoop = new NIOLooper();
+  public void init(
+      String topologyName,
+      String topologyId,
+      String checkpointMgrId,
+      String serverHost,
+      int serverPort,
+      SystemConfig systemConfig,
+      CheckpointManagerConfig checkpointManagerConfig)
+      throws IOException, CheckpointManagerException {
 
+    LOG.info("Initializing CheckpointManager");
+    checkpointManagerServerLoop = new NIOLooper();
     HeronSocketOptions serverSocketOptions =
         new HeronSocketOptions(
             checkpointManagerConfig.getWriteBatchSize(),
@@ -152,9 +159,29 @@ public class CheckpointManager {
             checkpointManagerConfig.getMaximumPacketSize());
 
     // Setup the IStatefulStorage
-    // TODO(mfu): This should be done in an executor driven by another thread, 
kind of async
+    IStatefulStorage statefulStorage = setupStatefulStorage(topologyName, 
checkpointManagerConfig);
+
+    // Start the server
+    this.checkpointManagerServer = new CheckpointManagerServer(
+        topologyName, topologyId, checkpointMgrId, statefulStorage,
+        checkpointManagerServerLoop, serverHost, serverPort, 
serverSocketOptions);
+  }
+
+  public void startAndLoop() {
+    // The CheckpointManagerServer would run in the main thread
+    // We do it in the final step since it would await the main thread
+    LOG.info("Starting CheckpointManager Server");
+    checkpointManagerServer.start();
+    checkpointManagerServerLoop.loop();
+  }
+
+  private static IStatefulStorage setupStatefulStorage(
+      String topologyName,
+      CheckpointManagerConfig checkpointManagerConfig) throws 
CheckpointManagerException {
+
     IStatefulStorage statefulStorage;
     String classname = checkpointManagerConfig.getStorageClassname();
+
     try {
       statefulStorage = (IStatefulStorage) 
Class.forName(classname).newInstance();
     } catch (InstantiationException e) {
@@ -172,18 +199,7 @@ public class CheckpointManager {
       throw new CheckpointManagerException(classname + " init threw 
exception", e);
     }
 
-    // Start the server
-    this.checkpointManagerServer = new CheckpointManagerServer(
-        topologyName, topologyId, checkpointMgrId, statefulStorage,
-        checkpointManagerServerLoop, serverHost, serverPort, 
serverSocketOptions);
-  }
-
-  public void startAndLoop() {
-    // The CheckpointManagerServer would run in the main thread
-    // We do it in the final step since it would await the main thread
-    LOG.info("Starting CheckpointManager Server");
-    checkpointManagerServer.start();
-    checkpointManagerServerLoop.loop();
+    return statefulStorage;
   }
 
   public static void main(String[] args) throws IOException,
@@ -241,9 +257,9 @@ public class CheckpointManager {
 
     LOG.info("System Config: " + systemConfig);
 
-    CheckpointManager checkpointManager =
-        new CheckpointManager(topologyName, topologyId, ckptmgrId,
-            CHECKPOINT_MANAGER_HOST, port, systemConfig, ckptmgrConfig);
+    CheckpointManager checkpointManager = new CheckpointManager();
+    checkpointManager.init(topologyName, topologyId, ckptmgrId,
+        CHECKPOINT_MANAGER_HOST, port, systemConfig, ckptmgrConfig);
     checkpointManager.startAndLoop();
 
     LOG.info("Loops terminated. Exiting.");

-- 
To stop receiving notification emails like this one, please contact
[email protected].

Reply via email to