[
https://issues.apache.org/jira/browse/GOBBLIN-744?focusedWorklogId=231462&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-231462
]
ASF GitHub Bot logged work on GOBBLIN-744:
------------------------------------------
Author: ASF GitHub Bot
Created on: 23/Apr/19 15:33
Start Date: 23/Apr/19 15:33
Worklog Time Spent: 10m
Work Description: shirshanka commented on pull request #2609:
GOBBLIN-744: Support cancellation of a Helix workflow via a DELETE Spec.
URL: https://github.com/apache/incubator-gobblin/pull/2609#discussion_r277740598
##########
File path:
gobblin-cluster/src/test/java/org/apache/gobblin/cluster/ClusterIntegrationTest.java
##########
@@ -82,6 +101,105 @@ public void testJobShouldComplete()
suite.waitForAndVerifyOutputFiles();
}
+ /**
+ * An integration test for restarting a Helix workflow via a JobSpec. This
test case starts a Helix cluster with
+ * a {@link FsScheduledJobConfigurationManager}. The test case does the
following:
+ * <ul>
+ * <li> add a {@link org.apache.gobblin.runtime.api.JobSpec} that uses a
{@link org.apache.gobblin.cluster.SleepingCustomTaskSource})
+ * to {@link IntegrationJobRestartViaSpecSuite#FS_SPEC_CONSUMER_DIR}.
which is picked by the JobConfigurationManager. </li>
+ * <li> the JobConfigurationManager sends a notification to the
GobblinHelixJobScheduler which schedules the job for execution. The JobSpec is
+ * also added to the JobCatalog for persistence. Helix starts a Workflow
for this JobSpec. </li>
+ * <li> We then add a {@link org.apache.gobblin.runtime.api.JobSpec} with
UPDATE Verb to {@link IntegrationJobRestartViaSpecSuite#FS_SPEC_CONSUMER_DIR}.
+ * This signals GobblinHelixJobScheduler (and, Helix) to first cancel the
running job (i.e., Helix Workflow) started in the previous step.
+ * <li> We inspect the state of the zNode corresponding to the Workflow
resource in Zookeeper to ensure that its {@link
org.apache.helix.task.TargetState}
+ * is STOP. </li>
+ * <li> Once the cancelled job from the previous steps is completed, the
job will be re-launched for execution by the GobblinHelixJobScheduler.
+ * We confirm the execution by again inspecting the zNode and ensuring its
TargetState is START. </li>
+ * </ul>
+ */
+ @Test (dependsOnMethods = { "testJobShouldGetCancelled" })
+ public void testJobRestartViaSpec() throws Exception {
+ this.suite = new IntegrationJobRestartViaSpecSuite();
+ HelixManager helixManager = getHelixManager();
+
+ IntegrationJobRestartViaSpecSuite restartViaSpecSuite =
(IntegrationJobRestartViaSpecSuite) this.suite;
+
+ //Add a new JobSpec to the path monitored by the SpecConsumer
+ restartViaSpecSuite.addJobSpec(IntegrationJobRestartViaSpecSuite.JOB_ID,
SpecExecutor.Verb.ADD.name());
+
+ //Start the cluster
+ restartViaSpecSuite.startCluster();
+
+ helixManager.connect();
+
+ AssertWithBackoff asserter1 =
AssertWithBackoff.create().timeoutMs(30000).maxSleepMs(1000).backoffFactor(1);
+ asserter1.assertTrue(isTaskStarted(helixManager,
IntegrationJobRestartViaSpecSuite.JOB_ID),
+ "Waiting for the job to start...");
+
+ AssertWithBackoff asserter2 =
AssertWithBackoff.create().maxSleepMs(100).timeoutMs(2000).backoffFactor(1);
+
asserter2.assertTrue(isTaskRunning(IntegrationJobRestartViaSpecSuite.TASK_STATE_FILE),"Waiting
for the task to enter running state");
+
+ ZkClient zkClient = new ZkClient(this.zkConnectString);
+ PathBasedZkSerializer zkSerializer = ChainedPathZkSerializer.builder(new
ZNRecordStreamingSerializer()).build();
+ zkClient.setZkSerializer(zkSerializer);
+
+ String clusterName = getHelixManager().getClusterName();
+ String zNodePath = Paths.get("/", clusterName, "CONFIGS", "RESOURCE",
IntegrationJobRestartViaSpecSuite.JOB_ID).toString();
+
+ //Ensure that the Workflow is started
+ ZNRecord record = zkClient.readData(zNodePath);
+ String targetState = record.getSimpleField("TargetState");
+ Assert.assertEquals(targetState, TargetState.START.name());
+
+ //Add a JobSpec with UPDATE verb signalling the Helix cluster to restart
the workflow
+ restartViaSpecSuite.addJobSpec(IntegrationJobRestartViaSpecSuite.JOB_ID,
SpecExecutor.Verb.UPDATE.name());
+
+ AssertWithBackoff asserter3 =
AssertWithBackoff.create().maxSleepMs(1000).timeoutMs(5000).backoffFactor(1);
+ asserter3.assertTrue(input -> {
+ //Inspect the zNode at the path corresponding to the Workflow resource.
Ensure the target state of the resource is in
+ // the STOP state or that the zNode has been deleted.
+ ZNRecord recordNew = zkClient.readData(zNodePath, true);
+ String targetStateNew = null;
+ if (recordNew != null) {
+ targetStateNew = recordNew.getSimpleField("TargetState");
+ }
+ return recordNew == null ||
targetStateNew.equals(TargetState.STOP.name());
+ }, "Waiting for Workflow TargetState to be STOP");
+
+ //Ensure that the SleepingTask did not terminate normally i.e. it was
interrupted. We check this by ensuring
+ // that the line "Hello World!" is not present in the logged output.
+ suite.waitForAndVerifyOutputFiles();
+
+ AssertWithBackoff asserter4 =
AssertWithBackoff.create().maxSleepMs(1000).timeoutMs(120000).backoffFactor(1);
+ asserter4.assertTrue(input -> {
+ //Inspect the zNode at the path corresponding to the Workflow resource.
Ensure the target state of the resource is in
+ // the START state.
+ ZNRecord recordNew = zkClient.readData(zNodePath, true);
+ String targetStateNew = null;
+ if (recordNew != null) {
+ targetStateNew = recordNew.getSimpleField("TargetState");
+ return targetStateNew.equals(TargetState.START.name());
+ }
+ return false;
+ }, "Waiting for Workflow TargetState to be START");
+ }
+
+ private Predicate<Void> isTaskStarted(HelixManager helixManager, String
jobId) {
+ return input -> TaskDriver.getWorkflowContext(helixManager, jobId) != null;
+ }
+
+ private Predicate<Void> isTaskRunning(String taskStateFileName) {
+ return input -> {
+ try {
+ FileSystem fs = FileSystem.getLocal(new Configuration());
Review comment:
can avoid using hadoop fs here?
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
[email protected]
Issue Time Tracking
-------------------
Worklog Id: (was: 231462)
Time Spent: 5h 40m (was: 5.5h)
> Support cancellation of a Helix workflow via a DELETE Spec
> ----------------------------------------------------------
>
> Key: GOBBLIN-744
> URL: https://issues.apache.org/jira/browse/GOBBLIN-744
> Project: Apache Gobblin
> Issue Type: Improvement
> Components: gobblin-cluster
> Affects Versions: 0.15.0
> Reporter: Sudarshan Vasudevan
> Assignee: Hung Tran
> Priority: Major
> Fix For: 0.15.0
>
> Time Spent: 5h 40m
> Remaining Estimate: 0h
>
> This task supports the ability to interrupt and cancel a running job on a
> Gobblin Helix cluster via a DELETE Spec submitted to the
> JobConfigurationManager. The DELETE Spec should have
> "gobblin.cluster.shouldCancelRunningJobOnDelete" set to true for cancelling a
> running job. The default behavior is to simply delete the corresponding
> JobSpec from the JobCatalog.
>
>
--
This message was sent by Atlassian JIRA
(v7.6.3#76005)