shirshanka commented on a change in pull request #2609: GOBBLIN-744: Support
cancellation of a Helix workflow via a DELETE Spec.
URL: https://github.com/apache/incubator-gobblin/pull/2609#discussion_r277182228
##########
File path:
gobblin-cluster/src/test/java/org/apache/gobblin/cluster/ClusterIntegrationTest.java
##########
@@ -82,6 +97,72 @@ public void testJobShouldComplete()
suite.waitForAndVerifyOutputFiles();
}
+ /**
+ * An integration test for cancelling a Helix workflow via a JobSpec. This
test case starts a Helix cluster with
+ * a {@link FsScheduledJobConfigurationManager}. The test case does the
following:
+ * <ul>
+ * <li> add a {@link org.apache.gobblin.runtime.api.JobSpec} that uses a
{@link org.apache.gobblin.cluster.SleepingCustomTaskSource}
+ * to {@link IntegrationJobCancelViaSpecSuite#FS_SPEC_CONSUMER_DIR},
which is picked up by the JobConfigurationManager. </li>
+ * <li> the JobConfigurationManager sends a notification to the
GobblinHelixJobScheduler which schedules the job for execution. The JobSpec is
+ * also added to the JobCatalog for persistence. Helix starts a Workflow
for this JobSpec. </li>
+ * <li> We then add a {@link org.apache.gobblin.runtime.api.JobSpec} with
DELETE Verb to {@link IntegrationJobCancelViaSpecSuite#FS_SPEC_CONSUMER_DIR}.
+ * This signals the GobblinHelixJobScheduler (and Helix) to delete the
running job (i.e., the Helix Workflow) started in the previous step. </li>
+ * <li> Finally, we inspect the state of the zNode corresponding to the
Workflow resource in Zookeeper to ensure that its {@link
org.apache.helix.task.TargetState}
+ * is STOP. </li>
+ * </ul>
+ */
+ @Test (dependsOnMethods = { "testJobShouldGetCancelled" })
+ public void testJobCancellationViaSpec() throws Exception {
+ this.suite = new IntegrationJobCancelViaSpecSuite();
+ HelixManager helixManager = getHelixManager();
+
+ IntegrationJobCancelViaSpecSuite cancelViaSpecSuite =
(IntegrationJobCancelViaSpecSuite) this.suite;
+
+ //Add a new JobSpec to the path monitored by the SpecConsumer
+ cancelViaSpecSuite.addJobSpec(IntegrationJobCancelViaSpecSuite.JOB_ID,
SpecExecutor.Verb.ADD.name());
+
+ //Start the cluster
+ cancelViaSpecSuite.startCluster();
+
+ helixManager.connect();
+
+ while (TaskDriver.getWorkflowContext(helixManager,
IntegrationJobCancelViaSpecSuite.JOB_ID) == null) {
+ log.warn("Waiting for the job to start...");
+ Thread.sleep(1000L);
+ }
+
+ // Give the job some time to reach the writer, where it sleeps
+ Thread.sleep(2000L);
+
+ ZkClient zkClient = new ZkClient(this.zkConnectString);
+ PathBasedZkSerializer zkSerializer = ChainedPathZkSerializer.builder(new
ZNRecordStreamingSerializer()).build();
+ zkClient.setZkSerializer(zkSerializer);
+
+ String clusterName = getHelixManager().getClusterName();
+ String zNodePath = Paths.get("/", clusterName, "CONFIGS", "RESOURCE",
IntegrationJobCancelViaSpecSuite.JOB_ID).toString();
+
+ //Ensure that the Workflow is started
+ ZNRecord record = zkClient.readData(zNodePath);
+ String targetState = record.getSimpleField("TargetState");
+ Assert.assertEquals(targetState, TargetState.START.name());
+
+ //Add a JobSpec with DELETE verb signalling the Helix cluster to cancel
the workflow
+ cancelViaSpecSuite.addJobSpec(IntegrationJobCancelViaSpecSuite.JOB_ID,
SpecExecutor.Verb.DELETE.name());
+
+ //Give some time for the FsScheduledJobConfigurationManager to pick up the
DELETE spec and send
+ // DeleteJobConfigArrivalEvent.
+ Thread.sleep(3000L);
Review comment:
Any way to avoid sleeping and instead wait on some other observable event to
figure out whether the delete has been processed?
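One possible direction, sketched as a suggestion only: replace the fixed
Thread.sleep(3000L) with a poll on an observable condition, e.g. the Helix
workflow's TargetState. The helper below is a hypothetical, self-contained
utility (the class and method names AwaitUtil/awaitCondition are invented for
illustration, not part of Gobblin or Helix):

```java
import java.util.function.Supplier;

public class AwaitUtil {
  /**
   * Polls the supplied condition until it returns true or the timeout
   * elapses. Returns the final value of the condition, so callers can
   * assert on it instead of sleeping for a fixed duration.
   */
  public static boolean awaitCondition(Supplier<Boolean> condition,
      long timeoutMs, long pollMs) throws InterruptedException {
    long deadline = System.currentTimeMillis() + timeoutMs;
    while (System.currentTimeMillis() < deadline) {
      if (condition.get()) {
        return true;
      }
      Thread.sleep(pollMs);
    }
    // One last check after the deadline, in case the event arrived late.
    return condition.get();
  }
}
```

In the test, the condition could (assuming Helix's static
TaskDriver.getWorkflowConfig(HelixManager, String) and
WorkflowConfig#getTargetState) check that the workflow's TargetState has
become STOP, bounding the wait without a hard-coded sleep.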
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
[email protected]
With regards,
Apache Git Services