This is an automated email from the ASF dual-hosted git repository.
dsen pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/ambari.git
The following commit(s) were added to refs/heads/trunk by this push:
new 4fde749 [AMBARI-24733] Rolling Restarts: Option to pause, resume and
abort re… (#2424)
4fde749 is described below
commit 4fde749d3f8ff9ab37ec40a487ddc47cb9ab8ce3
Author: Dmitry Sen <[email protected]>
AuthorDate: Fri Oct 12 23:30:44 2018 +0300
[AMBARI-24733] Rolling Restarts: Option to pause, resume and abort re…
(#2424)
* [AMBARI-24733] Rolling Restarts: Option to pause, resume and abort
restarts (dsen) - initial
* [AMBARI-24733] Rolling Restarts: Option to pause, resume and abort
restarts (dsen)
* [AMBARI-24733] Rolling Restarts: Option to pause, resume and abort
restarts (dsen)
* [AMBARI-24682] Rolling Restarts: Option to specify number of failures per
batch (dsen) - added failed task count on resume
* [AMBARI-24733] Rolling Restarts: Option to pause, resume and abort
restarts (dsen) - changes according to review
---
.../api/services/RequestScheduleService.java | 34 ++++
.../internal/RequestScheduleResourceProvider.java | 24 ++-
.../scheduler/AbstractLinearExecutionJob.java | 17 ++
.../server/scheduler/ExecutionScheduleManager.java | 190 ++++++++++++++++++++-
.../server/state/scheduler/BatchRequest.java | 10 ++
.../server/state/scheduler/RequestExecution.java | 10 +-
.../state/scheduler/RequestExecutionImpl.java | 39 +++++
.../scheduler/ExecutionScheduleManagerTest.java | 159 ++++++++++++++++-
.../ambari/server/state/RequestExecutionTest.java | 3 +-
9 files changed, 468 insertions(+), 18 deletions(-)
diff --git
a/ambari-server/src/main/java/org/apache/ambari/server/api/services/RequestScheduleService.java
b/ambari-server/src/main/java/org/apache/ambari/server/api/services/RequestScheduleService.java
index 8c08384..778d575 100644
---
a/ambari-server/src/main/java/org/apache/ambari/server/api/services/RequestScheduleService.java
+++
b/ambari-server/src/main/java/org/apache/ambari/server/api/services/RequestScheduleService.java
@@ -23,6 +23,7 @@ import java.util.Map;
import javax.ws.rs.DELETE;
import javax.ws.rs.GET;
import javax.ws.rs.POST;
+import javax.ws.rs.PUT;
import javax.ws.rs.Path;
import javax.ws.rs.PathParam;
import javax.ws.rs.Produces;
@@ -41,6 +42,7 @@ import io.swagger.annotations.Api;
import io.swagger.annotations.ApiImplicitParam;
import io.swagger.annotations.ApiImplicitParams;
import io.swagger.annotations.ApiOperation;
+import io.swagger.annotations.ApiParam;
import io.swagger.annotations.ApiResponse;
import io.swagger.annotations.ApiResponses;
@@ -162,6 +164,38 @@ public class RequestScheduleService extends BaseService {
createRequestSchedule(m_clusterName, null));
}
+
+ /**
+ * Handles PUT /clusters/{clusterId}/request_schedules/{requestScheduleId}
+ * Update a specific request schedule, e.g. to pause, resume or abort it
+ *
+ * @return
+ */
+
+ @PUT
+ @Path("{requestScheduleId}")
+ @Produces(MediaType.TEXT_PLAIN)
+ @ApiOperation(value = "Updates a scheduled request, usually used to pause
running scheduled requests or to resume them.",
+ notes = "Changes the state of an existing request. Usually used to pause
running scheduled requests or to resume them.",
+ nickname = "RequestSchedules#updateRequestSchedule"
+ )
+ @ApiImplicitParams({
+ @ApiImplicitParam(dataType = REQUEST_SCHEDULE_REQUEST_TYPE, paramType =
PARAM_TYPE_BODY)
+ })
+ @ApiResponses({
+ @ApiResponse(code = HttpStatus.SC_OK, message = MSG_SUCCESSFUL_OPERATION),
+ @ApiResponse(code = HttpStatus.SC_ACCEPTED, message =
MSG_REQUEST_ACCEPTED),
+ @ApiResponse(code = HttpStatus.SC_BAD_REQUEST, message =
MSG_INVALID_ARGUMENTS),
+ @ApiResponse(code = HttpStatus.SC_NOT_FOUND, message =
MSG_RESOURCE_NOT_FOUND),
+ @ApiResponse(code = HttpStatus.SC_UNAUTHORIZED, message =
MSG_NOT_AUTHENTICATED),
+ @ApiResponse(code = HttpStatus.SC_FORBIDDEN, message =
MSG_PERMISSION_DENIED),
+ @ApiResponse(code = HttpStatus.SC_INTERNAL_SERVER_ERROR, message =
MSG_SERVER_ERROR),
+ })
+ public Response updateRequestSchedule(String body, @Context HttpHeaders
headers, @Context UriInfo ui,
+ @ApiParam @PathParam("requestScheduleId")
String requestScheduleId) {
+ return handleRequest(headers, body, ui, Request.Type.PUT,
createRequestSchedule(m_clusterName, requestScheduleId));
+ }
+
/**
* Handles DELETE /clusters/{clusterId}/request_schedules/{requestScheduleId}
* Delete a request schedule
diff --git
a/ambari-server/src/main/java/org/apache/ambari/server/controller/internal/RequestScheduleResourceProvider.java
b/ambari-server/src/main/java/org/apache/ambari/server/controller/internal/RequestScheduleResourceProvider.java
index 5e8f849..bde31a5 100644
---
a/ambari-server/src/main/java/org/apache/ambari/server/controller/internal/RequestScheduleResourceProvider.java
+++
b/ambari-server/src/main/java/org/apache/ambari/server/controller/internal/RequestScheduleResourceProvider.java
@@ -430,11 +430,22 @@ public class RequestScheduleResourceProvider extends
AbstractControllerResourceP
String username = getManagementController().getAuthName();
Integer userId = getManagementController().getAuthId();
- requestExecution.setBatch(request.getBatch());
- requestExecution.setDescription(request.getDescription());
- requestExecution.setSchedule(request.getSchedule());
- if (request.getStatus() != null && isValidRequestScheduleStatus
- (request.getStatus())) {
+ if (request.getDescription() != null) {
+ requestExecution.setDescription(request.getDescription());
+ }
+
+ if (request.getSchedule() != null) {
+ requestExecution.setSchedule(request.getSchedule());
+ }
+
+ if (request.getStatus() != null) {
+ //TODO status changes graph
+ if (!isValidRequestScheduleStatus(request.getStatus())) {
+ throw new AmbariException("Request Schedule status not valid"
+ + ", clusterName = " + request.getClusterName()
+ + ", description = " + request.getDescription()
+ + ", id = " + request.getId());
+ }
requestExecution.setStatus(RequestExecution.Status.valueOf(request.getStatus()));
}
requestExecution.setUpdateUser(username);
@@ -443,6 +454,7 @@ public class RequestScheduleResourceProvider extends
AbstractControllerResourceP
LOG.info("Persisting updated Request Schedule "
+ ", clusterName = " + request.getClusterName()
+ ", description = " + request.getDescription()
+ + ", status = " + request.getStatus()
+ ", user = " + username);
requestExecution.persist();
@@ -502,7 +514,7 @@ public class RequestScheduleResourceProvider extends
AbstractControllerResourceP
// Setup batch schedule
getManagementController().getExecutionScheduleManager()
- .scheduleBatch(requestExecution);
+ .scheduleAllBatches(requestExecution);
RequestScheduleResponse response = new RequestScheduleResponse
(requestExecution.getId(), requestExecution.getClusterName(),
diff --git
a/ambari-server/src/main/java/org/apache/ambari/server/scheduler/AbstractLinearExecutionJob.java
b/ambari-server/src/main/java/org/apache/ambari/server/scheduler/AbstractLinearExecutionJob.java
index 4599dfa..d99e89e 100644
---
a/ambari-server/src/main/java/org/apache/ambari/server/scheduler/AbstractLinearExecutionJob.java
+++
b/ambari-server/src/main/java/org/apache/ambari/server/scheduler/AbstractLinearExecutionJob.java
@@ -17,6 +17,10 @@
*/
package org.apache.ambari.server.scheduler;
+import static
org.apache.ambari.server.state.scheduler.BatchRequestJob.BATCH_REQUEST_CLUSTER_NAME_KEY;
+import static
org.apache.ambari.server.state.scheduler.BatchRequestJob.BATCH_REQUEST_EXECUTION_ID_KEY;
+import static
org.apache.ambari.server.state.scheduler.RequestExecution.Status.ABORTED;
+import static
org.apache.ambari.server.state.scheduler.RequestExecution.Status.PAUSED;
import static org.quartz.DateBuilder.futureDate;
import static org.quartz.SimpleScheduleBuilder.simpleSchedule;
import static org.quartz.TriggerBuilder.newTrigger;
@@ -125,6 +129,19 @@ public abstract class AbstractLinearExecutionJob
implements ExecutionJob {
return;
}
+ String status = null;
+ try {
+ status =
executionScheduleManager.getBatchRequestStatus(jobDataMap.getLong(BATCH_REQUEST_EXECUTION_ID_KEY),
jobDataMap.getString(BATCH_REQUEST_CLUSTER_NAME_KEY));
+ } catch (AmbariException e) {
+ LOG.warn("Unable to define the status of batch request : ", e);
+ }
+
+ if(ABORTED.name().equals(status) || PAUSED.name().equals(status)) {
+ LOG.info("The linear job chain was paused or aborted, not triggering the
next one");
+ return;
+ }
+
+
int separationSeconds =
jobDataMap.getIntValue(NEXT_EXECUTION_SEPARATION_SECONDS);
Object failedCount =
properties.get(BatchRequestJob.BATCH_REQUEST_FAILED_TASKS_KEY);
Object totalCount =
properties.get(BatchRequestJob.BATCH_REQUEST_TOTAL_TASKS_KEY);
diff --git
a/ambari-server/src/main/java/org/apache/ambari/server/scheduler/ExecutionScheduleManager.java
b/ambari-server/src/main/java/org/apache/ambari/server/scheduler/ExecutionScheduleManager.java
index 4f03a3e..3295395 100644
---
a/ambari-server/src/main/java/org/apache/ambari/server/scheduler/ExecutionScheduleManager.java
+++
b/ambari-server/src/main/java/org/apache/ambari/server/scheduler/ExecutionScheduleManager.java
@@ -18,6 +18,9 @@
package org.apache.ambari.server.scheduler;
+import static
org.apache.ambari.server.state.scheduler.RequestExecution.Status.ABORTED;
+import static
org.apache.ambari.server.state.scheduler.RequestExecution.Status.PAUSED;
+import static
org.apache.ambari.server.state.scheduler.RequestExecution.Status.SCHEDULED;
import static org.quartz.CronScheduleBuilder.cronSchedule;
import static org.quartz.JobBuilder.newJob;
import static org.quartz.SimpleScheduleBuilder.simpleSchedule;
@@ -29,6 +32,7 @@ import java.security.SecureRandom;
import java.security.cert.CertificateException;
import java.security.cert.X509Certificate;
import java.text.ParseException;
+import java.util.Collection;
import java.util.Collections;
import java.util.Date;
import java.util.HashMap;
@@ -47,6 +51,14 @@ import org.apache.ambari.server.AmbariException;
import org.apache.ambari.server.actionmanager.ActionDBAccessor;
import org.apache.ambari.server.actionmanager.HostRoleStatus;
import org.apache.ambari.server.configuration.Configuration;
+import
org.apache.ambari.server.controller.internal.AbstractControllerResourceProvider;
+import org.apache.ambari.server.controller.internal.RequestImpl;
+import org.apache.ambari.server.controller.internal.RequestResourceProvider;
+import org.apache.ambari.server.controller.spi.Predicate;
+import org.apache.ambari.server.controller.spi.RequestStatus;
+import org.apache.ambari.server.controller.spi.Resource;
+import org.apache.ambari.server.controller.spi.ResourceProvider;
+import org.apache.ambari.server.controller.utilities.PredicateBuilder;
import
org.apache.ambari.server.security.authorization.internal.InternalTokenClientFilter;
import
org.apache.ambari.server.security.authorization.internal.InternalTokenStorage;
import org.apache.ambari.server.state.Cluster;
@@ -275,13 +287,42 @@ public class ExecutionScheduleManager {
return true;
}
+
+ private long getFirstJobOrderId(RequestExecution requestExecution) throws
AmbariException {
+ Long firstBatchOrderId = null;
+ Batch batch = requestExecution.getBatch();
+ if (batch != null) {
+ List<BatchRequest> batchRequests = batch.getBatchRequests();
+ if (batchRequests != null) {
+ Collections.sort(batchRequests);
+ ListIterator<BatchRequest> iterator = batchRequests.listIterator();
+ firstBatchOrderId = iterator.next().getOrderId();
+ }
+ }
+ if (firstBatchOrderId == null) {
+ throw new AmbariException("Can't schedule RequestExecution with no
batches");
+ }
+ return firstBatchOrderId;
+ }
+
/**
* Persist jobs based on the request batch and create trigger for the first
* job
* @param requestExecution
* @throws AmbariException
*/
- public void scheduleBatch(RequestExecution requestExecution)
+ public void scheduleAllBatches(RequestExecution requestExecution) throws
AmbariException {
+ Long firstBatchOrderId = getFirstJobOrderId(requestExecution);
+ scheduleBatch(requestExecution, firstBatchOrderId);
+ }
+
+ /**
+ * Persist jobs based on the request batches starting from the defined batch
and create a trigger for the first
+ * job
+ * @param requestExecution
+ * @throws AmbariException
+ */
+ public void scheduleBatch(RequestExecution requestExecution, long
startingBatchOrderId)
throws AmbariException {
if (!isSchedulerAvailable()) {
@@ -297,15 +338,18 @@ public class ExecutionScheduleManager {
LOG.error("Unable to determine scheduler state.", e);
throw new AmbariException("Scheduler unavailable.");
}
+ LOG.debug("Scheduling jobs starting from " + startingBatchOrderId);
// Create and persist jobs based on batches
- JobDetail firstJobDetail = persistBatch(requestExecution);
+ JobDetail firstJobDetail = persistBatch(requestExecution,
startingBatchOrderId);
if (firstJobDetail == null) {
throw new AmbariException("Unable to schedule jobs. firstJobDetail = "
+ firstJobDetail);
}
+ Integer failedCount =
countFailedTasksBeforeStartingBatch(requestExecution, startingBatchOrderId);
+
// Create a cron trigger for the first batch job
// If no schedule is specified create simple trigger to fire right away
Schedule schedule = requestExecution.getSchedule();
@@ -332,6 +376,7 @@ public class ExecutionScheduleManager {
.withSchedule(cronSchedule(triggerExpression)
.withMisfireHandlingInstructionFireAndProceed())
.forJob(firstJobDetail)
+ .usingJobData(BatchRequestJob.BATCH_REQUEST_FAILED_TASKS_KEY,
failedCount)
.startAt(startDate)
.endAt(endDate)
.build();
@@ -351,6 +396,7 @@ public class ExecutionScheduleManager {
.withIdentity(REQUEST_EXECUTION_TRIGGER_PREFIX + "-" +
requestExecution.getId(),
ExecutionJob.LINEAR_EXECUTION_TRIGGER_GROUP)
.withSchedule(simpleSchedule().withMisfireHandlingInstructionFireNow())
+ .usingJobData(BatchRequestJob.BATCH_REQUEST_FAILED_TASKS_KEY,
failedCount)
.startNow()
.build();
@@ -364,7 +410,35 @@ public class ExecutionScheduleManager {
}
}
- private JobDetail persistBatch(RequestExecution requestExecution)
+ private Integer countFailedTasksBeforeStartingBatch(RequestExecution
requestExecution, long startingBatchOrderId) throws AmbariException {
+ int result = 0;
+ Batch batch = requestExecution.getBatch();
+ if (batch != null) {
+ List<BatchRequest> batchRequests = batch.getBatchRequests();
+ if (batchRequests != null) {
+ Collections.sort(batchRequests);
+ for (BatchRequest batchRequest : batchRequests) {
+ if (batchRequest.getOrderId() >= startingBatchOrderId) break;
+
+ if (batchRequest.getRequestId() != null) {
+ BatchRequestResponse batchRequestResponse =
getBatchRequestResponse(batchRequest.getRequestId(),
requestExecution.getClusterName());
+ if (batchRequestResponse != null) {
+ result += batchRequestResponse.getFailedTaskCount() +
+ batchRequestResponse.getAbortedTaskCount() +
+ batchRequestResponse.getTimedOutTaskCount();
+ }
+ }
+ }
+ }
+ }
+ return result;
+ }
+
+ /**
+ * Creates and stores the chain of BatchRequestJobs - quartz jobs - in order
from the last order Id to the startingBatchOrderId
+ * @return the quartz job that corresponds to startingBatchOrderId
+ */
+ private JobDetail persistBatch(RequestExecution requestExecution, long
startingBatchOrderId)
throws AmbariException {
Batch batch = requestExecution.getBatch();
@@ -376,7 +450,8 @@ public class ExecutionScheduleManager {
Collections.sort(batchRequests);
ListIterator<BatchRequest> iterator =
batchRequests.listIterator(batchRequests.size());
String nextJobName = null;
- while (iterator.hasPrevious()) {
+ long nextBatchOrderId = batchRequests.size();
+ while (nextBatchOrderId != startingBatchOrderId &&
iterator.hasPrevious()) {
BatchRequest batchRequest = iterator.previous();
String jobName = getJobName(requestExecution.getId(),
@@ -409,6 +484,7 @@ public class ExecutionScheduleManager {
}
nextJobName = jobName;
+ nextBatchOrderId = batchRequest.getOrderId();
}
}
}
@@ -421,14 +497,60 @@ public class ExecutionScheduleManager {
}
/**
- * Delete and re-create all jobs and triggers
- * Update schedule for a batch
+ * Pause/resume/abort request schedule and related jobs and triggers
* @param requestExecution
*/
public void updateBatchSchedule(RequestExecution requestExecution)
throws AmbariException {
+ BatchRequest activeBatch = calculateActiveBatch(requestExecution);
+ if (activeBatch == null) {
+ LOG.warn("Ignoring RequestExecution status update since all batches has
been executed");
+ return;
+ }
+ if (requestExecution.getStatus().equals(SCHEDULED.name())) {
+ scheduleBatch(requestExecution, activeBatch.getOrderId());
+ } else if (requestExecution.getStatus().equals(PAUSED.name()) ||
+ requestExecution.getStatus().equals(ABORTED.name())) {
+ LOG.info("Request execution status changed to " +
requestExecution.getStatus() + " for request schedule "
+ + requestExecution.getId() + ". Deleting related jobs.");
+ deleteJobs(requestExecution, activeBatch.getOrderId());
+ Collection<Long> requestIDsToAbort =
requestExecution.getBatchRequestRequestsIDs(activeBatch.getOrderId());
+ for (Long requestId : requestIDsToAbort) {
+ //might be null if the request is not for a long-running job
+ if (requestId == null) continue;
+ abortRequestById(requestExecution, requestId);
+ }
+ }
+ }
+
+ /**
+ * Iterate through the batches and find the first one whose status is not
completed; if all were completed, return null
+ * @param requestExecution
+ * @return
+ */
+ private BatchRequest calculateActiveBatch(RequestExecution requestExecution)
{
+ BatchRequest result = null;
+ Batch batch = requestExecution.getBatch();
+ if (batch != null) {
+ List<BatchRequest> batchRequests = batch.getBatchRequests();
+ if (batchRequests != null) {
+ Collections.sort(batchRequests);
+ ListIterator<BatchRequest> iterator = batchRequests.listIterator();
+ do {
+ result = iterator.next();
+ } while (iterator.hasNext() &&
+
HostRoleStatus.getCompletedStates().contains(HostRoleStatus.valueOf(result.getStatus()))
&&
+ !HostRoleStatus.ABORTED.name().equals(result.getStatus()));
+ }
+ }
+
+ if (result != null &&
+
HostRoleStatus.getCompletedStates().contains(HostRoleStatus.valueOf(result.getStatus()))
&&
+ !HostRoleStatus.ABORTED.name().equals(result.getStatus())) {
+ return null;
+ }
- // TODO: Support delete and update if no jobs are running
+ return result;
}
/**
@@ -483,6 +605,15 @@ public class ExecutionScheduleManager {
* @throws AmbariException
*/
public void deleteAllJobs(RequestExecution requestExecution) throws
AmbariException {
+ Long firstBatchOrderId = getFirstJobOrderId(requestExecution);
+ deleteJobs(requestExecution, firstBatchOrderId);
+ }
+
+ /**
+ * Delete jobs and triggers starting from the given batch order id, if possible.
+ * @throws AmbariException
+ */
+ public void deleteJobs(RequestExecution requestExecution, Long
startingBatchOrderId) throws AmbariException {
if (!isSchedulerAvailable()) {
throw new AmbariException("Scheduler unavailable.");
}
@@ -492,7 +623,11 @@ public class ExecutionScheduleManager {
if (batch != null) {
List<BatchRequest> batchRequests = batch.getBatchRequests();
if (batchRequests != null) {
+ Collections.sort(batchRequests);
for (BatchRequest batchRequest : batchRequests) {
+ //skip all before starting batch
+ if (batchRequest.getOrderId() < startingBatchOrderId) continue;
+
String jobName = getJobName(requestExecution.getId(),
batchRequest.getOrderId());
@@ -540,6 +675,8 @@ public class ExecutionScheduleManager {
actionDBAccessor.setSourceScheduleForRequest(batchRequestResponse.getRequestId(),
executionId);
}
+ batchRequest.setRequestId(batchRequestResponse.getRequestId());
+
return batchRequestResponse.getRequestId();
} catch (Exception e) {
throw new AmbariException("Exception occurred while performing request",
e);
@@ -566,6 +703,33 @@ public class ExecutionScheduleManager {
}
+ protected RequestStatus abortRequestById(RequestExecution requestExecution,
Long requestId) throws AmbariException {
+ LOG.debug("Aborting request " + requestId);
+ ResourceProvider provider =
+
AbstractControllerResourceProvider.getResourceProvider(Resource.Type.Request);
+
+ Map<String, Object> properties = new HashMap<>();
+ properties.put(RequestResourceProvider.REQUEST_CLUSTER_NAME_PROPERTY_ID,
requestExecution.getClusterName());
+ properties.put(RequestResourceProvider.REQUEST_ABORT_REASON_PROPERTY_ID,
"Request execution status changed to " + requestExecution.getStatus());
+ properties.put(RequestResourceProvider.REQUEST_ID_PROPERTY_ID,
Long.toString(requestId));
+ properties.put(RequestResourceProvider.REQUEST_STATUS_PROPERTY_ID,
HostRoleStatus.ABORTED.name());
+
+ org.apache.ambari.server.controller.spi.Request request = new
RequestImpl(Collections.emptySet(),
+ Collections.singleton(properties), Collections.emptyMap(), null);
+
+ Predicate predicate = new PredicateBuilder()
+ .property(RequestResourceProvider.REQUEST_CLUSTER_NAME_PROPERTY_ID)
+ .equals(requestExecution.getClusterName()).and()
+ .property(RequestResourceProvider.REQUEST_ID_PROPERTY_ID)
+ .equals(Long.toString(requestId)).toPredicate();
+
+ try{
+ return provider.updateResources(request, predicate);
+ } catch (Exception e) {
+ throw new AmbariException("Error while aborting the request.", e);
+ }
+ }
+
private BatchRequestResponse convertToBatchRequestResponse(ClientResponse
clientResponse) {
BatchRequestResponse batchRequestResponse = new BatchRequestResponse();
int retCode = clientResponse.getStatus();
@@ -638,6 +802,18 @@ public class ExecutionScheduleManager {
return batchRequestResponse;
}
+ public String getBatchRequestStatus(Long executionId, String clusterName)
throws AmbariException {
+ Cluster cluster = clusters.getCluster(clusterName);
+ RequestExecution requestExecution =
cluster.getAllRequestExecutions().get(executionId);
+
+ if (requestExecution == null) {
+ throw new AmbariException("Unable to find request schedule with id = "
+ + executionId);
+ }
+
+ return requestExecution.getStatus();
+ }
+
public void updateBatchRequest(long executionId, long batchId, String
clusterName,
BatchRequestResponse batchRequestResponse,
boolean statusOnly) throws AmbariException {
diff --git
a/ambari-server/src/main/java/org/apache/ambari/server/state/scheduler/BatchRequest.java
b/ambari-server/src/main/java/org/apache/ambari/server/state/scheduler/BatchRequest.java
index 3063d29..a6d194f 100644
---
a/ambari-server/src/main/java/org/apache/ambari/server/state/scheduler/BatchRequest.java
+++
b/ambari-server/src/main/java/org/apache/ambari/server/state/scheduler/BatchRequest.java
@@ -22,6 +22,7 @@ import org.codehaus.jackson.map.annotate.JsonSerialize;
public class BatchRequest implements Comparable<BatchRequest> {
private Long orderId;
+ private Long requestId;
private Type type;
private String uri;
private String body;
@@ -38,6 +39,15 @@ public class BatchRequest implements
Comparable<BatchRequest> {
this.orderId = orderId;
}
+ @JsonProperty("request_id")
+ public Long getRequestId() {
+ return requestId;
+ }
+
+ public void setRequestId(Long requestId) {
+ this.requestId = requestId;
+ }
+
@JsonProperty("request_type")
public String getType() {
return type.name();
diff --git
a/ambari-server/src/main/java/org/apache/ambari/server/state/scheduler/RequestExecution.java
b/ambari-server/src/main/java/org/apache/ambari/server/state/scheduler/RequestExecution.java
index 6d9c40b..f91b6f5 100644
---
a/ambari-server/src/main/java/org/apache/ambari/server/state/scheduler/RequestExecution.java
+++
b/ambari-server/src/main/java/org/apache/ambari/server/state/scheduler/RequestExecution.java
@@ -17,6 +17,8 @@
*/
package org.apache.ambari.server.state.scheduler;
+import java.util.Collection;
+
import org.apache.ambari.server.controller.RequestScheduleResponse;
/**
@@ -161,6 +163,10 @@ public interface RequestExecution {
String getRequestBody(Long batchId);
/**
+ * Get the requests IDs for the batch
+ */
+ Collection<Long> getBatchRequestRequestsIDs(long batchId);
+ /**
* Get batch request with specified order id
*/
BatchRequest getBatchRequest(long batchId);
@@ -184,6 +190,8 @@ public interface RequestExecution {
enum Status {
SCHEDULED,
COMPLETED,
- DISABLED
+ DISABLED,
+ ABORTED,
+ PAUSED
}
}
diff --git
a/ambari-server/src/main/java/org/apache/ambari/server/state/scheduler/RequestExecutionImpl.java
b/ambari-server/src/main/java/org/apache/ambari/server/state/scheduler/RequestExecutionImpl.java
index 104ca9b..fe18b9a 100644
---
a/ambari-server/src/main/java/org/apache/ambari/server/state/scheduler/RequestExecutionImpl.java
+++
b/ambari-server/src/main/java/org/apache/ambari/server/state/scheduler/RequestExecutionImpl.java
@@ -17,14 +17,17 @@
*/
package org.apache.ambari.server.state.scheduler;
+import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.locks.ReadWriteLock;
import java.util.concurrent.locks.ReentrantReadWriteLock;
+import java.util.stream.Collectors;
import javax.annotation.Nullable;
+import org.apache.ambari.server.actionmanager.HostRoleStatus;
import org.apache.ambari.server.controller.RequestScheduleResponse;
import org.apache.ambari.server.orm.dao.ClusterDAO;
import org.apache.ambari.server.orm.dao.HostDAO;
@@ -115,6 +118,10 @@ public class RequestExecutionImpl implements
RequestExecution {
batchRequestEntities) {
BatchRequest batchRequest = new BatchRequest();
batchRequest.setOrderId(batchRequestEntity.getBatchId());
+ batchRequest.setRequestId(batchRequestEntity.getRequestId());
+ if (batchRequestEntity.getRequestBody() != null) {
+ batchRequest.setBody(new
String(batchRequestEntity.getRequestBody()));
+ }
batchRequest.setType(BatchRequest.Type.valueOf(batchRequestEntity.getRequestType()));
batchRequest.setUri(batchRequestEntity.getRequestUri());
batchRequest.setStatus(batchRequestEntity.getRequestStatus());
@@ -133,6 +140,13 @@ public class RequestExecutionImpl implements
RequestExecution {
schedule.setStartTime(requestScheduleEntity.getStartTime());
schedule.setEndTime(requestScheduleEntity.getEndTime());
+ //if all values are null, set the overall schedule to null
+ if (schedule.getDayOfWeek() == null && schedule.getDaysOfMonth() == null &&
+ schedule.getMinutes() == null && schedule.getHours() == null &&
+ schedule.getMonth() == null && schedule.getYear() == null &&
+ schedule.getStartTime() == null && schedule.getEndTime() == null) {
+ schedule = null;
+ }
isPersisted = true;
}
@@ -276,6 +290,7 @@ public class RequestExecutionImpl implements
RequestExecution {
RequestScheduleBatchRequestEntity batchRequestEntity = new
RequestScheduleBatchRequestEntity();
batchRequestEntity.setBatchId(batchRequest.getOrderId());
+ batchRequestEntity.setRequestId(batchRequest.getRequestId());
batchRequestEntity.setScheduleId(requestScheduleEntity.getScheduleId());
batchRequestEntity.setRequestScheduleEntity(requestScheduleEntity);
batchRequestEntity.setRequestType(batchRequest.getType());
@@ -428,6 +443,22 @@ public class RequestExecutionImpl implements
RequestExecution {
}
@Override
+ public Collection<Long> getBatchRequestRequestsIDs(long batchId) {
+ Collection<Long> requestIDs = new ArrayList<>();
+ if (requestScheduleEntity != null) {
+ Collection<RequestScheduleBatchRequestEntity> requestEntities =
+ requestScheduleEntity.getRequestScheduleBatchRequestEntities();
+ if (requestEntities != null) {
+ requestIDs.addAll(requestEntities.stream()
+ .filter(requestEntity -> requestEntity.getBatchId().equals(batchId))
+ .map(RequestScheduleBatchRequestEntity::getRequestId)
+ .collect(Collectors.toList()));
+ }
+ }
+ return requestIDs;
+ }
+
+ @Override
public BatchRequest getBatchRequest(long batchId) {
for (BatchRequest batchRequest : batch.getBatchRequests()) {
if (batchId == batchRequest.getOrderId()) {
@@ -452,6 +483,14 @@ public class RequestExecutionImpl implements
RequestExecution {
}
}
+ // Rare race condition: a batch request may finish while the request
execution is being paused,
+ //in which case the job details will be deleted,
+ //so we mark it as not completed because otherwise the job detail would be
lost
+ //and the whole Request Execution status would never be set to COMPLETED at
the end.
+ if (Status.PAUSED.name().equals(getStatus()) &&
HostRoleStatus.COMPLETED.name().equals(batchRequestResponse.getStatus())) {
+ batchRequestResponse.setStatus(HostRoleStatus.ABORTED.name());
+ }
+
if (batchRequestEntity != null) {
batchRequestEntity.setRequestStatus(batchRequestResponse.getStatus());
diff --git
a/ambari-server/src/test/java/org/apache/ambari/server/scheduler/ExecutionScheduleManagerTest.java
b/ambari-server/src/test/java/org/apache/ambari/server/scheduler/ExecutionScheduleManagerTest.java
index 3f4a5d5..3097279 100644
---
a/ambari-server/src/test/java/org/apache/ambari/server/scheduler/ExecutionScheduleManagerTest.java
+++
b/ambari-server/src/test/java/org/apache/ambari/server/scheduler/ExecutionScheduleManagerTest.java
@@ -33,6 +33,7 @@ import static org.junit.Assert.assertThat;
import java.lang.reflect.InvocationTargetException;
import java.util.ArrayList;
+import java.util.Collections;
import java.util.Date;
import java.util.HashMap;
import java.util.List;
@@ -216,7 +217,7 @@ public class ExecutionScheduleManagerTest {
RequestExecution requestExecution = createRequestExecution(true);
Assert.assertNotNull(requestExecution);
- executionScheduleManager.scheduleBatch(requestExecution);
+ executionScheduleManager.scheduleAllBatches(requestExecution);
String jobName1 = executionScheduleManager.getJobName(requestExecution
.getId(), 10L);
@@ -280,7 +281,7 @@ public class ExecutionScheduleManagerTest {
RequestExecution requestExecution = createRequestExecution(true);
Assert.assertNotNull(requestExecution);
- executionScheduleManager.scheduleBatch(requestExecution);
+ executionScheduleManager.scheduleAllBatches(requestExecution);
String jobName1 = executionScheduleManager.getJobName(requestExecution
.getId(), 10L);
@@ -308,7 +309,7 @@ public class ExecutionScheduleManagerTest {
RequestExecution requestExecution = createRequestExecution(false);
Assert.assertNotNull(requestExecution);
- executionScheduleManager.scheduleBatch(requestExecution);
+ executionScheduleManager.scheduleAllBatches(requestExecution);
String jobName1 = executionScheduleManager.getJobName(requestExecution
.getId(), 10L);
@@ -387,6 +388,9 @@ public class ExecutionScheduleManagerTest {
expect(batchRequestMock.getUri()).andReturn(uri).once();
expect(batchRequestMock.getType()).andReturn(type).once();
+ batchRequestMock.setRequestId(5L);
+ expectLastCall().once();
+
expect(scheduleManager.performApiRequest(eq(uri), eq(body), eq(type),
eq(userId))).andReturn(batchRequestResponse).once();
scheduleManager.updateBatchRequest(eq(executionId), eq(batchId),
eq(clusterName), eq(batchRequestResponse), eq(false));
@@ -721,4 +725,153 @@ public class ExecutionScheduleManagerTest {
assertEquals("http://localhost:8080/",
scheduleManager.extendApiResource(webResource, "").getURI().toString());
}
+
+ @Test
+ public void testUpdateBatchSchedulePause() throws Exception {
+ Clusters clustersMock = createMock(Clusters.class);
+ Cluster clusterMock = createMock(Cluster.class);
+ Configuration configurationMock = createNiceMock(Configuration.class);
+ ExecutionScheduler executionSchedulerMock =
createMock(ExecutionScheduler.class);
+ InternalTokenStorage tokenStorageMock =
createMock(InternalTokenStorage.class);
+ ActionDBAccessor actionDBAccessorMock = createMock(ActionDBAccessor.class);
+ Gson gson = new Gson();
+ RequestExecution requestExecutionMock = createMock(RequestExecution.class);
+ Batch batchMock = createMock(Batch.class);
+ JobDetail jobDetailMock = createMock(JobDetail.class);
+ final BatchRequest batchRequestMock1 = createMock(BatchRequest.class);
+ final BatchRequest batchRequestMock2 = createMock(BatchRequest.class);
+ final Trigger triggerMock = createNiceMock(Trigger.class);
+ final List<Trigger> triggers = new ArrayList<Trigger>() {{
add(triggerMock); }};
+
+ long executionId = 11L;
+ String clusterName = "c1";
+ Date pastDate = new Date(new Date().getTime() - 2);
+
+ Map<Long, RequestExecution> executionMap = new HashMap<>();
+ executionMap.put(executionId, requestExecutionMock);
+
+
EasyMock.expect(configurationMock.getApiSSLAuthentication()).andReturn(Boolean.FALSE);
+ EasyMock.replay(configurationMock);
+
+ ExecutionScheduleManager scheduleManager =
+ createMockBuilder(ExecutionScheduleManager.class)
+ .withConstructor(configurationMock, executionSchedulerMock,
tokenStorageMock,
+ clustersMock, actionDBAccessorMock, gson)
+ .addMockedMethods("deleteJobs", "abortRequestById").createMock();
+
+
expect(clustersMock.getCluster(clusterName)).andReturn(clusterMock).anyTimes();
+
expect(clusterMock.getAllRequestExecutions()).andReturn(executionMap).anyTimes();
+ expect(requestExecutionMock.getBatch()).andReturn(batchMock).anyTimes();
+ expect(batchMock.getBatchRequests()).andReturn
+ (new ArrayList<BatchRequest>() {{
+ add(batchRequestMock1);
+ add(batchRequestMock2);
+ }});
+ expect(batchRequestMock1.getOrderId()).andReturn(1L).anyTimes();
+
expect(batchRequestMock1.getStatus()).andReturn(HostRoleStatus.COMPLETED.name()).anyTimes();
+
expect(batchRequestMock1.compareTo(batchRequestMock2)).andReturn(-1).anyTimes();
+
expect(batchRequestMock2.compareTo(batchRequestMock1)).andReturn(1).anyTimes();
+ expect(batchRequestMock2.getOrderId()).andReturn(3L).anyTimes();
+
expect(batchRequestMock2.getStatus()).andReturn(HostRoleStatus.IN_PROGRESS.name()).anyTimes();
+ expect(executionSchedulerMock.getJobDetail((JobKey) anyObject()))
+ .andReturn(jobDetailMock).anyTimes();
+ expect((List<Trigger>) executionSchedulerMock
+ .getTriggersForJob((JobKey) anyObject())).andReturn(triggers).anyTimes();
+ expect(triggerMock.mayFireAgain()).andReturn(true).anyTimes();
+ expect(triggerMock.getFinalFireTime()).andReturn(pastDate).anyTimes();
+
+
expect(requestExecutionMock.getStatus()).andReturn(RequestExecution.Status.PAUSED.name()).anyTimes();
+ expect(requestExecutionMock.getId()).andReturn(executionId).anyTimes();
+
expect(requestExecutionMock.getBatchRequestRequestsIDs(3L)).andReturn(Collections.singleton(5L)).anyTimes();
+
+ //deletes only second batch, the first was completed
+ scheduleManager.deleteJobs(eq(requestExecutionMock), eq(3L));
+ expectLastCall().once();
+ //second batch request needs to be aborted
+ expect(scheduleManager.abortRequestById(requestExecutionMock,
5L)).andReturn(null).once();
+
+ replay(clustersMock, clusterMock, requestExecutionMock,
+ executionSchedulerMock, scheduleManager, batchMock, batchRequestMock1,
batchRequestMock2,
+ triggerMock, jobDetailMock, actionDBAccessorMock);
+
+ scheduleManager.updateBatchSchedule(requestExecutionMock);
+
+ verify(clustersMock, clusterMock, configurationMock, requestExecutionMock,
+ executionSchedulerMock, scheduleManager, batchMock, batchRequestMock1,
batchRequestMock2,
+ triggerMock, jobDetailMock, actionDBAccessorMock);
+ }
+
+ @Test
+ public void testUpdateBatchScheduleUnpause() throws Exception {
+ Clusters clustersMock = createMock(Clusters.class);
+ Cluster clusterMock = createMock(Cluster.class);
+ Configuration configurationMock = createNiceMock(Configuration.class);
+ ExecutionScheduler executionSchedulerMock =
createMock(ExecutionScheduler.class);
+ InternalTokenStorage tokenStorageMock =
createMock(InternalTokenStorage.class);
+ ActionDBAccessor actionDBAccessorMock = createMock(ActionDBAccessor.class);
+ Gson gson = new Gson();
+ RequestExecution requestExecutionMock = createMock(RequestExecution.class);
+ Batch batchMock = createMock(Batch.class);
+ JobDetail jobDetailMock = createMock(JobDetail.class);
+ final BatchRequest batchRequestMock1 = createMock(BatchRequest.class);
+ final BatchRequest batchRequestMock2 = createMock(BatchRequest.class);
+ final Trigger triggerMock = createNiceMock(Trigger.class);
+ final List<Trigger> triggers = new ArrayList<Trigger>() {{
add(triggerMock); }};
+
+ long executionId = 11L;
+ String clusterName = "c1";
+ Date pastDate = new Date(new Date().getTime() - 2);
+
+ Map<Long, RequestExecution> executionMap = new HashMap<>();
+ executionMap.put(executionId, requestExecutionMock);
+
+
EasyMock.expect(configurationMock.getApiSSLAuthentication()).andReturn(Boolean.FALSE);
+ EasyMock.replay(configurationMock);
+
+ ExecutionScheduleManager scheduleManager =
+ createMockBuilder(ExecutionScheduleManager.class)
+ .withConstructor(configurationMock, executionSchedulerMock,
tokenStorageMock,
+ clustersMock, actionDBAccessorMock, gson)
+ .addMockedMethods("scheduleBatch").createMock();
+
+
expect(clustersMock.getCluster(clusterName)).andReturn(clusterMock).anyTimes();
+
expect(clusterMock.getAllRequestExecutions()).andReturn(executionMap).anyTimes();
+ expect(requestExecutionMock.getBatch()).andReturn(batchMock).anyTimes();
+ expect(batchMock.getBatchRequests()).andReturn
+ (new ArrayList<BatchRequest>() {{
+ add(batchRequestMock1);
+ add(batchRequestMock2);
+ }});
+ expect(batchRequestMock1.getOrderId()).andReturn(1L).anyTimes();
+
expect(batchRequestMock1.getStatus()).andReturn(HostRoleStatus.FAILED.name()).anyTimes();
+
expect(batchRequestMock1.compareTo(batchRequestMock2)).andReturn(-1).anyTimes();
+
expect(batchRequestMock2.compareTo(batchRequestMock1)).andReturn(1).anyTimes();
+ expect(batchRequestMock2.getOrderId()).andReturn(3L).anyTimes();
+
expect(batchRequestMock2.getStatus()).andReturn(HostRoleStatus.PENDING.name()).once();
+ expect(executionSchedulerMock.getJobDetail((JobKey) anyObject()))
+ .andReturn(jobDetailMock).anyTimes();
+ expect((List<Trigger>) executionSchedulerMock
+ .getTriggersForJob((JobKey) anyObject())).andReturn(triggers).anyTimes();
+ expect(triggerMock.mayFireAgain()).andReturn(true).anyTimes();
+ expect(triggerMock.getFinalFireTime()).andReturn(pastDate).anyTimes();
+
+
expect(requestExecutionMock.getStatus()).andReturn(RequestExecution.Status.SCHEDULED.name()).anyTimes();
+ expect(requestExecutionMock.getId()).andReturn(executionId).anyTimes();
+
expect(requestExecutionMock.getBatchRequestRequestsIDs(3L)).andReturn(Collections.singleton(5L)).anyTimes();
+
+ //schedule staring from second batch, the first was completed
+ scheduleManager.scheduleBatch(eq(requestExecutionMock), eq(3L));
+ expectLastCall().once();
+
+ replay(clustersMock, clusterMock, requestExecutionMock,
+ executionSchedulerMock, scheduleManager, batchMock, batchRequestMock1,
batchRequestMock2,
+ triggerMock, jobDetailMock, actionDBAccessorMock);
+
+ scheduleManager.updateBatchSchedule(requestExecutionMock);
+
+ verify(clustersMock, clusterMock, configurationMock, requestExecutionMock,
+ executionSchedulerMock, scheduleManager, batchMock, batchRequestMock1,
batchRequestMock2,
+ triggerMock, jobDetailMock, actionDBAccessorMock);
+ }
+
}
diff --git
a/ambari-server/src/test/java/org/apache/ambari/server/state/RequestExecutionTest.java
b/ambari-server/src/test/java/org/apache/ambari/server/state/RequestExecutionTest.java
index bdcdb76..6e30c40 100644
---
a/ambari-server/src/test/java/org/apache/ambari/server/state/RequestExecutionTest.java
+++
b/ambari-server/src/test/java/org/apache/ambari/server/state/RequestExecutionTest.java
@@ -304,7 +304,8 @@ public class RequestExecutionTest {
}
Assert.assertNotNull(postBatchRequest);
// Body is now read back by default along with the batch request
- Assert.assertNull(postBatchRequest.getBody());
+ Assert.assertNotNull(postBatchRequest.getBody());
+ Assert.assertEquals("testBody", postBatchRequest.getBody());
RequestScheduleResponse requestScheduleResponse = requestExecution
.convertToResponseWithBody();