sv2000 commented on a change in pull request #2940: GOBBLIN-1099: Handle orphaned Yarn containers in Gobblin-on-Yarn clus… URL: https://github.com/apache/incubator-gobblin/pull/2940#discussion_r399590389
########## File path: gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinTaskRunner.java ########## @@ -366,20 +377,74 @@ boolean isStopped() { } @VisibleForTesting - void connectHelixManager() { - try { - this.jobHelixManager.connect(); - this.jobHelixManager.getMessagingService() - .registerMessageHandlerFactory(GobblinHelixConstants.SHUTDOWN_MESSAGE_TYPE, - new ParticipantShutdownMessageHandlerFactory()); - this.jobHelixManager.getMessagingService() - .registerMessageHandlerFactory(Message.MessageType.USER_DEFINE_MSG.toString(), - getUserDefinedMessageHandlerFactory()); - if (this.taskDriverHelixManager.isPresent()) { - this.taskDriverHelixManager.get().connect(); + void connectHelixManager() throws Exception { + this.jobHelixManager.connect(); + //Ensure the instance is enabled. + this.jobHelixManager.getClusterManagmentTool().enableInstance(clusterName, helixInstanceName, true); + this.jobHelixManager.getMessagingService() + .registerMessageHandlerFactory(GobblinHelixConstants.SHUTDOWN_MESSAGE_TYPE, + new ParticipantShutdownMessageHandlerFactory()); + this.jobHelixManager.getMessagingService() + .registerMessageHandlerFactory(Message.MessageType.USER_DEFINE_MSG.toString(), + getUserDefinedMessageHandlerFactory()); + if (this.taskDriverHelixManager.isPresent()) { + this.taskDriverHelixManager.get().connect(); + //Ensure the instance is enabled. + this.taskDriverHelixManager.get().getClusterManagmentTool().enableInstance(this.taskDriverHelixManager.get().getClusterName(), helixInstanceName, true); + } + } + + /** + * A method to handle failures joining Helix cluster. 
The method will perform the following steps before attempting + * to re-join the cluster: + * <ul> + * <li>Disconnect from Helix cluster, which would close any open clients</li> + * <li>Drop instance from Helix cluster, to remove any partial instance structure from Helix</li> + * <li>Re-construct helix manager instances, used to re-join the cluster</li> + * </ul> + */ + private void onClusterJoinFailure() { + logger.warn("Disconnecting Helix manager.."); + disconnectHelixManager(); + + HelixAdmin admin = new ZKHelixAdmin(clusterConfig.getString(GobblinClusterConfigurationKeys.ZK_CONNECTION_STRING_KEY)); + //Drop the helix Instance + logger.warn("Dropping instance: {} from cluster: {}", helixInstanceName, clusterName); + HelixUtils.dropInstanceIfExists(admin, clusterName, helixInstanceName); + + if (this.taskDriverHelixManager.isPresent()) { + String taskDriverCluster = clusterConfig.getString(GobblinClusterConfigurationKeys.TASK_DRIVER_CLUSTER_NAME_KEY); + logger.warn("Dropping instance: {} from task driver cluster: {}", helixInstanceName, taskDriverCluster); + HelixUtils.dropInstanceIfExists(admin, taskDriverCluster, helixInstanceName); + } + admin.close(); + + logger.warn("Reinitializing Helix manager.."); + initHelixManager(); + } + + @VisibleForTesting + void connectHelixManagerWithRetry() { + Callable<Void> connectHelixManagerCallable = () -> { + try { + logger.info("Instance: {} attempting to join cluster: {}", helixInstanceName, clusterName); + connectHelixManager(); + } catch (HelixException e) { + logger.error("Exception encountered when joining cluster", e); + onClusterJoinFailure(); + throw e; } - } catch (Exception e) { - logger.error("HelixManager failed to connect", e); + return null; + }; + + Retryer<Void> retryer = RetryerBuilder.<Void>newBuilder() + .retryIfException() + .withStopStrategy(StopStrategies.stopAfterAttempt(5)).build(); + + try { + retryer.call(connectHelixManagerCallable); + } catch (ExecutionException | RetryException e) { + 
logger.error("Connecting to Helix manager failed", e); Review comment: Thanks! Will fix it. ---------------------------------------------------------------- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services