This is an automated email from the ASF dual-hosted git repository.
alsuliman pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/asterixdb.git
The following commit(s) were added to refs/heads/master by this push:
new 406725a226 [ASTERIXDB-3460][*DB][HYR] Do not run jobs while cluster is
not ACTIVE
406725a226 is described below
commit 406725a2267d31f23a88aa9d0d51abbc9a22c71c
Author: Ali Alsuliman <[email protected]>
AuthorDate: Wed Jul 17 22:26:07 2024 +0300
[ASTERIXDB-3460][*DB][HYR] Do not run jobs while cluster is not ACTIVE
- user model changes: no
- storage format changes: no
- interface changes: yes
Details:
Do not run jobs while cluster is not ACTIVE.
- Allow certain jobs to run regardless of cluster state.
Ext-ref: MB-62635
Change-Id: I9a027e46a9067e18e2fd5de57112f0d15addc702
Reviewed-on: https://asterix-gerrit.ics.uci.edu/c/asterixdb/+/18488
Reviewed-by: Murtadha Hubail <[email protected]>
Tested-by: Ali Alsuliman <[email protected]>
---
.../asterix/optimizer/base/AnalysisUtil.java | 2 +-
.../asterix/app/active/FeedEventsListener.java | 2 +-
.../asterix/app/translator/QueryTranslator.java | 10 +-
.../asterix/hyracks/bootstrap/CCApplication.java | 17 ++-
.../hyracks/bootstrap/GlobalRecoveryManager.java | 8 --
.../org/apache/asterix/utils/FlushDatasetUtil.java | 2 +-
.../org/apache/asterix/utils/RebalanceUtil.java | 10 +-
.../org/apache/asterix/common/utils/JobUtils.java | 26 ++++-
.../apache/asterix/metadata/entities/Dataset.java | 2 +-
.../job/resource/JobCapacityController.java | 16 ++-
.../job/resource/JobCapacityControllerTest.java | 35 +++++--
.../hyracks/api/application/ICCApplication.java | 8 ++
.../api/client/IHyracksClientConnection.java | 116 ++++++++++-----------
.../api/client/IHyracksClientInterface.java | 4 +-
.../apache/hyracks/api/exceptions/ErrorCode.java | 1 +
.../java/org/apache/hyracks/api/job/JobFlag.java | 3 +-
.../job/resource/DefaultJobCapacityController.java | 6 +-
.../api/job/resource/IJobCapacityController.java | 15 ++-
.../src/main/resources/errormsg/en.properties | 1 +
.../hyracks/control/cc/BaseCCApplication.java | 7 ++
.../apache/hyracks/control/cc/job/JobManager.java | 2 +-
.../hyracks/control/cc/scheduler/FIFOJobQueue.java | 3 +-
.../hyracks/control/cc/job/JobManagerTest.java | 27 +++--
.../AbstractMultiNCIntegrationTest.java | 4 +-
24 files changed, 210 insertions(+), 117 deletions(-)
diff --git
a/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/optimizer/base/AnalysisUtil.java
b/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/optimizer/base/AnalysisUtil.java
index bb428fa645..d8775c8741 100644
---
a/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/optimizer/base/AnalysisUtil.java
+++
b/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/optimizer/base/AnalysisUtil.java
@@ -282,7 +282,7 @@ public class AnalysisUtil {
JobSpecification jobSpec = compiler.createJob(appCtx, new
JobEventListenerFactory(newTxnId, false));
- JobId jobId = JobUtils.runJob(appCtx.getHcc(), jobSpec, true);
+ JobId jobId = JobUtils.runJobIfActive(appCtx.getHcc(), jobSpec,
true);
IResultSetReader resultSetReader =
appCtx.getResultSet().createReader(jobId, resultSetId);
FrameManager frameManager = new
FrameManager(queryOptCtx.getPhysicalOptimizationConfig().getFrameSize());
diff --git
a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/active/FeedEventsListener.java
b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/active/FeedEventsListener.java
index 3a81c099d6..4674f7ee4c 100644
---
a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/active/FeedEventsListener.java
+++
b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/active/FeedEventsListener.java
@@ -107,7 +107,7 @@ public class FeedEventsListener extends
ActiveEntityEventsListener {
// TODO(Yingyi): currently we do not check IFrameWriter protocol
violations for Feed jobs.
// We will need to design general exception handling mechanism for
feeds.
setLocations(jobInfo.getRight());
- return JobUtils.runJob(hcc, feedJob, false);
+ return JobUtils.runJobIfActive(hcc, feedJob, false);
} catch (Exception e) {
throw HyracksDataException.create(e);
}
diff --git
a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/translator/QueryTranslator.java
b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/translator/QueryTranslator.java
index ad74d6c4fb..0c736a245b 100644
---
a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/translator/QueryTranslator.java
+++
b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/translator/QueryTranslator.java
@@ -302,7 +302,7 @@ public class QueryTranslator extends AbstractLangTranslator
implements IStatemen
protected final APIFramework apiFramework;
protected final IRewriterFactory rewriterFactory;
protected final ExecutorService executorService;
- protected final EnumSet<JobFlag> jobFlags = EnumSet.noneOf(JobFlag.class);
+ protected final EnumSet<JobFlag> jobFlags =
EnumSet.of(JobFlag.ENSURE_RUNNABLE);
protected final IMetadataLockManager lockManager;
protected final IMetadataLockUtil lockUtil;
protected final IResponsePrinter responsePrinter;
@@ -2435,7 +2435,7 @@ public class QueryTranslator extends
AbstractLangTranslator implements IStatemen
requestParameters.isForceDropDataset());
}
for (JobSpecification jobSpec : jobsToExecute) {
- JobUtils.runJob(hcc, jobSpec, true);
+ JobUtils.runJobIfActive(hcc, jobSpec, true);
}
} catch (Exception e2) {
// do no throw exception since still the metadata needs to
be compensated.
@@ -4298,7 +4298,7 @@ public class QueryTranslator extends
AbstractLangTranslator implements IStatemen
private static JobId runTrackJob(IHyracksClientConnection hcc,
JobSpecification jobSpec, EnumSet<JobFlag> jobFlags,
String reqId, String clientCtxId, ClientRequest clientRequest)
throws Exception {
jobSpec.setRequestId(reqId);
- JobId jobId = JobUtils.runJob(hcc, jobSpec, jobFlags, false);
+ JobId jobId = JobUtils.runJobIfActive(hcc, jobSpec, jobFlags, false);
LOGGER.info("Created job {} for uuid:{}, clientContextID:{}", jobId,
reqId, clientCtxId);
clientRequest.setJobId(jobId);
return jobId;
@@ -5407,12 +5407,12 @@ public class QueryTranslator extends
AbstractLangTranslator implements IStatemen
private static void runJob(IHyracksClientConnection hcc, JobSpecification
jobSpec, EnumSet<JobFlag> jobFlags)
throws Exception {
- JobUtils.runJob(hcc, jobSpec, jobFlags, true);
+ JobUtils.runJobIfActive(hcc, jobSpec, jobFlags, true);
}
private static List<IOperatorStats> runJob(IHyracksClientConnection hcc,
JobSpecification jobSpec,
EnumSet<JobFlag> jobFlags, List<String> statOperatorNames) throws
Exception {
- Pair<JobId, List<IOperatorStats>> p = JobUtils.runJob(hcc, jobSpec,
jobFlags, true, statOperatorNames);
+ Pair<JobId, List<IOperatorStats>> p = JobUtils.runJobIfActive(hcc,
jobSpec, jobFlags, true, statOperatorNames);
return p.second;
}
diff --git
a/asterixdb/asterix-app/src/main/java/org/apache/asterix/hyracks/bootstrap/CCApplication.java
b/asterixdb/asterix-app/src/main/java/org/apache/asterix/hyracks/bootstrap/CCApplication.java
index 972417443c..0b7e7d0a2c 100644
---
a/asterixdb/asterix-app/src/main/java/org/apache/asterix/hyracks/bootstrap/CCApplication.java
+++
b/asterixdb/asterix-app/src/main/java/org/apache/asterix/hyracks/bootstrap/CCApplication.java
@@ -22,6 +22,8 @@ package org.apache.asterix.hyracks.bootstrap;
import static org.apache.asterix.algebra.base.ILangExtension.Language.SQLPP;
import static
org.apache.asterix.api.http.server.ServletConstants.ASTERIX_APP_CONTEXT_INFO_ATTR;
import static
org.apache.asterix.api.http.server.ServletConstants.HYRACKS_CONNECTION_ATTR;
+import static
org.apache.asterix.common.api.IClusterManagementWork.ClusterState.ACTIVE;
+import static
org.apache.asterix.common.api.IClusterManagementWork.ClusterState.REBALANCE_REQUIRED;
import static
org.apache.asterix.common.api.IClusterManagementWork.ClusterState.SHUTTING_DOWN;
import static
org.apache.hyracks.control.common.controllers.ControllerConfig.Option.CLOUD_DEPLOYMENT;
@@ -35,6 +37,7 @@ import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.ServiceLoader;
+import java.util.Set;
import java.util.concurrent.ConcurrentMap;
import org.apache.asterix.api.http.IQueryWebServerRegistrant;
@@ -69,6 +72,7 @@ import org.apache.asterix.common.api.INamespacePathResolver;
import org.apache.asterix.common.api.INamespaceResolver;
import org.apache.asterix.common.api.INodeJobTracker;
import org.apache.asterix.common.api.IReceptionistFactory;
+import org.apache.asterix.common.cluster.IClusterStateManager;
import org.apache.asterix.common.cluster.IGlobalRecoveryManager;
import org.apache.asterix.common.cluster.IGlobalTxManager;
import org.apache.asterix.common.config.AsterixExtension;
@@ -111,6 +115,7 @@ import org.apache.hyracks.api.config.IConfigManager;
import org.apache.hyracks.api.control.IGatekeeper;
import org.apache.hyracks.api.exceptions.HyracksDataException;
import org.apache.hyracks.api.io.IODeviceHandle;
+import org.apache.hyracks.api.job.JobFlag;
import org.apache.hyracks.api.job.resource.IJobCapacityController;
import org.apache.hyracks.api.lifecycle.LifeCycleComponentManager;
import org.apache.hyracks.api.result.IJobResultCallback;
@@ -212,7 +217,7 @@ public class CCApplication extends BaseCCApplication {
ccServiceCtx.addClusterLifecycleListener(nodeJobTracker);
ccServiceCtx.addJobLifecycleListener(globalTxManager);
- jobCapacityController = new
JobCapacityController(controllerService.getResourceManager());
+ jobCapacityController = new
JobCapacityController(controllerService.getResourceManager(), this);
}
protected INamespaceResolver createNamespaceResolver(boolean
useDatabaseResolution) {
@@ -441,4 +446,14 @@ public class CCApplication extends BaseCCApplication {
public IJobResultCallback getJobResultCallback() {
return new JobResultCallback(appCtx);
}
+
+ @Override
+ public boolean acceptingJobs(Set<JobFlag> flags) {
+ // flags == null should not be needed since currently it's not null
(but not enforced)
+ if (flags == null || !flags.contains(JobFlag.ENSURE_RUNNABLE)) {
+ return true;
+ }
+ IClusterStateManager csm = appCtx.getClusterStateManager();
+ return csm.getState() == ACTIVE || csm.getState() ==
REBALANCE_REQUIRED;
+ }
}
diff --git
a/asterixdb/asterix-app/src/main/java/org/apache/asterix/hyracks/bootstrap/GlobalRecoveryManager.java
b/asterixdb/asterix-app/src/main/java/org/apache/asterix/hyracks/bootstrap/GlobalRecoveryManager.java
index ed9d0c6f4c..08972d450d 100644
---
a/asterixdb/asterix-app/src/main/java/org/apache/asterix/hyracks/bootstrap/GlobalRecoveryManager.java
+++
b/asterixdb/asterix-app/src/main/java/org/apache/asterix/hyracks/bootstrap/GlobalRecoveryManager.java
@@ -40,8 +40,6 @@ import org.apache.asterix.metadata.entities.Dataverse;
import org.apache.asterix.metadata.utils.DatasetUtil;
import org.apache.hyracks.api.application.ICCServiceContext;
import org.apache.hyracks.api.client.IHyracksClientConnection;
-import org.apache.hyracks.api.job.JobId;
-import org.apache.hyracks.api.job.JobSpecification;
import org.apache.hyracks.util.ExitUtil;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
@@ -75,12 +73,6 @@ public class GlobalRecoveryManager implements
IGlobalRecoveryManager {
return Collections.emptySet();
}
- private void executeHyracksJob(JobSpecification spec) throws Exception {
- spec.setMaxReattempts(0);
- JobId jobId = hcc.startJob(spec);
- hcc.waitForCompletion(jobId);
- }
-
@Override
public void startGlobalRecovery(ICcApplicationContext appCtx) {
if (!recoveryCompleted && !recovering) {
diff --git
a/asterixdb/asterix-app/src/main/java/org/apache/asterix/utils/FlushDatasetUtil.java
b/asterixdb/asterix-app/src/main/java/org/apache/asterix/utils/FlushDatasetUtil.java
index f38c18b705..1c17704ff6 100644
---
a/asterixdb/asterix-app/src/main/java/org/apache/asterix/utils/FlushDatasetUtil.java
+++
b/asterixdb/asterix-app/src/main/java/org/apache/asterix/utils/FlushDatasetUtil.java
@@ -75,7 +75,7 @@ public class FlushDatasetUtil {
JobEventListenerFactory jobEventListenerFactory = new
JobEventListenerFactory(txnId, true);
spec.setJobletEventListenerFactory(jobEventListenerFactory);
- JobUtils.runJob(hcc, spec, true);
+ JobUtils.runJobIfActive(hcc, spec, true);
}
}
diff --git
a/asterixdb/asterix-app/src/main/java/org/apache/asterix/utils/RebalanceUtil.java
b/asterixdb/asterix-app/src/main/java/org/apache/asterix/utils/RebalanceUtil.java
index 180b6ccd45..cc9f1ada72 100644
---
a/asterixdb/asterix-app/src/main/java/org/apache/asterix/utils/RebalanceUtil.java
+++
b/asterixdb/asterix-app/src/main/java/org/apache/asterix/utils/RebalanceUtil.java
@@ -305,7 +305,7 @@ public class RebalanceUtil {
private static void createRebalanceTarget(Dataset target, MetadataProvider
metadataProvider,
IHyracksClientConnection hcc) throws Exception {
JobSpecification spec = DatasetUtil.createDatasetJobSpec(target,
metadataProvider);
- JobUtils.runJob(hcc, spec, true);
+ JobUtils.forceRunJob(hcc, spec, true);
}
// Populates the data from the source dataset to the rebalance target
dataset.
@@ -348,7 +348,7 @@ public class RebalanceUtil {
spec.connect(new OneToOneConnectorDescriptor(spec), upsertOp, 0,
commitOp, 0);
// Executes the job.
- JobUtils.runJob(hcc, spec, true);
+ JobUtils.forceRunJob(hcc, spec, true);
}
private static ITupleProjectorFactory createTupleProjectorFactory(Dataset
source, MetadataProvider metadataProvider)
@@ -403,7 +403,7 @@ public class RebalanceUtil {
EnumSet.of(DropOption.IF_EXISTS,
DropOption.WAIT_ON_IN_USE), null));
}
for (JobSpecification jobSpec : jobs) {
- JobUtils.runJob(hcc, jobSpec, true);
+ JobUtils.forceRunJob(hcc, jobSpec, true);
}
}
@@ -427,12 +427,12 @@ public class RebalanceUtil {
// Creates the secondary index.
JobSpecification indexCreationJobSpec =
IndexUtil.buildSecondaryIndexCreationJobSpec(target,
index, metadataProvider, null);
- JobUtils.runJob(hcc, indexCreationJobSpec, true);
+ JobUtils.forceRunJob(hcc, indexCreationJobSpec, true);
// Loads the secondary index.
JobSpecification indexLoadingJobSpec =
IndexUtil.buildSecondaryIndexLoadingJobSpec(target, index,
metadataProvider, null);
- JobUtils.runJob(hcc, indexLoadingJobSpec, true);
+ JobUtils.forceRunJob(hcc, indexLoadingJobSpec, true);
}
}
diff --git
a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/utils/JobUtils.java
b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/utils/JobUtils.java
index c1c6f18d1f..e3f57f3b4a 100644
---
a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/utils/JobUtils.java
+++
b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/utils/JobUtils.java
@@ -36,12 +36,28 @@ public class JobUtils {
ADDED_PENDINGOP_RECORD_TO_METADATA
}
- public static JobId runJob(IHyracksClientConnection hcc, JobSpecification
spec, boolean waitForCompletion)
+ public static JobId forceRunJob(IHyracksClientConnection hcc,
JobSpecification spec, boolean waitForCompletion)
throws Exception {
return runJob(hcc, spec, EnumSet.noneOf(JobFlag.class),
waitForCompletion);
}
- public static JobId runJob(IHyracksClientConnection hcc, JobSpecification
spec, EnumSet<JobFlag> jobFlags,
+ public static JobId runJobIfActive(IHyracksClientConnection hcc,
JobSpecification spec, boolean waitForCompletion)
+ throws Exception {
+ return runJob(hcc, spec, EnumSet.of(JobFlag.ENSURE_RUNNABLE),
waitForCompletion);
+ }
+
+ public static JobId runJobIfActive(IHyracksClientConnection hcc,
JobSpecification spec, EnumSet<JobFlag> jobFlags,
+ boolean waitForCompletion) throws Exception {
+ if (jobFlags.contains(JobFlag.ENSURE_RUNNABLE)) {
+ return runJob(hcc, spec, jobFlags, waitForCompletion);
+ } else {
+ EnumSet<JobFlag> flags = EnumSet.copyOf(jobFlags);
+ flags.add(JobFlag.ENSURE_RUNNABLE);
+ return runJob(hcc, spec, flags, waitForCompletion);
+ }
+ }
+
+ private static JobId runJob(IHyracksClientConnection hcc, JobSpecification
spec, EnumSet<JobFlag> jobFlags,
boolean waitForCompletion) throws Exception {
spec.setMaxReattempts(0);
final JobId jobId = hcc.startJob(spec, jobFlags);
@@ -57,8 +73,12 @@ public class JobUtils {
return jobId;
}
- public static Pair<JobId, List<IOperatorStats>>
runJob(IHyracksClientConnection hcc, JobSpecification spec,
+ public static Pair<JobId, List<IOperatorStats>>
runJobIfActive(IHyracksClientConnection hcc, JobSpecification spec,
EnumSet<JobFlag> jobFlags, boolean waitForCompletion, List<String>
statOperatorNames) throws Exception {
+ if (!jobFlags.contains(JobFlag.ENSURE_RUNNABLE)) {
+ jobFlags = EnumSet.copyOf(jobFlags);
+ jobFlags.add(JobFlag.ENSURE_RUNNABLE);
+ }
spec.setMaxReattempts(0);
final JobId jobId = hcc.startJob(spec, jobFlags);
List<IOperatorStats> opStats = null;
diff --git
a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/entities/Dataset.java
b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/entities/Dataset.java
index 9aa5d84bba..132efbcded 100644
---
a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/entities/Dataset.java
+++
b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/entities/Dataset.java
@@ -403,7 +403,7 @@ public class Dataset implements IMetadataEntity<Dataset>,
IDataset {
// #. run the jobs
for (JobSpecification jobSpec : jobsToExecute) {
- JobUtils.runJob(hcc, jobSpec, true);
+ JobUtils.runJobIfActive(hcc, jobSpec, true);
}
mdTxnCtx.setValue(MetadataManager.INSTANCE.beginTransaction());
diff --git
a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/job/resource/JobCapacityController.java
b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/job/resource/JobCapacityController.java
index ae903d1f71..236056ca02 100644
---
a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/job/resource/JobCapacityController.java
+++
b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/job/resource/JobCapacityController.java
@@ -19,8 +19,14 @@
package org.apache.asterix.runtime.job.resource;
+import java.util.Set;
+
+import org.apache.hyracks.api.application.ICCApplication;
import org.apache.hyracks.api.exceptions.ErrorCode;
+import org.apache.hyracks.api.exceptions.HyracksDataException;
import org.apache.hyracks.api.exceptions.HyracksException;
+import org.apache.hyracks.api.job.JobFlag;
+import org.apache.hyracks.api.job.JobId;
import org.apache.hyracks.api.job.JobSpecification;
import org.apache.hyracks.api.job.resource.IClusterCapacity;
import org.apache.hyracks.api.job.resource.IJobCapacityController;
@@ -36,13 +42,19 @@ public class JobCapacityController implements
IJobCapacityController {
private static final Logger LOGGER = LogManager.getLogger();
private final IResourceManager resourceManager;
+ private final ICCApplication ccApp;
- public JobCapacityController(IResourceManager resourceManager) {
+ public JobCapacityController(IResourceManager resourceManager,
ICCApplication ccApp) {
this.resourceManager = resourceManager;
+ this.ccApp = ccApp;
}
@Override
- public JobSubmissionStatus allocate(JobSpecification job) throws
HyracksException {
+ public JobSubmissionStatus allocate(JobSpecification job, JobId jobId,
Set<JobFlag> jobFlags)
+ throws HyracksException {
+ if (!ccApp.acceptingJobs(jobFlags)) {
+ throw HyracksDataException.create(ErrorCode.JOB_REJECTED, job);
+ }
IClusterCapacity requiredCapacity = job.getRequiredClusterCapacity();
long reqAggregatedMemoryByteSize =
requiredCapacity.getAggregatedMemoryByteSize();
int reqAggregatedNumCores = requiredCapacity.getAggregatedCores();
diff --git
a/asterixdb/asterix-runtime/src/test/java/org/apache/asterix/runtime/job/resource/JobCapacityControllerTest.java
b/asterixdb/asterix-runtime/src/test/java/org/apache/asterix/runtime/job/resource/JobCapacityControllerTest.java
index 48c61b480e..e70b306919 100644
---
a/asterixdb/asterix-runtime/src/test/java/org/apache/asterix/runtime/job/resource/JobCapacityControllerTest.java
+++
b/asterixdb/asterix-runtime/src/test/java/org/apache/asterix/runtime/job/resource/JobCapacityControllerTest.java
@@ -22,8 +22,13 @@ package org.apache.asterix.runtime.job.resource;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;
+import java.util.EnumSet;
+
+import org.apache.hyracks.api.application.ICCApplication;
import org.apache.hyracks.api.exceptions.ErrorCode;
import org.apache.hyracks.api.exceptions.HyracksException;
+import org.apache.hyracks.api.job.JobFlag;
+import org.apache.hyracks.api.job.JobId;
import org.apache.hyracks.api.job.JobSpecification;
import org.apache.hyracks.api.job.resource.ClusterCapacity;
import org.apache.hyracks.api.job.resource.IClusterCapacity;
@@ -36,31 +41,34 @@ import org.junit.Test;
public class JobCapacityControllerTest {
+ private static final EnumSet<JobFlag> none = EnumSet.noneOf(JobFlag.class);
+
@Test
public void test() throws HyracksException {
+ JobId jobId = new JobId(0);
IResourceManager resourceManager =
makeResourceManagerWithCapacity(4294967296L, 33);
- JobCapacityController capacityController = new
JobCapacityController(resourceManager);
+ JobCapacityController capacityController = new
JobCapacityController(resourceManager, makeCCApp());
// Verifies the correctness of the allocate method.
- Assert.assertTrue(capacityController.allocate(
- makeJobWithRequiredCapacity(4294967296L, 16)) ==
IJobCapacityController.JobSubmissionStatus.EXECUTE);
- Assert.assertTrue(capacityController.allocate(
- makeJobWithRequiredCapacity(2147483648L, 16)) ==
IJobCapacityController.JobSubmissionStatus.QUEUE);
- Assert.assertTrue(capacityController.allocate(
- makeJobWithRequiredCapacity(2147483648L, 32)) ==
IJobCapacityController.JobSubmissionStatus.QUEUE);
+
Assert.assertTrue(capacityController.allocate(makeJobWithRequiredCapacity(4294967296L,
16), jobId,
+ none) == IJobCapacityController.JobSubmissionStatus.EXECUTE);
+
Assert.assertTrue(capacityController.allocate(makeJobWithRequiredCapacity(2147483648L,
16), jobId,
+ none) == IJobCapacityController.JobSubmissionStatus.QUEUE);
+
Assert.assertTrue(capacityController.allocate(makeJobWithRequiredCapacity(2147483648L,
32), jobId,
+ none) == IJobCapacityController.JobSubmissionStatus.QUEUE);
boolean exceedCapacity = false;
try {
-
capacityController.allocate(makeJobWithRequiredCapacity(2147483648L, 64));
+
capacityController.allocate(makeJobWithRequiredCapacity(2147483648L, 64),
jobId, none);
} catch (HyracksException e) {
exceedCapacity =
e.matches(ErrorCode.JOB_REQUIREMENTS_EXCEED_CAPACITY);
}
Assert.assertTrue(exceedCapacity);
- Assert.assertTrue(capacityController.allocate(
- makeJobWithRequiredCapacity(4294967296L, 32)) ==
IJobCapacityController.JobSubmissionStatus.QUEUE);
+
Assert.assertTrue(capacityController.allocate(makeJobWithRequiredCapacity(4294967296L,
32), jobId,
+ none) == IJobCapacityController.JobSubmissionStatus.QUEUE);
exceedCapacity = false;
try {
-
capacityController.allocate(makeJobWithRequiredCapacity(4294967297L, 33));
+
capacityController.allocate(makeJobWithRequiredCapacity(4294967297L, 33),
jobId, none);
} catch (HyracksException e) {
exceedCapacity =
e.matches(ErrorCode.JOB_REQUIREMENTS_EXCEED_CAPACITY);
}
@@ -95,4 +103,9 @@ public class JobCapacityControllerTest {
return clusterCapacity;
}
+ private ICCApplication makeCCApp() {
+ ICCApplication ccApp = mock(ICCApplication.class);
+ when(ccApp.acceptingJobs(none)).thenReturn(true);
+ return ccApp;
+ }
}
diff --git
a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/application/ICCApplication.java
b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/application/ICCApplication.java
index 5cc8f69085..6c2238640b 100644
---
a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/application/ICCApplication.java
+++
b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/application/ICCApplication.java
@@ -18,8 +18,11 @@
*/
package org.apache.hyracks.api.application;
+import java.util.Set;
+
import org.apache.hyracks.api.config.IConfigManager;
import org.apache.hyracks.api.control.IGatekeeper;
+import org.apache.hyracks.api.job.JobFlag;
import org.apache.hyracks.api.job.resource.IJobCapacityController;
import org.apache.hyracks.api.result.IJobResultCallback;
@@ -37,4 +40,9 @@ public interface ICCApplication extends IApplication {
* @return the job result callback
*/
IJobResultCallback getJobResultCallback();
+
+ /**
+ * @return true if the application is accepting jobs. False, otherwise.
+ */
+ boolean acceptingJobs(Set<JobFlag> jobFlags);
}
diff --git
a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/client/IHyracksClientConnection.java
b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/client/IHyracksClientConnection.java
index be965ebc2f..66f4fab779 100644
---
a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/client/IHyracksClientConnection.java
+++
b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/client/IHyracksClientConnection.java
@@ -89,55 +89,93 @@ public interface IHyracksClientConnection extends
IClusterInfoCollector {
JobId startJob(JobSpecification jobSpec, EnumSet<JobFlag> jobFlags) throws
Exception;
/**
- * Distribute the specified Job.
+ * Used to run a deployed Job Spec by id
*
+ * @param deployedJobSpecId
+ * The id of the deployed job spec
+ * @param jobParameters
+ * The serialized job parameters
+ * @throws Exception
+ */
+ JobId startJob(DeployedJobSpecId deployedJobSpecId, Map<byte[], byte[]>
jobParameters) throws Exception;
+
+ /**
+ * Start the specified Job.
+ *
+ * @param acggf
+ * Activity Cluster Graph Generator Factory
+ * @param jobFlags
+ * Flags
+ * @throws Exception
+ */
+ JobId startJob(IActivityClusterGraphGeneratorFactory acggf,
EnumSet<JobFlag> jobFlags) throws Exception;
+
+ /**
+ * Start the specified Job.
+ *
+ * @param deploymentId
+ * the id of the specific deployment
* @param jobSpec
* Job Specification
* @throws Exception
*/
- DeployedJobSpecId deployJobSpec(JobSpecification jobSpec) throws Exception;
+ JobId startJob(DeploymentId deploymentId, JobSpecification jobSpec) throws
Exception;
/**
- * Update the JobSpec for a deployed job.
+ * Start the specified Job.
*
- * @param deployedJobSpecId
- * The id of the deployed job spec
+ * @param deploymentId
+ * the id of the specific deployment
* @param jobSpec
* Job Specification
+ * @param jobFlags
+ * Flags
* @throws Exception
*/
- void redeployJobSpec(DeployedJobSpecId deployedJobSpecId, JobSpecification
jobSpec) throws Exception;
+ JobId startJob(DeploymentId deploymentId, JobSpecification jobSpec,
EnumSet<JobFlag> jobFlags) throws Exception;
/**
- * Remove the deployed Job Spec
+ * Start the specified Job.
*
- * @param deployedJobSpecId
- * The id of the deployed job spec
+ * @param deploymentId
+ * the id of the specific deployment
+ * @param acggf
+ * Activity Cluster Graph Generator Factory
+ * @param jobFlags
+ * Flags
* @throws Exception
*/
- void undeployJobSpec(DeployedJobSpecId deployedJobSpecId) throws Exception;
+ JobId startJob(DeploymentId deploymentId,
IActivityClusterGraphGeneratorFactory acggf, EnumSet<JobFlag> jobFlags)
+ throws Exception;
/**
- * Used to run a deployed Job Spec by id
+ * Distribute the specified Job.
+ *
+ * @param jobSpec
+ * Job Specification
+ * @throws Exception
+ */
+ DeployedJobSpecId deployJobSpec(JobSpecification jobSpec) throws Exception;
+
+ /**
+ * Update the JobSpec for a deployed job.
*
* @param deployedJobSpecId
* The id of the deployed job spec
- * @param jobParameters
- * The serialized job parameters
+ * @param jobSpec
+ * Job Specification
* @throws Exception
*/
- JobId startJob(DeployedJobSpecId deployedJobSpecId, Map<byte[], byte[]>
jobParameters) throws Exception;
+ void redeployJobSpec(DeployedJobSpecId deployedJobSpecId, JobSpecification
jobSpec) throws Exception;
/**
- * Start the specified Job.
+ * Remove the deployed Job Spec
*
- * @param acggf
- * Activity Cluster Graph Generator Factory
- * @param jobFlags
- * Flags
+ * @param deployedJobSpecId
+ * The id of the deployed job spec
* @throws Exception
*/
- JobId startJob(IActivityClusterGraphGeneratorFactory acggf,
EnumSet<JobFlag> jobFlags) throws Exception;
+ void undeployJobSpec(DeployedJobSpecId deployedJobSpecId) throws Exception;
/**
* Gets the IP Address and port for the ResultDirectoryService wrapped in
NetworkAddress
@@ -195,44 +233,6 @@ public interface IHyracksClientConnection extends
IClusterInfoCollector {
*/
void unDeployBinary(DeploymentId deploymentId) throws Exception;
- /**
- * Start the specified Job.
- *
- * @param deploymentId
- * the id of the specific deployment
- * @param jobSpec
- * Job Specification
- * @throws Exception
- */
- JobId startJob(DeploymentId deploymentId, JobSpecification jobSpec) throws
Exception;
-
- /**
- * Start the specified Job.
- *
- * @param deploymentId
- * the id of the specific deployment
- * @param jobSpec
- * Job Specification
- * @param jobFlags
- * Flags
- * @throws Exception
- */
- JobId startJob(DeploymentId deploymentId, JobSpecification jobSpec,
EnumSet<JobFlag> jobFlags) throws Exception;
-
- /**
- * Start the specified Job.
- *
- * @param deploymentId
- * the id of the specific deployment
- * @param acggf
- * Activity Cluster Graph Generator Factory
- * @param jobFlags
- * Flags
- * @throws Exception
- */
- JobId startJob(DeploymentId deploymentId,
IActivityClusterGraphGeneratorFactory acggf, EnumSet<JobFlag> jobFlags)
- throws Exception;
-
/**
* Shuts down all NCs and then the CC.
*
diff --git
a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/client/IHyracksClientInterface.java
b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/client/IHyracksClientInterface.java
index be470146f6..f63e0ea129 100644
---
a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/client/IHyracksClientInterface.java
+++
b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/client/IHyracksClientInterface.java
@@ -42,6 +42,8 @@ public interface IHyracksClientInterface {
public JobId startJob(DeployedJobSpecId deployedJobSpecId, Map<byte[],
byte[]> jobParameters) throws Exception;
+ public JobId startJob(DeploymentId deploymentId, byte[] acggfBytes,
EnumSet<JobFlag> jobFlags) throws Exception;
+
public void cancelJob(JobId jobId) throws Exception;
public DeployedJobSpecId deployJobSpec(byte[] acggfBytes) throws Exception;
@@ -64,8 +66,6 @@ public interface IHyracksClientInterface {
public void unDeployBinary(DeploymentId deploymentId) throws Exception;
- public JobId startJob(DeploymentId deploymentId, byte[] acggfBytes,
EnumSet<JobFlag> jobFlags) throws Exception;
-
public JobInfo getJobInfo(JobId jobId) throws Exception;
public void stopCluster(boolean terminateNCService) throws Exception;
diff --git
a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/exceptions/ErrorCode.java
b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/exceptions/ErrorCode.java
index e46c0ef490..766fb26e57 100644
---
a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/exceptions/ErrorCode.java
+++
b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/exceptions/ErrorCode.java
@@ -156,6 +156,7 @@ public enum ErrorCode implements IError {
ILLEGAL_STATE(126),
INVALID_STRING_UNICODE(127),
UNSUPPORTED_WRITE_SPEC(128),
+ JOB_REJECTED(129),
// Compilation error codes.
RULECOLLECTION_NOT_INSTANCE_OF_LIST(10000),
diff --git
a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/job/JobFlag.java
b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/job/JobFlag.java
index 7225cd4964..93848a1ff3 100644
---
a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/job/JobFlag.java
+++
b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/job/JobFlag.java
@@ -20,5 +20,6 @@ package org.apache.hyracks.api.job;
public enum JobFlag {
PROFILE_RUNTIME,
- ENFORCE_CONTRACT
+ ENFORCE_CONTRACT,
+ ENSURE_RUNNABLE
}
diff --git
a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/job/resource/DefaultJobCapacityController.java
b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/job/resource/DefaultJobCapacityController.java
index b18bcb10ee..0bfdb370ca 100644
---
a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/job/resource/DefaultJobCapacityController.java
+++
b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/job/resource/DefaultJobCapacityController.java
@@ -19,6 +19,10 @@
package org.apache.hyracks.api.job.resource;
+import java.util.Set;
+
+import org.apache.hyracks.api.job.JobFlag;
+import org.apache.hyracks.api.job.JobId;
import org.apache.hyracks.api.job.JobSpecification;
public class DefaultJobCapacityController implements IJobCapacityController {
@@ -34,7 +38,7 @@ public class DefaultJobCapacityController implements
IJobCapacityController {
}
@Override
- public JobSubmissionStatus allocate(JobSpecification job) {
+ public JobSubmissionStatus allocate(JobSpecification job, JobId jobId,
Set<JobFlag> jobFlags) {
return JobSubmissionStatus.EXECUTE;
}
diff --git
a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/job/resource/IJobCapacityController.java
b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/job/resource/IJobCapacityController.java
index f88baa2ee9..f2c03f6167 100644
---
a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/job/resource/IJobCapacityController.java
+++
b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/job/resource/IJobCapacityController.java
@@ -19,7 +19,11 @@
package org.apache.hyracks.api.job.resource;
+import java.util.Set;
+
import org.apache.hyracks.api.exceptions.HyracksException;
+import org.apache.hyracks.api.job.JobFlag;
+import org.apache.hyracks.api.job.JobId;
import org.apache.hyracks.api.job.JobSpecification;
/**
@@ -41,13 +45,14 @@ public interface IJobCapacityController {
* Allocates required cluster capacity for a job.
*
* @param job,
- * the job specification.
- * @return EXECUTE, if the job can be executed immediately;
- * QUEUE, if the job cannot be executed
+ * the job specification.
+ * @param jobId
+ * the job id.
+ * @return EXECUTE, if the job can be executed immediately; QUEUE, if the
job cannot be executed
* @throws HyracksException
- * if the job's capacity requirement exceeds the maximum
capacity of the cluster.
+ * if the job's capacity requirement exceeds the maximum capacity
of the cluster.
*/
- JobSubmissionStatus allocate(JobSpecification job) throws HyracksException;
+ JobSubmissionStatus allocate(JobSpecification job, JobId jobId,
Set<JobFlag> jobFlags) throws HyracksException;
/**
* Releases cluster capacity for a job when it completes.
diff --git
a/hyracks-fullstack/hyracks/hyracks-api/src/main/resources/errormsg/en.properties
b/hyracks-fullstack/hyracks/hyracks-api/src/main/resources/errormsg/en.properties
index b3c2d7b02d..fa52bc605e 100644
---
a/hyracks-fullstack/hyracks/hyracks-api/src/main/resources/errormsg/en.properties
+++
b/hyracks-fullstack/hyracks/hyracks-api/src/main/resources/errormsg/en.properties
@@ -146,6 +146,7 @@
126 = Illegal state. %1$s
127 = Decoding error - %1$s
128 = Unsupported copy to specification: PARTITION BY %1$s, ORDER BY %2$s
+129 = Job %1$s not run. Cluster is not accepting jobs
10000 = The given rule collection %1$s is not an instance of the List class.
10001 = Cannot compose partition constraint %1$s with %2$s
diff --git
a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/BaseCCApplication.java
b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/BaseCCApplication.java
index efd42c9c59..22e4bbafa9 100644
---
a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/BaseCCApplication.java
+++
b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/BaseCCApplication.java
@@ -19,6 +19,7 @@
package org.apache.hyracks.control.cc;
import java.util.Arrays;
+import java.util.Set;
import org.apache.hyracks.api.application.ICCApplication;
import org.apache.hyracks.api.application.ICCServiceContext;
@@ -26,6 +27,7 @@ import org.apache.hyracks.api.application.IServiceContext;
import org.apache.hyracks.api.config.IConfigManager;
import org.apache.hyracks.api.config.Section;
import org.apache.hyracks.api.control.IGatekeeper;
+import org.apache.hyracks.api.job.JobFlag;
import org.apache.hyracks.api.job.resource.DefaultJobCapacityController;
import org.apache.hyracks.api.job.resource.IJobCapacityController;
import org.apache.hyracks.api.result.IJobResultCallback;
@@ -125,4 +127,9 @@ public class BaseCCApplication implements ICCApplication {
// no op
};
}
+
+ @Override
+ public boolean acceptingJobs(Set<JobFlag> flags) {
+ return true;
+ }
}
diff --git
a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/job/JobManager.java
b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/job/JobManager.java
index be3daae894..7c7111f490 100644
---
a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/job/JobManager.java
+++
b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/job/JobManager.java
@@ -123,7 +123,7 @@ public class JobManager implements IJobManager {
JobSpecification job = jobRun.getJobSpecification();
IJobCapacityController.JobSubmissionStatus status;
try {
- status = jobCapacityController.allocate(job);
+ status = jobCapacityController.allocate(job, jobRun.getJobId(),
jobRun.getFlags());
CCServiceContext serviceCtx = ccs.getContext();
serviceCtx.notifyJobCreation(jobRun.getJobId(), job, status);
switch (status) {
diff --git
a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/scheduler/FIFOJobQueue.java
b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/scheduler/FIFOJobQueue.java
index 38277c2af7..d0038535bb 100644
---
a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/scheduler/FIFOJobQueue.java
+++
b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/scheduler/FIFOJobQueue.java
@@ -90,7 +90,8 @@ public class FIFOJobQueue implements IJobQueue {
// Cluster maximum capacity can change over time, thus we have to
re-check if the job should be rejected
// or not.
try {
- IJobCapacityController.JobSubmissionStatus status =
jobCapacityController.allocate(job);
+ IJobCapacityController.JobSubmissionStatus status =
+ jobCapacityController.allocate(job, run.getJobId(),
run.getFlags());
// Checks if the job can be executed immediately.
if (status ==
IJobCapacityController.JobSubmissionStatus.EXECUTE) {
jobRuns.add(run);
diff --git
a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/test/java/org/apache/hyracks/control/cc/job/JobManagerTest.java
b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/test/java/org/apache/hyracks/control/cc/job/JobManagerTest.java
index 19340d0928..3e87bb3cf1 100644
---
a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/test/java/org/apache/hyracks/control/cc/job/JobManagerTest.java
+++
b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/test/java/org/apache/hyracks/control/cc/job/JobManagerTest.java
@@ -30,12 +30,14 @@ import static org.mockito.Mockito.when;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Collections;
+import java.util.EnumSet;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
import org.apache.hyracks.api.exceptions.ErrorCode;
import org.apache.hyracks.api.exceptions.HyracksException;
+import org.apache.hyracks.api.job.JobFlag;
import org.apache.hyracks.api.job.JobId;
import org.apache.hyracks.api.job.JobSpecification;
import org.apache.hyracks.api.job.JobStatus;
@@ -57,6 +59,8 @@ import org.mockito.Mockito;
public class JobManagerTest {
+ private static final EnumSet<JobFlag> none = EnumSet.noneOf(JobFlag.class);
+
private CCConfig ccConfig;
@Before
@@ -77,7 +81,8 @@ public class JobManagerTest {
JobRun run = mockJobRun(id);
JobSpecification job = mock(JobSpecification.class);
when(run.getJobSpecification()).thenReturn(job);
-
when(jobCapacityController.allocate(job)).thenReturn(IJobCapacityController.JobSubmissionStatus.EXECUTE);
+ when(jobCapacityController.allocate(job, run.getJobId(), none))
+
.thenReturn(IJobCapacityController.JobSubmissionStatus.EXECUTE);
// Submits the job.
acceptedRuns.add(run);
@@ -93,7 +98,8 @@ public class JobManagerTest {
JobRun run = mockJobRun(id);
JobSpecification job = mock(JobSpecification.class);
when(run.getJobSpecification()).thenReturn(job);
-
when(jobCapacityController.allocate(job)).thenReturn(IJobCapacityController.JobSubmissionStatus.QUEUE)
+ when(jobCapacityController.allocate(job, run.getJobId(), none))
+
.thenReturn(IJobCapacityController.JobSubmissionStatus.QUEUE)
.thenReturn(IJobCapacityController.JobSubmissionStatus.EXECUTE);
// Submits the job.
@@ -109,7 +115,8 @@ public class JobManagerTest {
JobRun run = mockJobRun(8193);
JobSpecification job = mock(JobSpecification.class);
when(run.getJobSpecification()).thenReturn(job);
-
when(jobCapacityController.allocate(job)).thenReturn(IJobCapacityController.JobSubmissionStatus.QUEUE)
+ when(jobCapacityController.allocate(job, run.getJobId(), none))
+
.thenReturn(IJobCapacityController.JobSubmissionStatus.QUEUE)
.thenReturn(IJobCapacityController.JobSubmissionStatus.EXECUTE);
jobManager.add(run);
} catch (HyracksException e) {
@@ -149,7 +156,7 @@ public class JobManagerTest {
JobRun run = mockJobRun(1);
JobSpecification job = mock(JobSpecification.class);
when(run.getJobSpecification()).thenReturn(job);
- when(jobCapacityController.allocate(job))
+ when(jobCapacityController.allocate(job, run.getJobId(), none))
.thenThrow(HyracksException.create(ErrorCode.JOB_REQUIREMENTS_EXCEED_CAPACITY,
"1", "0"));
jobManager.add(run);
} catch (HyracksException e) {
@@ -172,14 +179,16 @@ public class JobManagerTest {
JobRun run1 = mockJobRun(1);
JobSpecification job1 = mock(JobSpecification.class);
when(run1.getJobSpecification()).thenReturn(job1);
-
when(jobCapacityController.allocate(job1)).thenReturn(IJobCapacityController.JobSubmissionStatus.EXECUTE);
+ when(jobCapacityController.allocate(job1, run1.getJobId(), none))
+
.thenReturn(IJobCapacityController.JobSubmissionStatus.EXECUTE);
jobManager.add(run1);
// A failure run.
JobRun run2 = mockJobRun(2);
JobSpecification job2 = mock(JobSpecification.class);
when(run2.getJobSpecification()).thenReturn(job2);
-
when(jobCapacityController.allocate(job2)).thenReturn(IJobCapacityController.JobSubmissionStatus.QUEUE)
+ when(jobCapacityController.allocate(job2, run2.getJobId(), none))
+ .thenReturn(IJobCapacityController.JobSubmissionStatus.QUEUE)
.thenThrow(HyracksException.create(ErrorCode.JOB_REQUIREMENTS_EXCEED_CAPACITY,
"1", "0"));
jobManager.add(run2);
@@ -220,7 +229,8 @@ public class JobManagerTest {
JobRun run = mockJobRun(id);
JobSpecification job = mock(JobSpecification.class);
when(run.getJobSpecification()).thenReturn(job);
-
when(jobCapacityController.allocate(job)).thenReturn(IJobCapacityController.JobSubmissionStatus.EXECUTE);
+ when(jobCapacityController.allocate(job, run.getJobId(), none))
+
.thenReturn(IJobCapacityController.JobSubmissionStatus.EXECUTE);
// Submits the job.
acceptedRuns.add(run);
@@ -236,7 +246,8 @@ public class JobManagerTest {
JobRun run = mockJobRun(id);
JobSpecification job = mock(JobSpecification.class);
when(run.getJobSpecification()).thenReturn(job);
-
when(jobCapacityController.allocate(job)).thenReturn(IJobCapacityController.JobSubmissionStatus.QUEUE)
+ when(jobCapacityController.allocate(job, run.getJobId(), none))
+
.thenReturn(IJobCapacityController.JobSubmissionStatus.QUEUE)
.thenReturn(IJobCapacityController.JobSubmissionStatus.EXECUTE);
// Submits the job.
diff --git
a/hyracks-fullstack/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/org/apache/hyracks/tests/integration/AbstractMultiNCIntegrationTest.java
b/hyracks-fullstack/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/org/apache/hyracks/tests/integration/AbstractMultiNCIntegrationTest.java
index 7a75a0fcca..97e15337b9 100644
---
a/hyracks-fullstack/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/org/apache/hyracks/tests/integration/AbstractMultiNCIntegrationTest.java
+++
b/hyracks-fullstack/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/org/apache/hyracks/tests/integration/AbstractMultiNCIntegrationTest.java
@@ -23,6 +23,7 @@ import java.io.IOException;
import java.util.ArrayList;
import java.util.EnumSet;
import java.util.List;
+import java.util.Set;
import org.apache.commons.io.FileUtils;
import org.apache.hyracks.api.client.IHyracksClientConnection;
@@ -247,7 +248,8 @@ public abstract class AbstractMultiNCIntegrationTest {
private long maxRAM = Runtime.getRuntime().maxMemory();
@Override
- public JobSubmissionStatus allocate(JobSpecification job)
throws HyracksException {
+ public JobSubmissionStatus allocate(JobSpecification job,
JobId jobId, Set<JobFlag> jobFlags)
+ throws HyracksException {
return maxRAM >
job.getRequiredClusterCapacity().getAggregatedMemoryByteSize()
? JobSubmissionStatus.EXECUTE :
JobSubmissionStatus.QUEUE;
}