Repository: asterixdb Updated Branches: refs/heads/master 0e2a7af3a -> 54249a8a9
[ASTERIXDB-2198][CLUS] Introduce NodeJobTracker - user model changes: no - storage format changes: no - interface changes: yes - Add INodeJobTracker to ICcApplicationContext. Details: - Add NodeJobTracker to track each node's pending jobs. - Ensure IJobLifecycleListeners are notified about job creation as soon as the job is received by the CC. - Add unit test for NodeJobTracker. Change-Id: Ie5638a6382b0ae0509a2aeeb80dee3db8e7657bb Reviewed-on: https://asterix-gerrit.ics.uci.edu/2220 Sonar-Qube: Jenkins <[email protected]> Tested-by: Jenkins <[email protected]> Contrib: Jenkins <[email protected]> Integration-Tests: Jenkins <[email protected]> Reviewed-by: Till Westmann <[email protected]> Project: http://git-wip-us.apache.org/repos/asf/asterixdb/repo Commit: http://git-wip-us.apache.org/repos/asf/asterixdb/commit/54249a8a Tree: http://git-wip-us.apache.org/repos/asf/asterixdb/tree/54249a8a Diff: http://git-wip-us.apache.org/repos/asf/asterixdb/diff/54249a8a Branch: refs/heads/master Commit: 54249a8a9fbb2561bcb9cdc9c9f1e4470df11d80 Parents: 0e2a7af Author: Murtadha Hubail <[email protected]> Authored: Wed Dec 13 03:47:14 2017 +0300 Committer: Murtadha Hubail <[email protected]> Committed: Wed Dec 13 09:17:05 2017 -0800 ---------------------------------------------------------------------- .../hyracks/bootstrap/CCApplication.java | 4 + .../asterix/common/api/INodeJobTracker.java | 37 +++++++++ .../common/dataflow/ICcApplicationContext.java | 6 ++ .../runtime/job/listener/NodeJobTracker.java | 81 ++++++++++++++++++++ .../runtime/utils/CcApplicationContext.java | 9 +++ .../job/listener/NodeJobTrackerTest.java | 69 +++++++++++++++++ .../hyracks/control/cc/job/JobManager.java | 6 +- 7 files changed, 208 insertions(+), 4 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/asterixdb/blob/54249a8a/asterixdb/asterix-app/src/main/java/org/apache/asterix/hyracks/bootstrap/CCApplication.java 
---------------------------------------------------------------------- diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/hyracks/bootstrap/CCApplication.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/hyracks/bootstrap/CCApplication.java index 4e75bf7..8283257 100644 --- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/hyracks/bootstrap/CCApplication.java +++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/hyracks/bootstrap/CCApplication.java @@ -59,6 +59,7 @@ import org.apache.asterix.app.external.ExternalLibraryUtils; import org.apache.asterix.app.replication.FaultToleranceStrategyFactory; import org.apache.asterix.common.api.AsterixThreadFactory; import org.apache.asterix.common.api.IClusterManagementWork.ClusterState; +import org.apache.asterix.common.api.INodeJobTracker; import org.apache.asterix.common.config.AsterixExtension; import org.apache.asterix.common.config.ClusterProperties; import org.apache.asterix.common.config.ExternalProperties; @@ -163,6 +164,9 @@ public class CCApplication extends BaseCCApplication { webManager.start(); ClusterManagerProvider.getClusterManager().registerSubscriber(globalRecoveryManager); ccServiceCtx.addClusterLifecycleListener(new ClusterLifecycleListener(appCtx)); + final INodeJobTracker nodeJobTracker = appCtx.getNodeJobTracker(); + ccServiceCtx.addJobLifecycleListener(nodeJobTracker); + ccServiceCtx.addClusterLifecycleListener(nodeJobTracker); jobCapacityController = new JobCapacityController(controllerService.getResourceManager()); } http://git-wip-us.apache.org/repos/asf/asterixdb/blob/54249a8a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/api/INodeJobTracker.java ---------------------------------------------------------------------- diff --git a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/api/INodeJobTracker.java b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/api/INodeJobTracker.java new file mode 100644 index 
0000000..4503620 --- /dev/null +++ b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/api/INodeJobTracker.java @@ -0,0 +1,37 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.asterix.common.api; + +import java.util.Set; + +import org.apache.hyracks.api.application.IClusterLifecycleListener; +import org.apache.hyracks.api.job.IJobLifecycleListener; +import org.apache.hyracks.api.job.JobId; + +public interface INodeJobTracker extends IJobLifecycleListener, IClusterLifecycleListener { + + /** + * Gets node {@code nodeId} pending jobs. If the node is not active, + * an empty set is returned. + * + * @param nodeId + * @return unmodifiable set of the node pending jobs. 
+ */ + Set<JobId> getPendingJobs(String nodeId); +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/asterixdb/blob/54249a8a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/dataflow/ICcApplicationContext.java ---------------------------------------------------------------------- diff --git a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/dataflow/ICcApplicationContext.java b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/dataflow/ICcApplicationContext.java index 3f4d6a6..6181ade 100644 --- a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/dataflow/ICcApplicationContext.java +++ b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/dataflow/ICcApplicationContext.java @@ -20,6 +20,7 @@ package org.apache.asterix.common.dataflow; import org.apache.asterix.common.api.IApplicationContext; import org.apache.asterix.common.api.IMetadataLockManager; +import org.apache.asterix.common.api.INodeJobTracker; import org.apache.asterix.common.cluster.IClusterStateManager; import org.apache.asterix.common.cluster.IGlobalRecoveryManager; import org.apache.asterix.common.config.ExtensionProperties; @@ -115,4 +116,9 @@ public interface ICcApplicationContext extends IApplicationContext { * @return the extension properties */ ExtensionProperties getExtensionProperties(); + + /** + * @return the node job tracker + */ + INodeJobTracker getNodeJobTracker(); } http://git-wip-us.apache.org/repos/asf/asterixdb/blob/54249a8a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/job/listener/NodeJobTracker.java ---------------------------------------------------------------------- diff --git a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/job/listener/NodeJobTracker.java b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/job/listener/NodeJobTracker.java new file mode 100644 index 0000000..75c5582 --- /dev/null +++ 
b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/job/listener/NodeJobTracker.java @@ -0,0 +1,81 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.asterix.runtime.job.listener; + +import static org.apache.hyracks.api.constraints.expressions.ConstraintExpression.ExpressionTag; + +import java.util.Collection; +import java.util.Collections; +import java.util.HashMap; +import java.util.HashSet; +import java.util.List; +import java.util.Map; +import java.util.Set; +import java.util.stream.Collectors; + +import org.apache.asterix.common.api.INodeJobTracker; +import org.apache.hyracks.api.config.IOption; +import org.apache.hyracks.api.constraints.Constraint; +import org.apache.hyracks.api.constraints.expressions.ConstantExpression; +import org.apache.hyracks.api.job.JobId; +import org.apache.hyracks.api.job.JobSpecification; +import org.apache.hyracks.api.job.JobStatus; +import org.apache.hyracks.util.annotations.ThreadSafe; + +@ThreadSafe +public class NodeJobTracker implements INodeJobTracker { + + private final Map<String, Set<JobId>> nodeJobs = new HashMap<>(); + + @Override + public synchronized void notifyJobCreation(JobId jobId, JobSpecification spec) { + final 
Set<String> matchedNodes = spec.getUserConstraints().stream().map(Constraint::getRValue) + .filter(ce -> ce.getTag() == ExpressionTag.CONSTANT).map(ConstantExpression.class::cast) + .map(ConstantExpression::getValue).map(Object::toString).filter(nodeJobs::containsKey) + .collect(Collectors.toSet()); + matchedNodes.stream().map(nodeJobs::get).forEach(jobsSet -> jobsSet.add(jobId)); + } + + @Override + public synchronized void notifyJobStart(JobId jobId) { + // nothing to do + } + + @Override + public synchronized void notifyJobFinish(JobId jobId, JobStatus jobStatus, List<Exception> exceptions) { + nodeJobs.values().forEach(jobsSet -> jobsSet.remove(jobId)); + } + + @Override + public synchronized void notifyNodeJoin(String nodeId, Map<IOption, Object> ncConfiguration) { + nodeJobs.computeIfAbsent(nodeId, key -> new HashSet<>()); + } + + @Override + public synchronized void notifyNodeFailure(Collection<String> deadNodeIds) { + deadNodeIds.forEach(nodeJobs::remove); + } + + @Override + public synchronized Set<JobId> getPendingJobs(String nodeId) { + return nodeJobs.containsKey(nodeId) ? 
+ Collections.unmodifiableSet(nodeJobs.get(nodeId)) : + Collections.emptySet(); + } +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/asterixdb/blob/54249a8a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/utils/CcApplicationContext.java ---------------------------------------------------------------------- diff --git a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/utils/CcApplicationContext.java b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/utils/CcApplicationContext.java index 63d3d6f..c3ef5b3 100644 --- a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/utils/CcApplicationContext.java +++ b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/utils/CcApplicationContext.java @@ -22,6 +22,7 @@ import java.io.IOException; import java.util.function.Supplier; import org.apache.asterix.common.api.IMetadataLockManager; +import org.apache.asterix.common.api.INodeJobTracker; import org.apache.asterix.common.cluster.IClusterStateManager; import org.apache.asterix.common.cluster.IGlobalRecoveryManager; import org.apache.asterix.common.config.ActiveProperties; @@ -42,6 +43,7 @@ import org.apache.asterix.common.library.ILibraryManager; import org.apache.asterix.common.metadata.IMetadataBootstrap; import org.apache.asterix.common.replication.IFaultToleranceStrategy; import org.apache.asterix.common.transactions.IResourceIdManager; +import org.apache.asterix.runtime.job.listener.NodeJobTracker; import org.apache.asterix.runtime.transaction.ResourceIdManager; import org.apache.hyracks.algebricks.common.exceptions.AlgebricksException; import org.apache.hyracks.api.application.ICCServiceContext; @@ -81,6 +83,7 @@ public class CcApplicationContext implements ICcApplicationContext { private IJobLifecycleListener activeLifeCycleListener; private IMetadataLockManager mdLockManager; private IClusterStateManager clusterStateManager; + private final INodeJobTracker 
nodeJobTracker; public CcApplicationContext(ICCServiceContext ccServiceCtx, IHyracksClientConnection hcc, ILibraryManager libraryManager, Supplier<IMetadataBootstrap> metadataBootstrapSupplier, @@ -114,6 +117,7 @@ public class CcApplicationContext implements ICcApplicationContext { clusterStateManager = new ClusterStateManager(); clusterStateManager.setCcAppCtx(this); this.resourceIdManager = new ResourceIdManager(clusterStateManager); + nodeJobTracker = new NodeJobTracker(); } @Override @@ -251,4 +255,9 @@ public class CcApplicationContext implements ICcApplicationContext { public IClusterStateManager getClusterStateManager() { return clusterStateManager; } + + @Override + public INodeJobTracker getNodeJobTracker() { + return nodeJobTracker; + } } http://git-wip-us.apache.org/repos/asf/asterixdb/blob/54249a8a/asterixdb/asterix-runtime/src/test/java/org/apache/asterix/runtime/job/listener/NodeJobTrackerTest.java ---------------------------------------------------------------------- diff --git a/asterixdb/asterix-runtime/src/test/java/org/apache/asterix/runtime/job/listener/NodeJobTrackerTest.java b/asterixdb/asterix-runtime/src/test/java/org/apache/asterix/runtime/job/listener/NodeJobTrackerTest.java new file mode 100644 index 0000000..186dba8 --- /dev/null +++ b/asterixdb/asterix-runtime/src/test/java/org/apache/asterix/runtime/job/listener/NodeJobTrackerTest.java @@ -0,0 +1,69 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.asterix.runtime.job.listener; + +import java.util.Collections; + +import org.apache.hyracks.api.constraints.Constraint; +import org.apache.hyracks.api.constraints.expressions.ConstantExpression; +import org.apache.hyracks.api.constraints.expressions.LValueConstraintExpression; +import org.apache.hyracks.api.job.JobId; +import org.apache.hyracks.api.job.JobSpecification; +import org.apache.hyracks.api.job.JobStatus; +import org.junit.Assert; +import org.junit.Test; +import org.mockito.Mockito; + +public class NodeJobTrackerTest { + + @Test + public void hasPendingJobsTest() { + final String nc1 = "nc1"; + final String nc2 = "nc2"; + final String unknown = "unknown"; + final NodeJobTracker nodeJobTracker = new NodeJobTracker(); + nodeJobTracker.notifyNodeJoin(nc1, null); + nodeJobTracker.notifyNodeJoin(nc2, null); + + JobSpecification jobSpec = new JobSpecification(); + // add nc1 and some other unknown location + final ConstantExpression nc1Location = new ConstantExpression(nc1); + final ConstantExpression unknownLocation = new ConstantExpression(unknown); + final LValueConstraintExpression lValueMock = Mockito.mock(LValueConstraintExpression.class); + jobSpec.getUserConstraints().add(new Constraint(lValueMock, nc1Location)); + jobSpec.getUserConstraints().add(new Constraint(lValueMock, unknownLocation)); + + JobId jobId = new JobId(1); + nodeJobTracker.notifyJobCreation(jobId, jobSpec); + // make sure nc1 has a pending job + Assert.assertTrue(nodeJobTracker.getPendingJobs(nc1).size() == 1); + 
Assert.assertTrue(nodeJobTracker.getPendingJobs(unknown).isEmpty()); + Assert.assertTrue(nodeJobTracker.getPendingJobs(nc2).isEmpty()); + nodeJobTracker.notifyJobFinish(jobId, JobStatus.TERMINATED, null); + // make sure nc1 doesn't have pending jobs anymore + Assert.assertTrue(nodeJobTracker.getPendingJobs(nc1).isEmpty()); + + // make sure node doesn't have pending jobs after failure + jobId = new JobId(2); + nodeJobTracker.notifyJobCreation(jobId, jobSpec); + Assert.assertTrue(nodeJobTracker.getPendingJobs(nc1).size() == 1); + nodeJobTracker.notifyNodeFailure(Collections.singleton(nc1)); + Assert.assertTrue(nodeJobTracker.getPendingJobs(nc1).isEmpty()); + } +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/asterixdb/blob/54249a8a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/job/JobManager.java ---------------------------------------------------------------------- diff --git a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/job/JobManager.java b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/job/JobManager.java index 26f8022..7f1100b 100644 --- a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/job/JobManager.java +++ b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/job/JobManager.java @@ -106,6 +106,8 @@ public class JobManager implements IJobManager { checkJob(jobRun); JobSpecification job = jobRun.getJobSpecification(); IJobCapacityController.JobSubmissionStatus status = jobCapacityController.allocate(job); + CCServiceContext serviceCtx = ccs.getContext(); + serviceCtx.notifyJobCreation(jobRun.getJobId(), job); switch (status) { case QUEUE: queueJob(jobRun); @@ -304,10 +306,6 @@ public class JobManager implements IJobManager { 
run.setStartTime(System.currentTimeMillis()); JobId jobId = run.getJobId(); activeRunMap.put(jobId, run); - - CCServiceContext serviceCtx = ccs.getContext(); - JobSpecification spec = run.getJobSpecification(); - serviceCtx.notifyJobCreation(jobId, spec); run.setStatus(JobStatus.RUNNING, null); executeJobInternal(run); }
