Repository: asterixdb Updated Branches: refs/heads/master 5eb13036d -> 87411c22c
http://git-wip-us.apache.org/repos/asf/asterixdb/blob/87411c22/asterixdb/asterix-app/src/test/java/org/apache/asterix/runtime/ParseDurationTest.java ---------------------------------------------------------------------- diff --git a/asterixdb/asterix-app/src/test/java/org/apache/asterix/runtime/ParseDurationTest.java b/asterixdb/asterix-app/src/test/java/org/apache/asterix/runtime/ParseDurationTest.java new file mode 100644 index 0000000..12c61d6 --- /dev/null +++ b/asterixdb/asterix-app/src/test/java/org/apache/asterix/runtime/ParseDurationTest.java @@ -0,0 +1,161 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ +package org.apache.asterix.runtime; + +import org.apache.asterix.api.http.server.Duration; +import org.apache.hyracks.api.exceptions.HyracksDataException; +import org.junit.Assert; +import org.junit.Test; + +public class ParseDurationTest { + + @Test + public void test() throws Exception { + // simple + Assert.assertEquals(0, Duration.parseDurationStringToNanos("0")); + Assert.assertEquals(java.util.concurrent.TimeUnit.SECONDS.toNanos(5), + Duration.parseDurationStringToNanos("5s")); + Assert.assertEquals(java.util.concurrent.TimeUnit.SECONDS.toNanos(30), + Duration.parseDurationStringToNanos("30s")); + Assert.assertEquals(java.util.concurrent.TimeUnit.SECONDS.toNanos(1478), + Duration.parseDurationStringToNanos("1478s")); + Assert.assertEquals(java.util.concurrent.TimeUnit.SECONDS.toNanos(-5), + Duration.parseDurationStringToNanos("-5s")); + Assert.assertEquals(java.util.concurrent.TimeUnit.SECONDS.toNanos(5), + Duration.parseDurationStringToNanos("+5s")); + Assert.assertEquals(java.util.concurrent.TimeUnit.SECONDS.toNanos(0), + Duration.parseDurationStringToNanos("-0")); + Assert.assertEquals(java.util.concurrent.TimeUnit.SECONDS.toNanos(0), + Duration.parseDurationStringToNanos("+0")); + Assert.assertEquals(java.util.concurrent.TimeUnit.SECONDS.toNanos(5), + Duration.parseDurationStringToNanos("5.0s")); + Assert.assertEquals( + java.util.concurrent.TimeUnit.SECONDS.toNanos(5) + + java.util.concurrent.TimeUnit.MILLISECONDS.toNanos(600), + Duration.parseDurationStringToNanos("5.6s")); + Assert.assertEquals(java.util.concurrent.TimeUnit.SECONDS.toNanos(5), + Duration.parseDurationStringToNanos("5.s")); + Assert.assertEquals(java.util.concurrent.TimeUnit.MILLISECONDS.toNanos(500), + Duration.parseDurationStringToNanos(".5s")); + Assert.assertEquals(java.util.concurrent.TimeUnit.SECONDS.toNanos(1), + Duration.parseDurationStringToNanos("1.0s")); + Assert.assertEquals(java.util.concurrent.TimeUnit.SECONDS.toNanos(1), + 
Duration.parseDurationStringToNanos("1.00s")); + Assert.assertEquals( + java.util.concurrent.TimeUnit.SECONDS.toNanos(1) + + java.util.concurrent.TimeUnit.MILLISECONDS.toNanos(4), + Duration.parseDurationStringToNanos("1.004s")); + Assert.assertEquals( + java.util.concurrent.TimeUnit.SECONDS.toNanos(1) + + java.util.concurrent.TimeUnit.MILLISECONDS.toNanos(4), + Duration.parseDurationStringToNanos("1.0040s")); + Assert.assertEquals( + java.util.concurrent.TimeUnit.SECONDS.toNanos(100) + + java.util.concurrent.TimeUnit.MILLISECONDS.toNanos(1), + Duration.parseDurationStringToNanos("100.00100s")); + Assert.assertEquals(java.util.concurrent.TimeUnit.NANOSECONDS.toNanos(10), + Duration.parseDurationStringToNanos("10ns")); + Assert.assertEquals(java.util.concurrent.TimeUnit.MICROSECONDS.toNanos(11), + Duration.parseDurationStringToNanos("11us")); + Assert.assertEquals(java.util.concurrent.TimeUnit.MICROSECONDS.toNanos(12), + Duration.parseDurationStringToNanos("12µs")); + Assert.assertEquals(java.util.concurrent.TimeUnit.MICROSECONDS.toNanos(12), + Duration.parseDurationStringToNanos("12μs")); + Assert.assertEquals(java.util.concurrent.TimeUnit.MILLISECONDS.toNanos(13), + Duration.parseDurationStringToNanos("13ms")); + Assert.assertEquals(java.util.concurrent.TimeUnit.SECONDS.toNanos(14), + Duration.parseDurationStringToNanos("14s")); + Assert.assertEquals(java.util.concurrent.TimeUnit.MINUTES.toNanos(15), + Duration.parseDurationStringToNanos("15m")); + Assert.assertEquals(java.util.concurrent.TimeUnit.HOURS.toNanos(16), + Duration.parseDurationStringToNanos("16h")); + Assert.assertEquals( + java.util.concurrent.TimeUnit.HOURS.toNanos(3) + java.util.concurrent.TimeUnit.MINUTES.toNanos(30), + Duration.parseDurationStringToNanos("3h30m")); + Assert.assertEquals( + java.util.concurrent.TimeUnit.SECONDS.toNanos(10) + + java.util.concurrent.TimeUnit.MILLISECONDS.toNanos(500) + + java.util.concurrent.TimeUnit.MINUTES.toNanos(4), + 
Duration.parseDurationStringToNanos("10.5s4m")); + Assert.assertEquals( + java.util.concurrent.TimeUnit.MINUTES.toNanos(-2) + java.util.concurrent.TimeUnit.SECONDS.toNanos(-3) + + java.util.concurrent.TimeUnit.MILLISECONDS.toNanos(-400), + Duration.parseDurationStringToNanos("-2m3.4s")); + Assert.assertEquals( + java.util.concurrent.TimeUnit.HOURS.toNanos(1) + java.util.concurrent.TimeUnit.MINUTES.toNanos(2) + + java.util.concurrent.TimeUnit.SECONDS.toNanos(3) + + java.util.concurrent.TimeUnit.MILLISECONDS.toNanos(4) + + java.util.concurrent.TimeUnit.MICROSECONDS.toNanos(5) + + java.util.concurrent.TimeUnit.NANOSECONDS.toNanos(6), + Duration.parseDurationStringToNanos("1h2m3s4ms5us6ns")); + Assert.assertEquals( + java.util.concurrent.TimeUnit.HOURS.toNanos(39) + java.util.concurrent.TimeUnit.MINUTES.toNanos(9) + + java.util.concurrent.TimeUnit.SECONDS.toNanos(14) + + java.util.concurrent.TimeUnit.MILLISECONDS.toNanos(425), + Duration.parseDurationStringToNanos("39h9m14.425s")); + Assert.assertEquals(java.util.concurrent.TimeUnit.NANOSECONDS.toNanos(52763797000L), + Duration.parseDurationStringToNanos("52763797000ns")); + Assert.assertEquals(1199999998800L, Duration.parseDurationStringToNanos("0.3333333333333333333333h")); + Assert.assertEquals(9007199254740993L, Duration.parseDurationStringToNanos("9007199254740993ns")); + Assert.assertEquals(9223372036854775807L, Duration.parseDurationStringToNanos("9223372036854775807ns")); + Assert.assertEquals(9223372036854775807L, Duration.parseDurationStringToNanos("9223372036854775.807us")); + Assert.assertEquals(9223372036854775807L, Duration.parseDurationStringToNanos("9223372036s854ms775us807ns")); + Assert.assertEquals(-9223372036854775807L, Duration.parseDurationStringToNanos("-9223372036854775807ns")); + assertFail(""); + assertFail("3"); + assertFail("-"); + assertFail("s"); + assertFail("."); + assertFail("-."); + assertFail(".s"); + assertFail("+.s"); + assertFail("3000000h"); + assertFail("9223372036854775808ns"); 
+ assertFail("9223372036854775.808us"); + assertFail("9223372036854ms775us808ns"); + assertFail("-9223372036854775808ns"); + } + + @Test + public void testDurationFormatNanos() throws Exception { + Assert.assertEquals("123.456789012s", Duration.formatNanos(123456789012l)); + Assert.assertEquals("12.345678901s", Duration.formatNanos(12345678901l)); + Assert.assertEquals("1.234567890s", Duration.formatNanos(1234567890l)); + Assert.assertEquals("123.456789ms", Duration.formatNanos(123456789l)); + Assert.assertEquals("12.345678ms", Duration.formatNanos(12345678l)); + Assert.assertEquals("1.234567ms", Duration.formatNanos(1234567l)); + Assert.assertEquals("123.456µs", Duration.formatNanos(123456l)); + Assert.assertEquals("12.345µs", Duration.formatNanos(12345l)); + Assert.assertEquals("1.234µs", Duration.formatNanos(1234l)); + Assert.assertEquals("123ns", Duration.formatNanos(123l)); + Assert.assertEquals("12ns", Duration.formatNanos(12l)); + Assert.assertEquals("1ns", Duration.formatNanos(1l)); + Assert.assertEquals("-123.456789012s", Duration.formatNanos(-123456789012l)); + Assert.assertEquals("120.000000000s", Duration.formatNanos(120000000000l)); + Assert.assertEquals("-12ns", Duration.formatNanos(-12l)); + } + + private void assertFail(String duration) { + try { + Duration.parseDurationStringToNanos(duration); + Assert.fail("Expected parseDuration(" + duration + ") to fail but it didn't"); + } catch (HyracksDataException hde) { + } + } +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/asterixdb/blob/87411c22/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/exceptions/ErrorCode.java ---------------------------------------------------------------------- diff --git a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/exceptions/ErrorCode.java b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/exceptions/ErrorCode.java index 143f7d1..fcfb428 100644 --- 
a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/exceptions/ErrorCode.java +++ b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/exceptions/ErrorCode.java @@ -68,6 +68,9 @@ public class ErrorCode { public static final int POLYGON_3_POINTS = 25; public static final int POLYGON_INVALID = 26; public static final int OPERATION_NOT_SUPPORTED = 27; + public static final int INVALID_DURATION = 28; + public static final int UNKNOWN_DURATION_UNIT = 29; + public static final int QUERY_TIMEOUT = 30; public static final int INSTANTIATION_ERROR = 100; http://git-wip-us.apache.org/repos/asf/asterixdb/blob/87411c22/asterixdb/asterix-common/src/main/resources/asx_errormsg/en.properties ---------------------------------------------------------------------- diff --git a/asterixdb/asterix-common/src/main/resources/asx_errormsg/en.properties b/asterixdb/asterix-common/src/main/resources/asx_errormsg/en.properties index 715c27d..5bd5482 100644 --- a/asterixdb/asterix-common/src/main/resources/asx_errormsg/en.properties +++ b/asterixdb/asterix-common/src/main/resources/asx_errormsg/en.properties @@ -61,6 +61,9 @@ 25 = Polygon must have at least 3 points 26 = %1$s can not be an instance of polygon 27 = Operation not supported +28 = Invalid duration %1$s +29 = Unknown duration unit %1$s +30 = Query timed out 100 = Unable to instantiate class %1$s http://git-wip-us.apache.org/repos/asf/asterixdb/blob/87411c22/asterixdb/asterix-metadata/pom.xml ---------------------------------------------------------------------- diff --git a/asterixdb/asterix-metadata/pom.xml b/asterixdb/asterix-metadata/pom.xml index 5497cf4..458bb67 100644 --- a/asterixdb/asterix-metadata/pom.xml +++ b/asterixdb/asterix-metadata/pom.xml @@ -16,7 +16,8 @@ ! specific language governing permissions and limitations ! under the License. 
!--> -<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"> +<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" + xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"> <modelVersion>4.0.0</modelVersion> <parent> <artifactId>apache-asterixdb</artifactId> http://git-wip-us.apache.org/repos/asf/asterixdb/blob/87411c22/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/client/HyracksClientInterfaceFunctions.java ---------------------------------------------------------------------- diff --git a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/client/HyracksClientInterfaceFunctions.java b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/client/HyracksClientInterfaceFunctions.java index e2868ae..95479c1 100644 --- a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/client/HyracksClientInterfaceFunctions.java +++ b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/client/HyracksClientInterfaceFunctions.java @@ -130,6 +130,9 @@ public class HyracksClientInterfaceFunctions { public CancelJobFunction(JobId jobId) { this.jobId = jobId; + if (jobId == null) { + throw new IllegalArgumentException("jobId"); + } } @Override http://git-wip-us.apache.org/repos/asf/asterixdb/blob/87411c22/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/executor/JobExecutor.java ---------------------------------------------------------------------- diff --git a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/executor/JobExecutor.java 
b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/executor/JobExecutor.java index dbbaf9f..a3078b6 100644 --- a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/executor/JobExecutor.java +++ b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/executor/JobExecutor.java @@ -65,6 +65,8 @@ import org.apache.hyracks.control.cc.partitions.PartitionMatchMaker; import org.apache.hyracks.control.cc.work.JobCleanupWork; import org.apache.hyracks.control.common.job.PartitionState; import org.apache.hyracks.control.common.job.TaskAttemptDescriptor; +import org.apache.hyracks.control.common.work.NoOpCallback; +import org.apache.hyracks.control.common.work.IResultCallback; public class JobExecutor { private static final Logger LOGGER = Logger.getLogger(JobExecutor.class.getName()); @@ -114,9 +116,10 @@ public class JobExecutor { ccs.getContext().notifyJobStart(jobRun.getJobId()); } - public void cancelJob() throws HyracksException { + public void cancelJob(IResultCallback<Void> callback) throws HyracksException { // If the job is already terminated or failed, do nothing here. if (jobRun.getPendingStatus() != null) { + callback.setValue(null); return; } // Sets the cancelled flag. @@ -124,7 +127,8 @@ public class JobExecutor { // Aborts on-ongoing task clusters. abortOngoingTaskClusters(ta -> false, ta -> null); // Aborts the whole job. 
- abortJob(Collections.singletonList(HyracksException.create(ErrorCode.JOB_CANCELED, jobRun.getJobId()))); + abortJob(Collections.singletonList(HyracksException.create(ErrorCode.JOB_CANCELED, jobRun.getJobId())), + callback); } private void findRunnableTaskClusterRoots(Set<TaskCluster> frontier, Collection<ActivityCluster> roots) @@ -196,8 +200,8 @@ public class JobExecutor { "Runnable TC roots: " + taskClusterRoots + ", inProgressTaskClusters: " + inProgressTaskClusters); } if (taskClusterRoots.isEmpty() && inProgressTaskClusters.isEmpty()) { - ccs.getWorkQueue() - .schedule(new JobCleanupWork(ccs.getJobManager(), jobRun.getJobId(), JobStatus.TERMINATED, null)); + ccs.getWorkQueue().schedule(new JobCleanupWork(ccs.getJobManager(), jobRun.getJobId(), JobStatus.TERMINATED, + null, NoOpCallback.INSTANCE)); return; } startRunnableTaskClusters(taskClusterRoots); @@ -520,14 +524,14 @@ public class JobExecutor { } } - public void abortJob(List<Exception> exceptions) { + public void abortJob(List<Exception> exceptions, IResultCallback<Void> callback) { Set<TaskCluster> inProgressTaskClustersCopy = new HashSet<>(inProgressTaskClusters); for (TaskCluster tc : inProgressTaskClustersCopy) { abortTaskCluster(findLastTaskClusterAttempt(tc), TaskClusterAttempt.TaskClusterStatus.ABORTED); } assert inProgressTaskClusters.isEmpty(); - ccs.getWorkQueue() - .schedule(new JobCleanupWork(ccs.getJobManager(), jobRun.getJobId(), JobStatus.FAILURE, exceptions)); + ccs.getWorkQueue().schedule( + new JobCleanupWork(ccs.getJobManager(), jobRun.getJobId(), JobStatus.FAILURE, exceptions, callback)); } private void abortTaskCluster(TaskClusterAttempt tcAttempt, @@ -686,7 +690,7 @@ public class JobExecutor { + " as failed and the number of max re-attempts = " + maxReattempts); if (lastAttempt.getAttempt() >= maxReattempts || isCancelled()) { LOGGER.log(Level.INFO, "Aborting the job of " + ta.getTaskAttemptId()); - abortJob(exceptions); + abortJob(exceptions, NoOpCallback.INSTANCE); return; } 
LOGGER.log(Level.INFO, "We will try to start runnable activity clusters of " + ta.getTaskAttemptId()); @@ -696,7 +700,7 @@ public class JobExecutor { "Ignoring task failure notification: " + taId + " -- Current last attempt = " + lastAttempt); } } catch (Exception e) { - abortJob(Collections.singletonList(e)); + abortJob(Collections.singletonList(e), NoOpCallback.INSTANCE); } } @@ -720,7 +724,7 @@ public class JobExecutor { ta -> HyracksException.create(ErrorCode.NODE_FAILED, ta.getNodeId())); startRunnableActivityClusters(); } catch (Exception e) { - abortJob(Collections.singletonList(e)); + abortJob(Collections.singletonList(e), NoOpCallback.INSTANCE); } } http://git-wip-us.apache.org/repos/asf/asterixdb/blob/87411c22/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/job/IJobManager.java ---------------------------------------------------------------------- diff --git a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/job/IJobManager.java b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/job/IJobManager.java index 8fe542f..a9ddee3 100644 --- a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/job/IJobManager.java +++ b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/job/IJobManager.java @@ -25,6 +25,7 @@ import java.util.List; import org.apache.hyracks.api.exceptions.HyracksException; import org.apache.hyracks.api.job.JobId; import org.apache.hyracks.api.job.JobStatus; +import org.apache.hyracks.control.common.work.IResultCallback; /** * This interface abstracts the job lifecycle management and job scheduling for a cluster. @@ -47,10 +48,12 @@ public interface IJobManager { /** * Cancel a job with a given job id. * + * @param callback + * the callback that receives a null value once the cancellation has completed, or the exception if it failed. * @param jobId, * the id of the job. 
*/ - void cancel(JobId jobId) throws HyracksException; + void cancel(JobId jobId, IResultCallback<Void> callback) throws HyracksException; /** * This method is called when the master process decides to complete job. http://git-wip-us.apache.org/repos/asf/asterixdb/blob/87411c22/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/job/JobManager.java ---------------------------------------------------------------------- diff --git a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/job/JobManager.java b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/job/JobManager.java index abf1d57..4ba847d 100644 --- a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/job/JobManager.java +++ b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/job/JobManager.java @@ -46,6 +46,8 @@ import org.apache.hyracks.control.cc.cluster.INodeManager; import org.apache.hyracks.control.cc.scheduler.FIFOJobQueue; import org.apache.hyracks.control.cc.scheduler.IJobQueue; import org.apache.hyracks.control.common.controllers.CCConfig; +import org.apache.hyracks.control.common.work.NoOpCallback; +import org.apache.hyracks.control.common.work.IResultCallback; import com.fasterxml.jackson.databind.ObjectMapper; import com.fasterxml.jackson.databind.node.ObjectNode; @@ -115,17 +117,14 @@ public class JobManager implements IJobManager { } @Override - public void cancel(JobId jobId) throws HyracksException { - if (jobId == null) { - return; - } + public void cancel(JobId jobId, IResultCallback<Void> callback) throws HyracksException { // Cancels a running job. 
if (activeRunMap.containsKey(jobId)) { JobRun jobRun = activeRunMap.get(jobId); // The following call will abort all ongoing tasks and then consequently // trigger JobCleanupWork and JobCleanupNotificationWork which will update the lifecyle of the job. // Therefore, we do not remove the job out of activeRunMap here. - jobRun.getExecutor().cancelJob(); + jobRun.getExecutor().cancelJob(callback); return; } // Removes a pending job. @@ -138,6 +137,7 @@ public class JobManager implements IJobManager { runMapArchive.put(jobId, jobRun); runMapHistory.put(jobId, exceptions); } + callback.setValue(null); } @Override @@ -322,7 +322,7 @@ public class JobManager implements IJobManager { // fail the job then abort it run.setStatus(JobStatus.FAILURE, exceptions); // abort job will trigger JobCleanupWork - run.getExecutor().abortJob(exceptions); + run.getExecutor().abortJob(exceptions, NoOpCallback.INSTANCE); } } http://git-wip-us.apache.org/repos/asf/asterixdb/blob/87411c22/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/work/CancelJobWork.java ---------------------------------------------------------------------- diff --git a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/work/CancelJobWork.java b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/work/CancelJobWork.java index f3b67c9..e3135df 100644 --- a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/work/CancelJobWork.java +++ b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/work/CancelJobWork.java @@ -42,10 +42,7 @@ public class CancelJobWork extends SynchronizableWork { @Override protected void doRun() throws Exception { try { - if (jobId != null) { - jobManager.cancel(jobId); - } - callback.setValue(null); + jobManager.cancel(jobId, 
callback); } catch (Exception e) { callback.setException(e); } http://git-wip-us.apache.org/repos/asf/asterixdb/blob/87411c22/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/work/JobCleanupWork.java ---------------------------------------------------------------------- diff --git a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/work/JobCleanupWork.java b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/work/JobCleanupWork.java index 502ac50..bb85c13 100644 --- a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/work/JobCleanupWork.java +++ b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/work/JobCleanupWork.java @@ -29,6 +29,7 @@ import org.apache.hyracks.api.job.JobStatus; import org.apache.hyracks.control.cc.job.IJobManager; import org.apache.hyracks.control.cc.job.JobRun; import org.apache.hyracks.control.common.work.AbstractWork; +import org.apache.hyracks.control.common.work.IResultCallback; public class JobCleanupWork extends AbstractWork { private static final Logger LOGGER = Logger.getLogger(JobCleanupWork.class.getName()); @@ -37,12 +38,15 @@ public class JobCleanupWork extends AbstractWork { private JobId jobId; private JobStatus status; private List<Exception> exceptions; + private IResultCallback<Void> callback; - public JobCleanupWork(IJobManager jobManager, JobId jobId, JobStatus status, List<Exception> exceptions) { + public JobCleanupWork(IJobManager jobManager, JobId jobId, JobStatus status, List<Exception> exceptions, + IResultCallback<Void> callback) { this.jobManager = jobManager; this.jobId = jobId; this.status = status; this.exceptions = exceptions; + this.callback = callback; } @Override @@ -53,6 +57,7 @@ public class JobCleanupWork extends AbstractWork { try { 
JobRun jobRun = jobManager.get(jobId); jobManager.prepareComplete(jobRun, status, exceptions); + callback.setValue(null); } catch (HyracksException e) { // Fail the job with the caught exception during final completion. JobRun run = jobManager.get(jobId); @@ -62,6 +67,7 @@ public class JobCleanupWork extends AbstractWork { } completionException.add(0, e); run.setStatus(JobStatus.FAILURE, completionException); + callback.setException(e); } } http://git-wip-us.apache.org/repos/asf/asterixdb/blob/87411c22/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/test/java/org/apache/hyracks/control/cc/job/JobManagerTest.java ---------------------------------------------------------------------- diff --git a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/test/java/org/apache/hyracks/control/cc/job/JobManagerTest.java b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/test/java/org/apache/hyracks/control/cc/job/JobManagerTest.java index ed2a740..0d46d64 100644 --- a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/test/java/org/apache/hyracks/control/cc/job/JobManagerTest.java +++ b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/test/java/org/apache/hyracks/control/cc/job/JobManagerTest.java @@ -48,6 +48,7 @@ import org.apache.hyracks.control.cc.cluster.NodeManager; import org.apache.hyracks.control.common.base.INodeController; import org.apache.hyracks.control.common.controllers.CCConfig; import org.apache.hyracks.control.common.logs.LogFile; +import org.apache.hyracks.control.common.work.NoOpCallback; import org.junit.Assert; import org.junit.Before; import org.junit.Test; @@ -207,7 +208,7 @@ public class JobManagerTest { } @Test - public void testCancel() throws HyracksException { + public void testCancel() throws Exception { CCConfig ccConfig = new CCConfig(); IJobCapacityController jobCapacityController = mock(IJobCapacityController.class); IJobManager jobManager = spy(new 
JobManager(ccConfig, mockClusterControllerService(), jobCapacityController)); @@ -247,12 +248,12 @@ public class JobManagerTest { // Cancels deferred jobs. for (JobRun run : deferredRuns) { - jobManager.cancel(run.getJobId()); + jobManager.cancel(run.getJobId(), NoOpCallback.INSTANCE); } // Cancels runnable jobs. for (JobRun run : acceptedRuns) { - jobManager.cancel(run.getJobId()); + jobManager.cancel(run.getJobId(), NoOpCallback.INSTANCE); } Assert.assertTrue(jobManager.getPendingJobs().isEmpty()); http://git-wip-us.apache.org/repos/asf/asterixdb/blob/87411c22/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/work/FutureValue.java ---------------------------------------------------------------------- diff --git a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/work/FutureValue.java b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/work/FutureValue.java index 150e0e8..ca0c7c4 100644 --- a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/work/FutureValue.java +++ b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/work/FutureValue.java @@ -19,6 +19,7 @@ package org.apache.hyracks.control.common.work; public class FutureValue<T> implements IResultCallback<T> { + private boolean done; private T value; http://git-wip-us.apache.org/repos/asf/asterixdb/blob/87411c22/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/work/NoOpCallback.java ---------------------------------------------------------------------- diff --git a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/work/NoOpCallback.java 
b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/work/NoOpCallback.java new file mode 100644 index 0000000..041cee0 --- /dev/null +++ b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/work/NoOpCallback.java @@ -0,0 +1,38 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.hyracks.control.common.work; + +public class NoOpCallback implements IResultCallback<Void> { + + public static final NoOpCallback INSTANCE = new NoOpCallback(); + + private NoOpCallback() { + } + + @Override + public void setValue(Void result) { + // Intentionally empty: this callback is passed when the caller does not need the result. + } + + @Override + public void setException(Exception e) { + // Intentionally empty: this callback is passed when no one listens, so the exception is discarded. + } + +}
