htran1 commented on a change in pull request #2863: GOBBLIN-1016: Allow Gobblin
Application Master to join Helix cluster …
URL: https://github.com/apache/incubator-gobblin/pull/2863#discussion_r366518825
##########
File path:
gobblin-yarn/src/test/java/org/apache/gobblin/yarn/GobblinYarnAppLauncherTest.java
##########
@@ -265,17 +297,61 @@ public void testSendShutdownRequest() throws Exception {
Assert.assertEquals(this.curatorFramework.checkExists()
.forPath(String.format("/%s/CONTROLLER/MESSAGES",
GobblinYarnAppLauncherTest.class.getSimpleName()))
.getVersion(), 0);
- YarnSecurityManagerTest.GetControllerMessageNumFunc getCtrlMessageNum =
- new
YarnSecurityManagerTest.GetControllerMessageNumFunc(GobblinYarnAppLauncherTest.class.getSimpleName(),
+ YarnSecurityManagerTest.GetHelixMessageNumFunc getCtrlMessageNum =
+ new
YarnSecurityManagerTest.GetHelixMessageNumFunc(GobblinYarnAppLauncherTest.class.getSimpleName(),
InstanceType.CONTROLLER, "",
this.curatorFramework);
AssertWithBackoff assertWithBackoff =
AssertWithBackoff.create().logger(LoggerFactory.getLogger("testSendShutdownRequest")).timeoutMs(20000);
assertWithBackoff.assertEquals(getCtrlMessageNum, 1, "1 controller message
queued");
// Give Helix some time to handle the message
assertWithBackoff.assertEquals(getCtrlMessageNum, 0, "all controller
messages processed");
+
+ this.helixManagerManagedHelix.connect();
+
this.helixManagerManagedHelix.getMessagingService().registerMessageHandlerFactory(GobblinHelixConstants.SHUTDOWN_MESSAGE_TYPE,
+ new TestShutdownMessageHandlerFactory(this));
+
+ this.gobblinYarnAppLauncherManagedHelix.connectHelixManager();
+ this.gobblinYarnAppLauncherManagedHelix.sendShutdownRequest();
+
+ Assert.assertEquals(this.curatorFramework.checkExists()
+ .forPath(String.format("/%s/INSTANCES/%s/MESSAGES",
this.configManagedHelix.getString(GobblinClusterConfigurationKeys.HELIX_CLUSTER_NAME_KEY),
TEST_HELIX_INSTANCE_NAME_MANAGED))
+ .getVersion(), 0);
+ YarnSecurityManagerTest.GetHelixMessageNumFunc getInstanceMessageNum =
+ new
YarnSecurityManagerTest.GetHelixMessageNumFunc(this.configManagedHelix.getString(
+ GobblinClusterConfigurationKeys.HELIX_CLUSTER_NAME_KEY),
+ InstanceType.PARTICIPANT, TEST_HELIX_INSTANCE_NAME_MANAGED,
this.curatorFramework);
+ assertWithBackoff =
+
AssertWithBackoff.create().logger(LoggerFactory.getLogger("testSendShutdownRequest")).timeoutMs(20000);
+ assertWithBackoff.assertEquals(getInstanceMessageNum, 1, "1 controller
message queued");
+
+ // Give Helix some time to handle the message
+ assertWithBackoff.assertEquals(getInstanceMessageNum, 0, "all controller
messages processed");
}
+ /*static class GetInstanceMessageFunc implements Function<Void, Integer> {
Review comment:
Remove this commented-out block?
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
[email protected]
With regards,
Apache Git Services