sv2000 commented on a change in pull request #2940: GOBBLIN-1099: Handle 
orphaned Yarn containers in Gobblin-on-Yarn clus…
URL: https://github.com/apache/incubator-gobblin/pull/2940#discussion_r399589040
 
 

 ##########
 File path: gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinTaskRunner.java
 ##########
 @@ -366,20 +377,74 @@ boolean isStopped() {
   }
 
   @VisibleForTesting
-  void connectHelixManager() {
-    try {
-      this.jobHelixManager.connect();
-      this.jobHelixManager.getMessagingService()
-          .registerMessageHandlerFactory(GobblinHelixConstants.SHUTDOWN_MESSAGE_TYPE,
-              new ParticipantShutdownMessageHandlerFactory());
-      this.jobHelixManager.getMessagingService()
-          .registerMessageHandlerFactory(Message.MessageType.USER_DEFINE_MSG.toString(),
-              getUserDefinedMessageHandlerFactory());
-      if (this.taskDriverHelixManager.isPresent()) {
-        this.taskDriverHelixManager.get().connect();
+  void connectHelixManager() throws Exception {
+    this.jobHelixManager.connect();
+    //Ensure the instance is enabled.
+    this.jobHelixManager.getClusterManagmentTool().enableInstance(clusterName, helixInstanceName, true);
+    this.jobHelixManager.getMessagingService()
+        .registerMessageHandlerFactory(GobblinHelixConstants.SHUTDOWN_MESSAGE_TYPE,
+            new ParticipantShutdownMessageHandlerFactory());
+    this.jobHelixManager.getMessagingService()
+        .registerMessageHandlerFactory(Message.MessageType.USER_DEFINE_MSG.toString(),
+            getUserDefinedMessageHandlerFactory());
+    if (this.taskDriverHelixManager.isPresent()) {
+      this.taskDriverHelixManager.get().connect();
+      //Ensure the instance is enabled.
+      this.taskDriverHelixManager.get().getClusterManagmentTool().enableInstance(this.taskDriverHelixManager.get().getClusterName(), helixInstanceName, true);
+    }
+  }
+
+  /**
+   * A method to handle failures joining Helix cluster. The method will perform the following steps before attempting
+   * to re-join the cluster:
+   * <li>
+   *   <ul>Disconnect from Helix cluster, which would close any open clients</ul>
+   *   <ul>Drop instance from Helix cluster, to remove any partial instance structure from Helix</ul>
+   *   <ul>Re-construct helix manager instances, used to re-join the cluster</ul>
+   * </li>
+   */
+  private void onClusterJoinFailure() {
+    logger.warn("Disconnecting Helix manager..");
+    disconnectHelixManager();
+
+    HelixAdmin admin = new ZKHelixAdmin(clusterConfig.getString(GobblinClusterConfigurationKeys.ZK_CONNECTION_STRING_KEY));
 
 Review comment:
   The intent is to avoid leaving the HelixManager in an inconsistent state, 
where the underlying ZkClient is connected but a later step of the join fails. 
Disconnecting before retrying ensures that any partial state left by the 
previous connect attempt is cleaned up. 
   
   We instantiate a new HelixAdmin (separate from the HelixManager) because 
getClusterManagmentTool() requires the HelixManager to be connected first. 
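
   To make the ordering concrete, here is a minimal, self-contained sketch of 
the disconnect-then-retry pattern. It does not use Helix and is not the code in 
this PR: `ClusterConnection`, `FlakyConnection`, `JoinRetrySketch.joinWithRetry`, 
and the `dropInstance` callback are hypothetical stand-ins. `dropInstance` plays 
the role of the separate HelixAdmin, which is assumed to work while the manager 
is disconnected.

```java
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Supplier;

// Hypothetical stand-in for a Helix manager; this is NOT the Helix API.
interface ClusterConnection {
  void connect() throws Exception; // may fail after the underlying client is already open
  void disconnect();               // must be safe to call after a failed connect()
  boolean isConnected();
}

// Illustrative fake: connect() opens its "client" first, then fails while failures remain.
final class FlakyConnection implements ClusterConnection {
  private final AtomicInteger failuresLeft;
  private boolean clientOpen;
  private boolean connected;

  FlakyConnection(AtomicInteger failuresLeft) {
    this.failuresLeft = failuresLeft;
  }

  @Override
  public void connect() throws Exception {
    clientOpen = true; // the partial state we want cleaned up on failure
    if (failuresLeft.getAndDecrement() > 0) {
      throw new Exception("simulated failure after the client was opened");
    }
    connected = true;
  }

  @Override
  public void disconnect() {
    clientOpen = false;
    connected = false;
  }

  @Override
  public boolean isConnected() {
    return connected;
  }

  boolean isClientOpen() {
    return clientOpen;
  }
}

final class JoinRetrySketch {
  private JoinRetrySketch() {}

  // Tries to join up to maxAttempts times, cleaning up after every failed attempt.
  static <C extends ClusterConnection> C joinWithRetry(
      Supplier<C> newConnection, Runnable dropInstance, int maxAttempts) {
    Exception last = null;
    for (int attempt = 1; attempt <= maxAttempts; attempt++) {
      C conn = newConnection.get(); // re-construct the manager for every attempt
      try {
        conn.connect();
        return conn;
      } catch (Exception e) {
        last = e;
        conn.disconnect();  // close any client the failed attempt left open
        dropInstance.run(); // uses its own handle, so it works while disconnected
      }
    }
    throw new IllegalStateException("could not join after " + maxAttempts + " attempts", last);
  }
}
```

   In this sketch, a fake that fails twice joins on the third freshly built 
connection, and both failed connections end up with their clients closed.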
