Repository: asterixdb Updated Branches: refs/heads/master 486c4cdd2 -> d6ca4b56e
Cancel the on-going job if waitForCompletion is interrupted. Change-Id: I3417271660e815a13fd706e1cc057bca6a625c37 Reviewed-on: https://asterix-gerrit.ics.uci.edu/1825 Sonar-Qube: Jenkins <[email protected]> Tested-by: Jenkins <[email protected]> BAD: Jenkins <[email protected]> Reviewed-by: Michael Blow <[email protected]> Integration-Tests: Jenkins <[email protected]> Project: http://git-wip-us.apache.org/repos/asf/asterixdb/repo Commit: http://git-wip-us.apache.org/repos/asf/asterixdb/commit/d6ca4b56 Tree: http://git-wip-us.apache.org/repos/asf/asterixdb/tree/d6ca4b56 Diff: http://git-wip-us.apache.org/repos/asf/asterixdb/diff/d6ca4b56 Branch: refs/heads/master Commit: d6ca4b56ebcbdee8e87f89448c696847a9b05582 Parents: 486c4cd Author: Yingyi Bu <[email protected]> Authored: Fri Jun 9 20:41:49 2017 -0700 Committer: Yingyi Bu <[email protected]> Committed: Fri Jun 9 23:35:38 2017 -0700 ---------------------------------------------------------------------- .../hyracks/api/client/HyracksConnection.java | 8 +++- .../AbstractMultiNCIntegrationTest.java | 24 ++++-------- .../tests/integration/CancelJobTest.java | 41 ++++++++++++++++++++ 3 files changed, 56 insertions(+), 17 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/asterixdb/blob/d6ca4b56/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/client/HyracksConnection.java ---------------------------------------------------------------------- diff --git a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/client/HyracksConnection.java b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/client/HyracksConnection.java index 4b3aff2..ad54110 100644 --- a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/client/HyracksConnection.java +++ b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/client/HyracksConnection.java @@ -138,7 
+138,13 @@ public final class HyracksConnection implements IHyracksClientConnection { @Override public void waitForCompletion(JobId jobId) throws Exception { - hci.waitForCompletion(jobId); + try { + hci.waitForCompletion(jobId); + } catch (InterruptedException e) { + // Cancels an on-going job if the current thread gets interrupted. + hci.cancelJob(jobId); + throw e; + } } @Override http://git-wip-us.apache.org/repos/asf/asterixdb/blob/d6ca4b56/hyracks-fullstack/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/org/apache/hyracks/tests/integration/AbstractMultiNCIntegrationTest.java ---------------------------------------------------------------------- diff --git a/hyracks-fullstack/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/org/apache/hyracks/tests/integration/AbstractMultiNCIntegrationTest.java b/hyracks-fullstack/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/org/apache/hyracks/tests/integration/AbstractMultiNCIntegrationTest.java index 148d4f5..05a7e2d 100644 --- a/hyracks-fullstack/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/org/apache/hyracks/tests/integration/AbstractMultiNCIntegrationTest.java +++ b/hyracks-fullstack/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/org/apache/hyracks/tests/integration/AbstractMultiNCIntegrationTest.java @@ -26,8 +26,6 @@ import java.util.List; import java.util.logging.Level; import java.util.logging.Logger; -import com.fasterxml.jackson.databind.ObjectMapper; -import com.fasterxml.jackson.databind.node.ArrayNode; import org.apache.commons.io.FileUtils; import org.apache.hyracks.api.client.HyracksConnection; import org.apache.hyracks.api.client.IHyracksClientConnection; @@ -40,6 +38,7 @@ import org.apache.hyracks.api.exceptions.HyracksException; import org.apache.hyracks.api.job.JobFlag; import org.apache.hyracks.api.job.JobId; import org.apache.hyracks.api.job.JobSpecification; +import org.apache.hyracks.api.job.JobStatus; import 
org.apache.hyracks.api.job.resource.IJobCapacityController; import org.apache.hyracks.client.dataset.HyracksDataset; import org.apache.hyracks.control.cc.BaseCCApplication; @@ -52,8 +51,9 @@ import org.apache.hyracks.dataflow.common.comm.io.ResultFrameTupleAccessor; import org.apache.hyracks.dataflow.common.comm.util.ByteBufferInputStream; import org.junit.AfterClass; import org.junit.BeforeClass; -import org.junit.Rule; -import org.junit.rules.TemporaryFolder; + +import com.fasterxml.jackson.databind.ObjectMapper; +import com.fasterxml.jackson.databind.node.ArrayNode; public abstract class AbstractMultiNCIntegrationTest { @@ -70,9 +70,6 @@ public abstract class AbstractMultiNCIntegrationTest { private final List<File> outputFiles; - @Rule - public TemporaryFolder outputFolder = new TemporaryFolder(); - public AbstractMultiNCIntegrationTest() { outputFiles = new ArrayList<>(); } @@ -133,6 +130,10 @@ public abstract class AbstractMultiNCIntegrationTest { hcc.waitForCompletion(jobId); } + protected JobStatus getJobStatus(JobId jobId) throws Exception { + return hcc.getJobStatus(jobId); + } + protected void cancelJob(JobId jobId) throws Exception { hcc.cancelJob(jobId); } @@ -207,15 +208,6 @@ public abstract class AbstractMultiNCIntegrationTest { } } - protected File createTempFile() throws IOException { - File tempFile = File.createTempFile(getClass().getName(), ".tmp", outputFolder.getRoot()); - if (LOGGER.isLoggable(Level.INFO)) { - LOGGER.info("Output file: " + tempFile.getAbsolutePath()); - } - outputFiles.add(tempFile); - return tempFile; - } - public static class DummyApplication extends BaseCCApplication { @Override http://git-wip-us.apache.org/repos/asf/asterixdb/blob/d6ca4b56/hyracks-fullstack/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/org/apache/hyracks/tests/integration/CancelJobTest.java ---------------------------------------------------------------------- diff --git 
a/hyracks-fullstack/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/org/apache/hyracks/tests/integration/CancelJobTest.java b/hyracks-fullstack/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/org/apache/hyracks/tests/integration/CancelJobTest.java index 7c3b66f..7eba9e7 100644 --- a/hyracks-fullstack/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/org/apache/hyracks/tests/integration/CancelJobTest.java +++ b/hyracks-fullstack/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/org/apache/hyracks/tests/integration/CancelJobTest.java @@ -37,6 +37,7 @@ import org.apache.hyracks.api.io.FileSplit; import org.apache.hyracks.api.io.ManagedFileSplit; import org.apache.hyracks.api.job.JobId; import org.apache.hyracks.api.job.JobSpecification; +import org.apache.hyracks.api.job.JobStatus; import org.apache.hyracks.api.job.resource.ClusterCapacity; import org.apache.hyracks.api.job.resource.IClusterCapacity; import org.apache.hyracks.data.std.accessors.PointableBinaryHashFunctionFactory; @@ -61,6 +62,14 @@ import org.junit.Test; public class CancelJobTest extends AbstractMultiNCIntegrationTest { @Test + public void interruptJobClientAfterWaitForCompletion() throws Exception { + // Interrupts the job client after waitForCompletion() is called. + for (JobSpecification spec : testJobs()) { + interruptAfterWaitForCompletion(spec); + } + } + + @Test public void cancelExecutingJobAfterWaitForCompletion() throws Exception { //Cancels executing jobs after waitForCompletion() is called. 
for (JobSpecification spec : testJobs()) { @@ -167,6 +176,38 @@ public class CancelJobTest extends AbstractMultiNCIntegrationTest { } } + private void interruptAfterWaitForCompletion(JobSpecification spec) throws Exception { + // Submits the job + final JobId jobIdForInterruptTest = startJob(spec); + + // Waits for completion in another thread + Thread thread = new Thread(() -> { + try { + waitForCompletion(jobIdForInterruptTest); + } catch (Exception e) { + Assert.assertTrue(e instanceof InterruptedException); + } + }); + thread.start(); + + // Interrupts the wait-for-completion thread. + thread.interrupt(); + + // Waits until the thread terminates. + thread.join(); + + // Verifies the job status. + JobStatus jobStatus = getJobStatus(jobIdForInterruptTest); + while (jobStatus == JobStatus.RUNNING) { + synchronized (this) { + // Since job cancellation is asynchronous on NCs, we have to wait there. + wait(1000); + } + jobStatus = getJobStatus(jobIdForInterruptTest); + } + Assert.assertTrue(jobStatus == JobStatus.FAILURE); + } + private void cancelWithoutWait(JobSpecification spec) throws Exception { JobId jobId = startJob(spec); cancelJob(jobId);
