This is an automated email from the ASF dual-hosted git repository.

mblow pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/asterixdb.git

commit b5380fc9176f87630079070b3ceb30ac906f397e
Author: Peeyush Gupta <[email protected]>
AuthorDate: Thu Sep 4 12:44:03 2025 -0700

    [NO ISSUE][RT] Notify joblet cleanup if joblet creation fails

    - user model changes: no
    - storage format changes: no
    - interface changes: no

    Ext-ref: MB-68387
    Change-Id: I13ee5535269c5152a1cd923502a013fa4fad0b40
    Reviewed-on: https://asterix-gerrit.ics.uci.edu/c/asterixdb/+/20293
    Reviewed-by: Ali Alsuliman <[email protected]>
    Tested-by: Peeyush Gupta <[email protected]>
---
 .../nc/work/FailedJobletCreationCleanupWork.java | 61 ++++++++++++++++++++++
 .../hyracks/control/nc/work/StartTasksWork.java  |  6 ++-
 2 files changed, 66 insertions(+), 1 deletion(-)

diff --git a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/work/FailedJobletCreationCleanupWork.java b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/work/FailedJobletCreationCleanupWork.java
new file mode 100644
index 0000000000..7eac920b94
--- /dev/null
+++ b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/work/FailedJobletCreationCleanupWork.java
@@ -0,0 +1,61 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.hyracks.control.nc.work;
+
+import org.apache.hyracks.api.job.JobId;
+import org.apache.hyracks.control.common.work.AbstractWork;
+import org.apache.hyracks.control.nc.NodeControllerService;
+import org.apache.logging.log4j.Level;
+import org.apache.logging.log4j.LogManager;
+import org.apache.logging.log4j.Logger;
+
+public class FailedJobletCreationCleanupWork extends AbstractWork {
+
+    private static final Logger LOGGER = LogManager.getLogger();
+
+    private final NodeControllerService ncs;
+
+    private final JobId jobId;
+
+    public FailedJobletCreationCleanupWork(NodeControllerService ncs, JobId jobId) {
+        this.ncs = ncs;
+        this.jobId = jobId;
+    }
+
+    @Override
+    public void run() {
+        LOGGER.debug("Failed joblet creation cleaning up {}", jobId);
+        try {
+            ncs.removeJobParameterByteStore(jobId);
+            ncs.getClusterController(jobId.getCcId()).notifyJobletCleanup(jobId, ncs.getId());
+        } catch (Exception e) {
+            LOGGER.info(e);
+        }
+    }
+
+    @Override
+    public String toString() {
+        return getName() + " jobId:" + jobId;
+    }
+
+    @Override
+    public Level logLevel() {
+        return Level.TRACE;
+    }
+}
diff --git a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/work/StartTasksWork.java b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/work/StartTasksWork.java
index dd4a956bd3..ae0c16142d 100644
--- a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/work/StartTasksWork.java
+++ b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/work/StartTasksWork.java
@@ -121,10 +121,11 @@ public class StartTasksWork extends AbstractWork {
     public void run() {
         Task task = null;
         int taskIndex = 0;
+        Joblet joblet = null;
         try {
             ncs.updateMaxJobId(jobId);
             NCServiceContext serviceCtx = ncs.getContext();
-            Joblet joblet = getOrCreateLocalJoblet(deploymentId, serviceCtx, acgBytes);
+            joblet = getOrCreateLocalJoblet(deploymentId, serviceCtx, acgBytes);
             if (ncs.getNodeStatus() != NodeStatus.ACTIVE) {
                 throw HyracksException.create(ErrorCode.NODE_IS_NOT_ACTIVE, ncs.getId());
             }
@@ -202,6 +203,9 @@ public class StartTasksWork extends AbstractWork {
             ExceptionUtils.setNodeIds(exceptions, ncs.getId());
             TaskAttemptId taskId = taskDescriptors.get(taskIndex).getTaskAttemptId();
             ncs.getWorkQueue().schedule(new NotifyTaskFailureWork(ncs, task, exceptions, jobId, taskId));
+            if (joblet == null) {
+                ncs.getWorkQueue().schedule(new FailedJobletCreationCleanupWork(ncs, jobId));
+            }
         }
     }
