AMBARI-7317. Add "exclusive" flag to custom command/custom action requests (dlysnichenko)
Project: http://git-wip-us.apache.org/repos/asf/ambari/repo Commit: http://git-wip-us.apache.org/repos/asf/ambari/commit/64577264 Tree: http://git-wip-us.apache.org/repos/asf/ambari/tree/64577264 Diff: http://git-wip-us.apache.org/repos/asf/ambari/diff/64577264 Branch: refs/heads/branch-alerts-dev Commit: 645772647867574620d453ca8b8bb7693a1910eb Parents: 6b2dd66 Author: Lisnichenko Dmitro <[email protected]> Authored: Thu Sep 11 15:08:58 2014 +0300 Committer: Lisnichenko Dmitro <[email protected]> Committed: Tue Sep 16 16:59:21 2014 +0300 ---------------------------------------------------------------------- .../server/actionmanager/ActionScheduler.java | 26 +- .../ambari/server/actionmanager/Request.java | 23 +- .../server/controller/ExecuteActionRequest.java | 20 +- .../controller/MaintenanceStateHelper.java | 2 +- .../internal/RequestResourceProvider.java | 12 +- .../server/orm/entities/RequestEntity.java | 12 + .../server/upgrade/UpgradeCatalog170.java | 4 + .../main/resources/Ambari-DDL-MySQL-CREATE.sql | 2 +- .../main/resources/Ambari-DDL-Oracle-CREATE.sql | 2 +- .../resources/Ambari-DDL-Postgres-CREATE.sql | 2 +- .../Ambari-DDL-Postgres-EMBEDDED-CREATE.sql | 2 +- .../src/main/resources/properties.json | 1 + .../actionmanager/TestActionDBAccessorImpl.java | 2 +- .../actionmanager/TestActionScheduler.java | 264 ++++++++++++++++++- .../AmbariCustomCommandExecutionHelperTest.java | 2 +- .../AmbariManagementControllerTest.java | 86 +++--- .../BackgroundCustomCommandExecutionTest.java | 2 +- 17 files changed, 396 insertions(+), 68 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/ambari/blob/64577264/ambari-server/src/main/java/org/apache/ambari/server/actionmanager/ActionScheduler.java ---------------------------------------------------------------------- diff --git a/ambari-server/src/main/java/org/apache/ambari/server/actionmanager/ActionScheduler.java 
b/ambari-server/src/main/java/org/apache/ambari/server/actionmanager/ActionScheduler.java index 0385686..81fee75 100644 --- a/ambari-server/src/main/java/org/apache/ambari/server/actionmanager/ActionScheduler.java +++ b/ambari-server/src/main/java/org/apache/ambari/server/actionmanager/ActionScheduler.java @@ -217,15 +217,27 @@ class ActionScheduler implements Runnable { } int i_stage = 0; - stages = filterParallelPerHostStages(stages); - + + boolean exclusiveRequestIsGoing = false; + // This loop greatly depends on the fact that order of stages in + // a list does not change between invocations for (Stage s : stages) { // Check if we can process this stage in parallel with another stages i_stage ++; - long requestId = s.getRequestId(); LOG.debug("==> STAGE_i = " + i_stage + "(requestId=" + requestId + ",StageId=" + s.getStageId() + ")"); + Request request = db.getRequest(requestId); + + if (request.isExclusive()) { + if (runningRequestIds.size() > 0 ) { + // As a result, we will wait until any previous stages are finished + LOG.debug("Stage requires exclusive execution, but other requests are already executing. Stopping for now"); + break; + } + exclusiveRequestIsGoing = true; + } + if (runningRequestIds.contains(requestId)) { // We don't want to process different stages from the same request in parallel LOG.debug("==> We don't want to process different stages from the same request in parallel" ); @@ -238,8 +250,6 @@ class ActionScheduler implements Runnable { } } - - // Commands that will be scheduled in current scheduler wakeup List<ExecutionCommand> commandsToSchedule = new ArrayList<ExecutionCommand>(); Map<String, RoleStats> roleStats = processInProgressStage(s, commandsToSchedule); @@ -344,6 +354,12 @@ class ActionScheduler implements Runnable { if (! 
configuration.getParallelStageExecution()) { // If disabled return; } + + if (exclusiveRequestIsGoing) { + // As a result, we will prevent any further stages from being executed + LOG.debug("Stage requires exclusive execution, skipping all executing any further stages"); + break; + } } requestsInProgress.retainAll(runningRequestIds); http://git-wip-us.apache.org/repos/asf/ambari/blob/64577264/ambari-server/src/main/java/org/apache/ambari/server/actionmanager/Request.java ---------------------------------------------------------------------- diff --git a/ambari-server/src/main/java/org/apache/ambari/server/actionmanager/Request.java b/ambari-server/src/main/java/org/apache/ambari/server/actionmanager/Request.java index 03c4d2e..09633c7 100644 --- a/ambari-server/src/main/java/org/apache/ambari/server/actionmanager/Request.java +++ b/ambari-server/src/main/java/org/apache/ambari/server/actionmanager/Request.java @@ -37,7 +37,6 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; import com.google.gson.Gson; -import com.google.inject.Injector; import com.google.inject.assistedinject.Assisted; import com.google.inject.assistedinject.AssistedInject; @@ -53,6 +52,15 @@ public class Request { private long createTime; private long startTime; private long endTime; + + /** + * If true, this request cannot be executed in parallel with any other + * requests. That is useful when updating MM state, performing + * decommission etc. + * Defaults to false. + */ + private boolean exclusive; + /** * As of now, this field is not used. Request status is * calculated at RequestResourceProvider on the fly.
@@ -75,6 +83,7 @@ public class Request { this.createTime = System.currentTimeMillis(); this.startTime = -1; this.endTime = -1; + this.exclusive = false; if (-1L != this.clusterId) { try { @@ -111,6 +120,7 @@ public class Request { this.startTime = -1; this.endTime = -1; this.requestType = RequestType.INTERNAL_REQUEST; + this.exclusive = false; } else { String message = "Attempted to construct request from empty stage collection"; LOG.error(message); @@ -132,6 +142,7 @@ public class Request { this.inputs = gson.toJson(actionRequest.getParameters()); this.requestType = actionRequest.isCommand() ? RequestType.COMMAND : RequestType.ACTION; this.commandName = actionRequest.isCommand() ? actionRequest.getCommandName() : actionRequest.getActionName(); + this.exclusive = actionRequest.isExclusive(); } } @@ -159,6 +170,7 @@ public class Request { this.createTime = entity.getCreateTime(); this.startTime = entity.getStartTime(); this.endTime = entity.getEndTime(); + this.exclusive = entity.isExclusive(); this.requestContext = entity.getRequestContext(); this.inputs = entity.getInputs(); @@ -229,6 +241,7 @@ public class Request { requestEntity.setCreateTime(createTime); requestEntity.setStartTime(startTime); requestEntity.setEndTime(endTime); + requestEntity.setExclusive(exclusive); requestEntity.setRequestContext(requestContext); requestEntity.setInputs(inputs); requestEntity.setRequestType(requestType); @@ -384,4 +397,12 @@ public class Request { public void setStatus(HostRoleStatus status) { this.status = status; } + + public boolean isExclusive() { + return exclusive; + } + + public void setExclusive(boolean isExclusive) { + this.exclusive = isExclusive; + } } http://git-wip-us.apache.org/repos/asf/ambari/blob/64577264/ambari-server/src/main/java/org/apache/ambari/server/controller/ExecuteActionRequest.java ---------------------------------------------------------------------- diff --git 
a/ambari-server/src/main/java/org/apache/ambari/server/controller/ExecuteActionRequest.java b/ambari-server/src/main/java/org/apache/ambari/server/controller/ExecuteActionRequest.java index 5e42276..c750fca 100644 --- a/ambari-server/src/main/java/org/apache/ambari/server/controller/ExecuteActionRequest.java +++ b/ambari-server/src/main/java/org/apache/ambari/server/controller/ExecuteActionRequest.java @@ -35,13 +35,15 @@ public class ExecuteActionRequest { private RequestOperationLevel operationLevel = null; private String actionName; private Map<String, String> parameters; + private boolean exclusive; public ExecuteActionRequest(String clusterName, String commandName, String actionName, List<RequestResourceFilter> resourceFilters, RequestOperationLevel operationLevel, - Map<String, String> parameters) { - this(clusterName, commandName, parameters); + Map<String, String> parameters, + boolean exclusive) { + this(clusterName, commandName, parameters, exclusive); this.actionName = actionName; if (resourceFilters != null) { this.resourceFilters.addAll(resourceFilters); @@ -53,7 +55,9 @@ public class ExecuteActionRequest { * Create an ExecuteActionRequest to execute a command. * No filters. 
*/ - public ExecuteActionRequest(String clusterName, String commandName, Map<String, String> parameters) { + public ExecuteActionRequest(String clusterName, String commandName, + Map<String, String> parameters, + boolean exclusive) { this.clusterName = clusterName; this.commandName = commandName; this.actionName = null; @@ -62,6 +66,7 @@ public class ExecuteActionRequest { this.parameters.putAll(parameters); } this.resourceFilters = new ArrayList<RequestResourceFilter>(); + this.exclusive = exclusive; } public String getClusterName() { @@ -88,6 +93,14 @@ public class ExecuteActionRequest { return parameters; } + public boolean isExclusive() { + return exclusive; + } + + public void setExclusive(boolean isExclusive) { + this.exclusive = isExclusive; + } + public Boolean isCommand() { return actionName == null || actionName.isEmpty(); } @@ -100,6 +113,7 @@ public class ExecuteActionRequest { append(", command :" + commandName). append(", inputs :" + parameters.toString()). append(", resourceFilters: " + resourceFilters). + append(", exclusive: " + exclusive). 
append(", clusterName :" + clusterName).toString(); } } http://git-wip-us.apache.org/repos/asf/ambari/blob/64577264/ambari-server/src/main/java/org/apache/ambari/server/controller/MaintenanceStateHelper.java ---------------------------------------------------------------------- diff --git a/ambari-server/src/main/java/org/apache/ambari/server/controller/MaintenanceStateHelper.java b/ambari-server/src/main/java/org/apache/ambari/server/controller/MaintenanceStateHelper.java index 059e2c9..2222f24 100644 --- a/ambari-server/src/main/java/org/apache/ambari/server/controller/MaintenanceStateHelper.java +++ b/ambari-server/src/main/java/org/apache/ambari/server/controller/MaintenanceStateHelper.java @@ -279,7 +279,7 @@ public class MaintenanceStateHelper { ExecuteActionRequest actionRequest = new ExecuteActionRequest( clusterName, null, NAGIOS_ACTION_NAME, Collections.singletonList(resourceFilter), - level, params); + level, params, true); // createAction() may throw an exception if Nagios is in MS or // if Nagios is absent in cluster. 
This exception is usually ignored at http://git-wip-us.apache.org/repos/asf/ambari/blob/64577264/ambari-server/src/main/java/org/apache/ambari/server/controller/internal/RequestResourceProvider.java ---------------------------------------------------------------------- diff --git a/ambari-server/src/main/java/org/apache/ambari/server/controller/internal/RequestResourceProvider.java b/ambari-server/src/main/java/org/apache/ambari/server/controller/internal/RequestResourceProvider.java index 3e36ee9..561f5d9 100644 --- a/ambari-server/src/main/java/org/apache/ambari/server/controller/internal/RequestResourceProvider.java +++ b/ambari-server/src/main/java/org/apache/ambari/server/controller/internal/RequestResourceProvider.java @@ -70,6 +70,7 @@ public class RequestResourceProvider extends AbstractControllerResourceProvider protected static final String REQUEST_CREATE_TIME_ID = "Requests/create_time"; protected static final String REQUEST_START_TIME_ID = "Requests/start_time"; protected static final String REQUEST_END_TIME_ID = "Requests/end_time"; + protected static final String REQUEST_EXCLUSIVE_ID = "Requests/exclusive"; protected static final String REQUEST_TASK_CNT_ID = "Requests/task_count"; protected static final String REQUEST_FAILED_TASK_CNT_ID = "Requests/failed_task_count"; protected static final String REQUEST_ABORTED_TASK_CNT_ID = "Requests/aborted_task_count"; @@ -83,6 +84,7 @@ public class RequestResourceProvider extends AbstractControllerResourceProvider protected static final String HOSTS_ID = "hosts"; protected static final String ACTION_ID = "action"; protected static final String INPUTS_ID = "parameters"; + protected static final String EXLUSIVE_ID = "exclusive"; private static Set<String> pkPropertyIds = new HashSet<String>(Arrays.asList(new String[]{ REQUEST_ID_PROPERTY_ID})); @@ -317,13 +319,19 @@ public class RequestResourceProvider extends AbstractControllerResourceProvider params.put(key.substring(keyPrefix.length()), 
requestInfoProperties.get(key)); } } + + boolean exclusive = false; + if (requestInfoProperties.containsKey(EXLUSIVE_ID)) { + exclusive = Boolean.valueOf(requestInfoProperties.get(EXLUSIVE_ID).trim()); + } + return new ExecuteActionRequest( (String) propertyMap.get(REQUEST_CLUSTER_NAME_PROPERTY_ID), commandName, actionName, resourceFilterList, operationLevel, - params); + params, exclusive); } // Get all of the request resources for the given properties @@ -422,6 +430,8 @@ public class RequestResourceProvider extends AbstractControllerResourceProvider setResourceProperty(resource, REQUEST_CREATE_TIME_ID, request.getCreateTime(), requestedPropertyIds); setResourceProperty(resource, REQUEST_START_TIME_ID, request.getStartTime(), requestedPropertyIds); setResourceProperty(resource, REQUEST_END_TIME_ID, request.getEndTime(), requestedPropertyIds); + setResourceProperty(resource, REQUEST_EXCLUSIVE_ID, request.isExclusive(), requestedPropertyIds); + if (request.getRequestScheduleId() != null) { setResourceProperty(resource, REQUEST_SOURCE_SCHEDULE_ID, request.getRequestScheduleId(), requestedPropertyIds); } else { http://git-wip-us.apache.org/repos/asf/ambari/blob/64577264/ambari-server/src/main/java/org/apache/ambari/server/orm/entities/RequestEntity.java ---------------------------------------------------------------------- diff --git a/ambari-server/src/main/java/org/apache/ambari/server/orm/entities/RequestEntity.java b/ambari-server/src/main/java/org/apache/ambari/server/orm/entities/RequestEntity.java index a35771e..b1aad00 100644 --- a/ambari-server/src/main/java/org/apache/ambari/server/orm/entities/RequestEntity.java +++ b/ambari-server/src/main/java/org/apache/ambari/server/orm/entities/RequestEntity.java @@ -84,6 +84,10 @@ public class RequestEntity { @Column(name = "end_time", nullable = false) private Long endTime = -1L; + @Basic + @Column(name = "exclusive_execution", insertable = true, updatable = true, nullable = false) + private Integer exclusive = 0; + 
@OneToMany(mappedBy = "request") private Collection<StageEntity> stages; @@ -145,6 +149,14 @@ public class RequestEntity { this.endTime = endTime; } + public Boolean isExclusive() { + return exclusive == 0 ? false : true; + } + + public void setExclusive(Boolean exclusive) { + this.exclusive = (exclusive == false ? 0 : 1); + } + public String getInputs() { return inputs != null ? new String(inputs) : null; } http://git-wip-us.apache.org/repos/asf/ambari/blob/64577264/ambari-server/src/main/java/org/apache/ambari/server/upgrade/UpgradeCatalog170.java ---------------------------------------------------------------------- diff --git a/ambari-server/src/main/java/org/apache/ambari/server/upgrade/UpgradeCatalog170.java b/ambari-server/src/main/java/org/apache/ambari/server/upgrade/UpgradeCatalog170.java index 75635cc..a08d794 100644 --- a/ambari-server/src/main/java/org/apache/ambari/server/upgrade/UpgradeCatalog170.java +++ b/ambari-server/src/main/java/org/apache/ambari/server/upgrade/UpgradeCatalog170.java @@ -228,6 +228,10 @@ public class UpgradeCatalog170 extends AbstractUpgradeCatalog { addAlertingFrameworkDDL(); + // Exclusive requests changes + dbAccessor.addColumn("request", new DBColumnInfo( + "exclusive_execution", Integer.class, 1, 0, false)); + //service config versions changes //remove old artifacts (for versions <=1.4.1) which depend on tables changed http://git-wip-us.apache.org/repos/asf/ambari/blob/64577264/ambari-server/src/main/resources/Ambari-DDL-MySQL-CREATE.sql ---------------------------------------------------------------------- diff --git a/ambari-server/src/main/resources/Ambari-DDL-MySQL-CREATE.sql b/ambari-server/src/main/resources/Ambari-DDL-MySQL-CREATE.sql index 4a6d3ba..fee90c9 100644 --- a/ambari-server/src/main/resources/Ambari-DDL-MySQL-CREATE.sql +++ b/ambari-server/src/main/resources/Ambari-DDL-MySQL-CREATE.sql @@ -46,7 +46,7 @@ CREATE TABLE execution_command (task_id BIGINT NOT NULL, command LONGBLOB, PRIMA CREATE TABLE 
host_role_command (task_id BIGINT NOT NULL, attempt_count SMALLINT NOT NULL, event LONGTEXT NOT NULL, exitcode INTEGER NOT NULL, host_name VARCHAR(255) NOT NULL, last_attempt_time BIGINT NOT NULL, request_id BIGINT NOT NULL, role VARCHAR(255), role_command VARCHAR(255), stage_id BIGINT NOT NULL, start_time BIGINT NOT NULL, end_time BIGINT, status VARCHAR(255), std_error LONGBLOB, std_out LONGBLOB, output_log VARCHAR(255) NULL, error_log VARCHAR(255) NULL, structured_out LONGBLOB, command_detail VARCHAR(255), custom_command_name VARCHAR(255), PRIMARY KEY (task_id)); CREATE TABLE role_success_criteria (role VARCHAR(255) NOT NULL, request_id BIGINT NOT NULL, stage_id BIGINT NOT NULL, success_factor DOUBLE NOT NULL, PRIMARY KEY (role, request_id, stage_id)); CREATE TABLE stage (stage_id BIGINT NOT NULL, request_id BIGINT NOT NULL, cluster_id BIGINT, log_info VARCHAR(255) NOT NULL, request_context VARCHAR(255), cluster_host_info LONGBLOB, command_params LONGBLOB, host_params LONGBLOB, PRIMARY KEY (stage_id, request_id)); -CREATE TABLE request (request_id BIGINT NOT NULL, cluster_id BIGINT, request_schedule_id BIGINT, command_name VARCHAR(255), create_time BIGINT NOT NULL, end_time BIGINT NOT NULL, inputs LONGBLOB, request_context VARCHAR(255), request_type VARCHAR(255), start_time BIGINT NOT NULL, status VARCHAR(255), PRIMARY KEY (request_id)); +CREATE TABLE request (request_id BIGINT NOT NULL, cluster_id BIGINT, request_schedule_id BIGINT, command_name VARCHAR(255), create_time BIGINT NOT NULL, end_time BIGINT NOT NULL, exclusive_execution TINYINT(1) NOT NULL DEFAULT 0, inputs LONGBLOB, request_context VARCHAR(255), request_type VARCHAR(255), start_time BIGINT NOT NULL, status VARCHAR(255), PRIMARY KEY (request_id)); CREATE TABLE requestresourcefilter (filter_id BIGINT NOT NULL, request_id BIGINT NOT NULL, service_name VARCHAR(255), component_name VARCHAR(255), hosts LONGBLOB, PRIMARY KEY (filter_id)); CREATE TABLE requestoperationlevel (operation_level_id BIGINT NOT 
NULL, request_id BIGINT NOT NULL, level_name VARCHAR(255), cluster_name VARCHAR(255), service_name VARCHAR(255), host_component_name VARCHAR(255), host_name VARCHAR(255), PRIMARY KEY (operation_level_id)); CREATE TABLE key_value_store (`key` VARCHAR(255), `value` LONGTEXT, PRIMARY KEY (`key`)); http://git-wip-us.apache.org/repos/asf/ambari/blob/64577264/ambari-server/src/main/resources/Ambari-DDL-Oracle-CREATE.sql ---------------------------------------------------------------------- diff --git a/ambari-server/src/main/resources/Ambari-DDL-Oracle-CREATE.sql b/ambari-server/src/main/resources/Ambari-DDL-Oracle-CREATE.sql index 2e6b5c0..f68a718 100644 --- a/ambari-server/src/main/resources/Ambari-DDL-Oracle-CREATE.sql +++ b/ambari-server/src/main/resources/Ambari-DDL-Oracle-CREATE.sql @@ -37,7 +37,7 @@ CREATE TABLE execution_command (task_id NUMBER(19) NOT NULL, command BLOB NULL, CREATE TABLE host_role_command (task_id NUMBER(19) NOT NULL, attempt_count NUMBER(5) NOT NULL, event CLOB NULL, exitcode NUMBER(10) NOT NULL, host_name VARCHAR2(255) NOT NULL, last_attempt_time NUMBER(19) NOT NULL, request_id NUMBER(19) NOT NULL, role VARCHAR2(255) NULL, role_command VARCHAR2(255) NULL, stage_id NUMBER(19) NOT NULL, start_time NUMBER(19) NOT NULL, end_time NUMBER(19), status VARCHAR2(255) NULL, std_error BLOB NULL, std_out BLOB NULL, output_log VARCHAR2(255) NULL, error_log VARCHAR2(255) NULL, structured_out BLOB NULL, command_detail VARCHAR2(255) NULL, custom_command_name VARCHAR2(255) NULL, PRIMARY KEY (task_id)); CREATE TABLE role_success_criteria (role VARCHAR2(255) NOT NULL, request_id NUMBER(19) NOT NULL, stage_id NUMBER(19) NOT NULL, success_factor NUMBER(19,4) NOT NULL, PRIMARY KEY (role, request_id, stage_id)); CREATE TABLE stage (stage_id NUMBER(19) NOT NULL, request_id NUMBER(19) NOT NULL, cluster_id NUMBER(19) NULL, log_info VARCHAR2(255) NULL, request_context VARCHAR2(255) NULL, cluster_host_info BLOB NOT NULL, command_params BLOB, host_params BLOB, PRIMARY KEY 
(stage_id, request_id)); -CREATE TABLE request (request_id NUMBER(19) NOT NULL, cluster_id NUMBER(19), request_schedule_id NUMBER(19), command_name VARCHAR(255), create_time NUMBER(19) NOT NULL, end_time NUMBER(19) NOT NULL, inputs BLOB, request_context VARCHAR(255), request_type VARCHAR(255), start_time NUMBER(19) NOT NULL, status VARCHAR(255), PRIMARY KEY (request_id)); +CREATE TABLE request (request_id NUMBER(19) NOT NULL, cluster_id NUMBER(19), request_schedule_id NUMBER(19), command_name VARCHAR(255), create_time NUMBER(19) NOT NULL, end_time NUMBER(19) NOT NULL, exclusive_execution NUMBER(1) DEFAULT 0 NOT NULL, inputs BLOB, request_context VARCHAR(255), request_type VARCHAR(255), start_time NUMBER(19) NOT NULL, status VARCHAR(255), PRIMARY KEY (request_id)); CREATE TABLE requestresourcefilter (filter_id NUMBER(19) NOT NULL, request_id NUMBER(19) NOT NULL, service_name VARCHAR2(255), component_name VARCHAR2(255), hosts BLOB, PRIMARY KEY (filter_id)); CREATE TABLE requestoperationlevel (operation_level_id NUMBER(19) NOT NULL, request_id NUMBER(19) NOT NULL, level_name VARCHAR2(255), cluster_name VARCHAR2(255), service_name VARCHAR2(255), host_component_name VARCHAR2(255), host_name VARCHAR2(255), PRIMARY KEY (operation_level_id)); CREATE TABLE key_value_store ("key" VARCHAR2(255) NOT NULL, "value" CLOB NULL, PRIMARY KEY ("key")); http://git-wip-us.apache.org/repos/asf/ambari/blob/64577264/ambari-server/src/main/resources/Ambari-DDL-Postgres-CREATE.sql ---------------------------------------------------------------------- diff --git a/ambari-server/src/main/resources/Ambari-DDL-Postgres-CREATE.sql b/ambari-server/src/main/resources/Ambari-DDL-Postgres-CREATE.sql index 400373e..1a2a63e 100644 --- a/ambari-server/src/main/resources/Ambari-DDL-Postgres-CREATE.sql +++ b/ambari-server/src/main/resources/Ambari-DDL-Postgres-CREATE.sql @@ -59,7 +59,7 @@ CREATE TABLE role_success_criteria (role VARCHAR(255) NOT NULL, request_id BIGIN CREATE TABLE stage (stage_id BIGINT 
NOT NULL, request_id BIGINT NOT NULL, cluster_id BIGINT NOT NULL, log_info VARCHAR(255) NOT NULL, request_context VARCHAR(255), cluster_host_info BYTEA NOT NULL, command_params BYTEA, host_params BYTEA, PRIMARY KEY (stage_id, request_id)); -CREATE TABLE request (request_id BIGINT NOT NULL, cluster_id BIGINT, command_name VARCHAR(255), create_time BIGINT NOT NULL, end_time BIGINT NOT NULL, inputs BYTEA, request_context VARCHAR(255), request_type VARCHAR(255), request_schedule_id BIGINT, start_time BIGINT NOT NULL, status VARCHAR(255), PRIMARY KEY (request_id)); +CREATE TABLE request (request_id BIGINT NOT NULL, cluster_id BIGINT, command_name VARCHAR(255), create_time BIGINT NOT NULL, end_time BIGINT NOT NULL, exclusive_execution SMALLINT NOT NULL DEFAULT 0, inputs BYTEA, request_context VARCHAR(255), request_type VARCHAR(255), request_schedule_id BIGINT, start_time BIGINT NOT NULL, status VARCHAR(255), PRIMARY KEY (request_id)); CREATE TABLE requestresourcefilter (filter_id BIGINT NOT NULL, request_id BIGINT NOT NULL, service_name VARCHAR(255), component_name VARCHAR(255), hosts BYTEA, PRIMARY KEY (filter_id)); http://git-wip-us.apache.org/repos/asf/ambari/blob/64577264/ambari-server/src/main/resources/Ambari-DDL-Postgres-EMBEDDED-CREATE.sql ---------------------------------------------------------------------- diff --git a/ambari-server/src/main/resources/Ambari-DDL-Postgres-EMBEDDED-CREATE.sql b/ambari-server/src/main/resources/Ambari-DDL-Postgres-EMBEDDED-CREATE.sql index 74e6d1c..f284580 100644 --- a/ambari-server/src/main/resources/Ambari-DDL-Postgres-EMBEDDED-CREATE.sql +++ b/ambari-server/src/main/resources/Ambari-DDL-Postgres-EMBEDDED-CREATE.sql @@ -91,7 +91,7 @@ GRANT ALL PRIVILEGES ON TABLE ambari.role_success_criteria TO :username; CREATE TABLE ambari.stage (stage_id BIGINT NOT NULL, request_id BIGINT NOT NULL, cluster_id BIGINT NOT NULL, log_info VARCHAR(255) NOT NULL, request_context VARCHAR(255), cluster_host_info BYTEA NOT NULL, command_params BYTEA, 
host_params BYTEA, PRIMARY KEY (stage_id, request_id)); GRANT ALL PRIVILEGES ON TABLE ambari.stage TO :username; -CREATE TABLE ambari.request (request_id BIGINT NOT NULL, cluster_id BIGINT, command_name VARCHAR(255), create_time BIGINT NOT NULL, end_time BIGINT NOT NULL, inputs BYTEA, request_context VARCHAR(255), request_type VARCHAR(255), request_schedule_id BIGINT, start_time BIGINT NOT NULL, status VARCHAR(255), PRIMARY KEY (request_id)); +CREATE TABLE ambari.request (request_id BIGINT NOT NULL, cluster_id BIGINT, command_name VARCHAR(255), create_time BIGINT NOT NULL, end_time BIGINT NOT NULL, exclusive_execution SMALLINT NOT NULL DEFAULT 0, inputs BYTEA, request_context VARCHAR(255), request_type VARCHAR(255), request_schedule_id BIGINT, start_time BIGINT NOT NULL, status VARCHAR(255), PRIMARY KEY (request_id)); GRANT ALL PRIVILEGES ON TABLE ambari.request TO :username; CREATE TABLE ambari.requestresourcefilter (filter_id BIGINT NOT NULL, request_id BIGINT NOT NULL, service_name VARCHAR(255), component_name VARCHAR(255), hosts BYTEA, PRIMARY KEY (filter_id)); http://git-wip-us.apache.org/repos/asf/ambari/blob/64577264/ambari-server/src/main/resources/properties.json ---------------------------------------------------------------------- diff --git a/ambari-server/src/main/resources/properties.json b/ambari-server/src/main/resources/properties.json index 9471f3d..76797d3 100644 --- a/ambari-server/src/main/resources/properties.json +++ b/ambari-server/src/main/resources/properties.json @@ -130,6 +130,7 @@ "Requests/create_time", "Requests/start_time", "Requests/end_time", + "Requests/exclusive", "Requests/task_count", "Requests/failed_task_count", "Requests/aborted_task_count", http://git-wip-us.apache.org/repos/asf/ambari/blob/64577264/ambari-server/src/test/java/org/apache/ambari/server/actionmanager/TestActionDBAccessorImpl.java ---------------------------------------------------------------------- diff --git 
a/ambari-server/src/test/java/org/apache/ambari/server/actionmanager/TestActionDBAccessorImpl.java b/ambari-server/src/test/java/org/apache/ambari/server/actionmanager/TestActionDBAccessorImpl.java index 6d4f056..4369604 100644 --- a/ambari-server/src/test/java/org/apache/ambari/server/actionmanager/TestActionDBAccessorImpl.java +++ b/ambari-server/src/test/java/org/apache/ambari/server/actionmanager/TestActionDBAccessorImpl.java @@ -445,7 +445,7 @@ public class TestActionDBAccessorImpl { List<RequestResourceFilter> resourceFilters = new ArrayList<RequestResourceFilter>() {{ add(resourceFilter); }}; ExecuteActionRequest executeActionRequest = new ExecuteActionRequest - ("cluster1", null, actionName, resourceFilters, null, null); + ("cluster1", null, actionName, resourceFilters, null, null, false); Request request = new Request(stages, clusters); db.persistActions(request); } http://git-wip-us.apache.org/repos/asf/ambari/blob/64577264/ambari-server/src/test/java/org/apache/ambari/server/actionmanager/TestActionScheduler.java ---------------------------------------------------------------------- diff --git a/ambari-server/src/test/java/org/apache/ambari/server/actionmanager/TestActionScheduler.java b/ambari-server/src/test/java/org/apache/ambari/server/actionmanager/TestActionScheduler.java index c4a88cf..a20f252 100644 --- a/ambari-server/src/test/java/org/apache/ambari/server/actionmanager/TestActionScheduler.java +++ b/ambari-server/src/test/java/org/apache/ambari/server/actionmanager/TestActionScheduler.java @@ -132,6 +132,10 @@ public class TestActionScheduler { stages.add(s); when(db.getStagesInProgress()).thenReturn(stages); + Request request = mock(Request.class); + when(request.isExclusive()).thenReturn(false); + when(db.getRequest(anyLong())).thenReturn(request); + //Keep large number of attempts so that the task is not expired finally //Small action timeout to test rescheduling ActionScheduler scheduler = new ActionScheduler(100, 100, db, aq, fsm, @@ 
-278,6 +282,11 @@ public class TestActionScheduler { stages.add(s); ActionDBAccessor db = mock(ActionDBAccessor.class); + + Request request = mock(Request.class); + when(request.isExclusive()).thenReturn(false); + when(db.getRequest(anyLong())).thenReturn(request); + when(db.getStagesInProgress()).thenReturn(stages); doAnswer(new Answer() { @Override @@ -361,6 +370,10 @@ public class TestActionScheduler { ActionDBAccessor db = mock(ActionDBAccessor.class); + Request request = mock(Request.class); + when(request.isExclusive()).thenReturn(false); + when(db.getRequest(anyLong())).thenReturn(request); + when(db.getStagesInProgress()).thenReturn(stages); doAnswer(new Answer() { @@ -488,6 +501,11 @@ public class TestActionScheduler { stages.add(s); ActionDBAccessor db = mock(ActionDBAccessor.class); + + Request request = mock(Request.class); + when(request.isExclusive()).thenReturn(false); + when(db.getRequest(anyLong())).thenReturn(request); + when(db.getStagesInProgress()).thenReturn(stages); doAnswer(new Answer() { @Override @@ -548,6 +566,11 @@ public class TestActionScheduler { stages.add(s); ActionDBAccessor db = mock(ActionDBAccessor.class); + + Request request = mock(Request.class); + when(request.isExclusive()).thenReturn(false); + when(db.getRequest(anyLong())).thenReturn(request); + when(db.getStagesInProgress()).thenReturn(stages); doAnswer(new Answer() { @Override @@ -652,6 +675,11 @@ public class TestActionScheduler { RoleCommand.START, Service.Type.GANGLIA, 5, 5, 4)); ActionDBAccessor db = mock(ActionDBAccessor.class); + + Request request = mock(Request.class); + when(request.isExclusive()).thenReturn(false); + when(db.getRequest(anyLong())).thenReturn(request); + when(db.getStagesInProgress()).thenReturn(stages); Properties properties = new Properties(); @@ -731,6 +759,11 @@ public class TestActionScheduler { RoleCommand.START, Service.Type.GANGLIA, 5, 5, 4)); ActionDBAccessor db = mock(ActionDBAccessor.class); + + Request request = mock(Request.class); + 
when(request.isExclusive()).thenReturn(false); + when(db.getRequest(anyLong())).thenReturn(request); + when(db.getStagesInProgress()).thenReturn(stages); Properties properties = new Properties(); @@ -800,6 +833,11 @@ public class TestActionScheduler { ActionDBAccessor db = mock(ActionDBAccessor.class); + + Request request = mock(Request.class); + when(request.isExclusive()).thenReturn(false); + when(db.getRequest(anyLong())).thenReturn(request); + when(db.getStagesInProgress()).thenReturn(stages); Properties properties = new Properties(); @@ -859,6 +897,10 @@ public class TestActionScheduler { ActionDBAccessor db = mock(ActionDBAccessor.class); + Request request = mock(Request.class); + when(request.isExclusive()).thenReturn(false); + when(db.getRequest(anyLong())).thenReturn(request); + when(db.getStagesInProgress()).thenReturn(stages); doAnswer(new Answer() { @Override @@ -988,7 +1030,6 @@ public class TestActionScheduler { when(host.getState()).thenReturn(HostState.HEALTHY); when(host.getHostName()).thenReturn(host1); - final List<Stage> stages = new ArrayList<Stage>(); long now = System.currentTimeMillis(); @@ -1037,6 +1078,10 @@ public class TestActionScheduler { ActionDBAccessor db = mock(ActionDBAccessor.class); + Request request = mock(Request.class); + when(request.isExclusive()).thenReturn(false); + when(db.getRequest(anyLong())).thenReturn(request); + when(db.getStagesInProgress()).thenReturn(stages); doAnswer(new Answer() { @Override @@ -1210,6 +1255,10 @@ public class TestActionScheduler { ActionDBAccessor db = mock(ActionDBAccessor.class); + Request request = mock(Request.class); + when(request.isExclusive()).thenReturn(false); + when(db.getRequest(anyLong())).thenReturn(request); + when(db.getStagesInProgress()).thenReturn(stages); doAnswer(new Answer() { @Override @@ -1432,8 +1481,12 @@ public class TestActionScheduler { when(host.getState()).thenReturn(HostState.HEALTHY); when(host.getHostName()).thenReturn(hostname); - ActionDBAccessor db = 
mock(ActionDBAccessorImpl.class); + + Request request = mock(Request.class); + when(request.isExclusive()).thenReturn(false); + when(db.getRequest(anyLong())).thenReturn(request); + Stage s1 = StageUtils.getATestStage(requestId1, stageId, hostname, CLUSTER_HOST_INFO, "{\"host_param\":\"param_value\"}", "{\"stage_param\":\"param_value\"}"); Stage s2 = StageUtils.getATestStage(requestId2, stageId, hostname, CLUSTER_HOST_INFO_UPDATED, @@ -1517,6 +1570,10 @@ public class TestActionScheduler { ActionDBAccessor db = mock(ActionDBAccessor.class); + Request request = mock(Request.class); + when(request.isExclusive()).thenReturn(false); + when(db.getRequest(anyLong())).thenReturn(request); + when(db.getStagesInProgress()).thenReturn(stages); ActionScheduler scheduler = new ActionScheduler(100, 50000, db, aq, fsm, 3, @@ -1595,6 +1652,11 @@ public class TestActionScheduler { stages.add(s); ActionDBAccessor db = mock(ActionDBAccessor.class); + + Request request = mock(Request.class); + when(request.isExclusive()).thenReturn(false); + when(db.getRequest(anyLong())).thenReturn(request); + when(db.getStagesInProgress()).thenReturn(stages); doAnswer(new Answer() { @Override @@ -1665,6 +1727,10 @@ public class TestActionScheduler { ActionDBAccessor db = mock(ActionDBAccessor.class); + Request request = mock(Request.class); + when(request.isExclusive()).thenReturn(false); + when(db.getRequest(anyLong())).thenReturn(request); + when(db.getStagesInProgress()).thenReturn(stages); List<HostRoleCommand> requestTasks = new ArrayList<HostRoleCommand>(); @@ -1747,11 +1813,6 @@ public class TestActionScheduler { scheduler.doWork(); - - - //List<CommandReport> reports = new ArrayList<CommandReport>(); - //reports.add(getCommandReport(HostRoleStatus.FAILED, Role.NAMENODE, Service.Type.HDFS, "1-1", 1)); - //am.processTaskResponse(hostname, reports, stages.get(0).getOrderedHostRoleCommands()); String reason = "Some reason"; scheduler.scheduleCancellingRequest(requestId, reason); @@ -1766,4 
+1827,193 @@ public class TestActionScheduler { Assert.assertEquals(cancelCommand.getReason(), reason); } + + @Test + public void testExclusiveRequests() throws Exception { + ActionQueue aq = new ActionQueue(); + Clusters fsm = mock(Clusters.class); + Cluster oneClusterMock = mock(Cluster.class); + Service serviceObj = mock(Service.class); + ServiceComponent scomp = mock(ServiceComponent.class); + ServiceComponentHost sch = mock(ServiceComponentHost.class); + UnitOfWork unitOfWork = mock(UnitOfWork.class); + RequestFactory requestFactory = mock(RequestFactory.class); + when(fsm.getCluster(anyString())).thenReturn(oneClusterMock); + when(oneClusterMock.getService(anyString())).thenReturn(serviceObj); + when(serviceObj.getServiceComponent(anyString())).thenReturn(scomp); + when(scomp.getServiceComponentHost(anyString())).thenReturn(sch); + when(serviceObj.getCluster()).thenReturn(oneClusterMock); + + HashMap<String, ServiceComponentHost> hosts = + new HashMap<String, ServiceComponentHost>(); + String hostname1 = "hostname1"; + String hostname2 = "hostname2"; + String hostname3 = "hostname3"; + + hosts.put(hostname1, sch); + hosts.put(hostname2, sch); + hosts.put(hostname3, sch); + when(scomp.getServiceComponentHosts()).thenReturn(hosts); + + long requestId1 = 1; + long requestId2 = 2; + long requestId3 = 3; + + final List<Stage> stagesInProgress = new ArrayList<Stage>(); + int namenodeCmdTaskId = 1; + stagesInProgress.add( + getStageWithSingleTask( + hostname1, "cluster1", Role.NAMENODE, RoleCommand.START, + Service.Type.HDFS, namenodeCmdTaskId, 1, (int) requestId1)); + stagesInProgress.add( + getStageWithSingleTask( + hostname1, "cluster1", Role.DATANODE, RoleCommand.START, + Service.Type.HDFS, 2, 2, (int) requestId1)); + stagesInProgress.add( + getStageWithSingleTask( + hostname2, "cluster1", Role.DATANODE, RoleCommand.STOP, //Exclusive + Service.Type.HDFS, 3, 3, (int) requestId2)); + + stagesInProgress.add( + getStageWithSingleTask( + hostname3, "cluster1", 
Role.DATANODE, RoleCommand.START, + Service.Type.HDFS, 4, 4, (int) requestId3)); + + + Host host1 = mock(Host.class); + when(fsm.getHost(anyString())).thenReturn(host1); + when(host1.getState()).thenReturn(HostState.HEALTHY); + when(host1.getHostName()).thenReturn(hostname); + + Host host2 = mock(Host.class); + when(fsm.getHost(anyString())).thenReturn(host2); + when(host2.getState()).thenReturn(HostState.HEALTHY); + when(host2.getHostName()).thenReturn(hostname); + + Host host3 = mock(Host.class); + when(fsm.getHost(anyString())).thenReturn(host3); + when(host3.getState()).thenReturn(HostState.HEALTHY); + when(host3.getHostName()).thenReturn(hostname); + + ActionDBAccessor db = mock(ActionDBAccessor.class); + + when(db.getStagesInProgress()).thenReturn(stagesInProgress); + + List<HostRoleCommand> requestTasks = new ArrayList<HostRoleCommand>(); + for (Stage stage : stagesInProgress) { + requestTasks.addAll(stage.getOrderedHostRoleCommands()); + } + when(db.getRequestTasks(anyLong())).thenReturn(requestTasks); + when(db.getAllStages(anyLong())).thenReturn(stagesInProgress); + doAnswer(new Answer() { + @Override + public Object answer(InvocationOnMock invocation) throws Throwable { + List<CommandReport> reports = (List<CommandReport>) invocation.getArguments()[0]; + for (CommandReport report : reports) { + String actionId = report.getActionId(); + long[] requestStageIds = StageUtils.getRequestStage(actionId); + Long requestId = requestStageIds[0]; + Long stageId = requestStageIds[1]; + String role = report.getRole(); + Long id = report.getTaskId(); + for (Stage stage : stagesInProgress) { + if (requestId.equals(stage.getRequestId()) && stageId.equals(stage.getStageId())) { + for (HostRoleCommand hostRoleCommand : stage.getOrderedHostRoleCommands()) { + if (hostRoleCommand.getTaskId() == id) { + hostRoleCommand.setStatus(HostRoleStatus.valueOf(report.getStatus())); + } + } + } + } + + } + + return null; + } + 
}).when(db).updateHostRoleStates(anyCollectionOf(CommandReport.class)); + + when(db.getTask(anyLong())).thenAnswer(new Answer<Object>() { + @Override + public Object answer(InvocationOnMock invocation) throws Throwable { + Long taskId = (Long) invocation.getArguments()[0]; + for (Stage stage : stagesInProgress) { + for (HostRoleCommand command : stage.getOrderedHostRoleCommands()) { + if (taskId.equals(command.getTaskId())) { + return command; + } + } + } + return null; + } + }); + + final Map<Long, Boolean> startedRequests = new HashMap<Long, Boolean>(); + doAnswer(new Answer() { + @Override + public Object answer(InvocationOnMock invocation) throws Throwable { + startedRequests.put((Long)invocation.getArguments()[0], true); + return null; + } + }).when(db).startRequest(anyLong()); + + Request request1 = mock(Request.class); + when(request1.isExclusive()).thenReturn(false); + Request request2 = mock(Request.class); + when(request2.isExclusive()).thenReturn(true); + Request request3 = mock(Request.class); + when(request3.isExclusive()).thenReturn(false); + + when(db.getRequest(requestId1)).thenReturn(request1); + when(db.getRequest(requestId2)).thenReturn(request2); + when(db.getRequest(requestId3)).thenReturn(request3); + + Properties properties = new Properties(); + Configuration conf = new Configuration(properties); + ServerActionManagerImpl serverActionManager = new ServerActionManagerImpl(fsm); + + ActionScheduler scheduler =new ActionScheduler(100, 50, db, aq, fsm, 3, + new HostsMap((String) null), serverActionManager, unitOfWork, conf); + + ActionManager am = new ActionManager( + 2, 2, aq, fsm, db, new HostsMap((String) null), + serverActionManager, unitOfWork, requestFactory, conf); + + // Execution of request 1 + + scheduler.doWork(); + + Assert.assertTrue(startedRequests.containsKey(requestId1)); + Assert.assertFalse(startedRequests.containsKey(requestId2)); + Assert.assertFalse(startedRequests.containsKey(requestId3)); + + stagesInProgress.remove(0); + + 
scheduler.doWork(); + + Assert.assertTrue(startedRequests.containsKey(requestId1)); + Assert.assertFalse(startedRequests.containsKey(requestId2)); + Assert.assertFalse(startedRequests.containsKey(requestId3)); + + // Execution of request 2 + + stagesInProgress.remove(0); + + scheduler.doWork(); + + Assert.assertTrue(startedRequests.containsKey(requestId1)); + Assert.assertTrue(startedRequests.containsKey(requestId2)); + Assert.assertFalse(startedRequests.containsKey(requestId3)); + + // Execution of request 3 + + stagesInProgress.remove(0); + + scheduler.doWork(); + + Assert.assertTrue(startedRequests.containsKey(requestId1)); + Assert.assertTrue(startedRequests.containsKey(requestId2)); + Assert.assertTrue(startedRequests.containsKey(requestId3)); + + } + } http://git-wip-us.apache.org/repos/asf/ambari/blob/64577264/ambari-server/src/test/java/org/apache/ambari/server/controller/AmbariCustomCommandExecutionHelperTest.java ---------------------------------------------------------------------- diff --git a/ambari-server/src/test/java/org/apache/ambari/server/controller/AmbariCustomCommandExecutionHelperTest.java b/ambari-server/src/test/java/org/apache/ambari/server/controller/AmbariCustomCommandExecutionHelperTest.java index a14c86a..898efbf 100644 --- a/ambari-server/src/test/java/org/apache/ambari/server/controller/AmbariCustomCommandExecutionHelperTest.java +++ b/ambari-server/src/test/java/org/apache/ambari/server/controller/AmbariCustomCommandExecutionHelperTest.java @@ -115,7 +115,7 @@ public class AmbariCustomCommandExecutionHelperTest { { put("forceRefreshConfigTags" , "capacity-scheduler"); } - }); + }, false); actionRequest.getResourceFilters().add(new RequestResourceFilter("YARN", "RESOURCEMANAGER",Collections.singletonList("c6401"))); controller.createAction(actionRequest, requestProperties); http://git-wip-us.apache.org/repos/asf/ambari/blob/64577264/ambari-server/src/test/java/org/apache/ambari/server/controller/AmbariManagementControllerTest.java 
---------------------------------------------------------------------- diff --git a/ambari-server/src/test/java/org/apache/ambari/server/controller/AmbariManagementControllerTest.java b/ambari-server/src/test/java/org/apache/ambari/server/controller/AmbariManagementControllerTest.java index 8a3e270..ae0becf 100644 --- a/ambari-server/src/test/java/org/apache/ambari/server/controller/AmbariManagementControllerTest.java +++ b/ambari-server/src/test/java/org/apache/ambari/server/controller/AmbariManagementControllerTest.java @@ -2397,7 +2397,7 @@ public class AmbariManagementControllerTest { resourceFilters.add(resourceFilter); ExecuteActionRequest request = new ExecuteActionRequest(clusterName, - "DECOMMISSION", null, resourceFilters, level, params); + "DECOMMISSION", null, resourceFilters, level, params, false); Map<String, String> requestProperties = new HashMap<String, String>(); requestProperties.put(REQUEST_CONTEXT_PROPERTY, "Called from a test"); @@ -2437,7 +2437,7 @@ public class AmbariManagementControllerTest { ArrayList<RequestResourceFilter> filters = new ArrayList<RequestResourceFilter>(); filters.add(resourceFilter); request = new ExecuteActionRequest(clusterName, "DECOMMISSION", null, - filters, level, params); + filters, level, params, false); response = controller.createAction(request, requestProperties); @@ -2461,7 +2461,7 @@ public class AmbariManagementControllerTest { put("included_hosts", "h2"); }}; request = new ExecuteActionRequest(clusterName, "DECOMMISSION", null, - resourceFilters, level, params); + resourceFilters, level, params, false); response = controller.createAction(request, requestProperties); @@ -3852,7 +3852,7 @@ public class AmbariManagementControllerTest { List<RequestResourceFilter> resourceFilters = new ArrayList<RequestResourceFilter>(); resourceFilters.add(resourceFilter); - ExecuteActionRequest actionRequest = new ExecuteActionRequest("c1", null, "a1", resourceFilters, null, params); + ExecuteActionRequest actionRequest = new 
ExecuteActionRequest("c1", null, "a1", resourceFilters, null, params, false); RequestStatusResponse response = controller.createAction(actionRequest, requestProperties); assertEquals(1, response.getTasks().size()); ShortTaskStatus taskStatus = response.getTasks().get(0); @@ -3880,7 +3880,7 @@ public class AmbariManagementControllerTest { resourceFilters.clear(); resourceFilter = new RequestResourceFilter("", "", null); resourceFilters.add(resourceFilter); - actionRequest = new ExecuteActionRequest("c1", null, "a2", resourceFilters, null, params); + actionRequest = new ExecuteActionRequest("c1", null, "a2", resourceFilters, null, params, false); response = controller.createAction(actionRequest, requestProperties); assertEquals(2, response.getTasks().size()); @@ -3910,7 +3910,7 @@ public class AmbariManagementControllerTest { resourceFilter = new RequestResourceFilter("", "", hosts); resourceFilters.add(resourceFilter); - actionRequest = new ExecuteActionRequest("c1", null, "a1", resourceFilters, null, params); + actionRequest = new ExecuteActionRequest("c1", null, "a1", resourceFilters, null, params, false); response = controller.createAction(actionRequest, requestProperties); assertEquals(1, response.getTasks().size()); taskStatus = response.getTasks().get(0); @@ -3984,7 +3984,7 @@ public class AmbariManagementControllerTest { "HDFS_CLIENT", new ArrayList<String>() {{ add("h1"); }}); ExecuteActionRequest actionRequest = new ExecuteActionRequest("c1", - "RESTART", params); + "RESTART", params, false); actionRequest.getResourceFilters().add(resourceFilter); Map<String, String> requestProperties = new HashMap<String, String>(); @@ -4081,7 +4081,7 @@ public class AmbariManagementControllerTest { "NAGIOS_SERVER", new ArrayList<String>() {{ add("h1"); }}); ExecuteActionRequest actionRequest = new ExecuteActionRequest("c1", - "RESTART", params); + "RESTART", params, false); actionRequest.getResourceFilters().add(resourceFilter); Map<String, String> requestProperties = new 
HashMap<String, String>(); @@ -4174,7 +4174,7 @@ public class AmbariManagementControllerTest { RequestResourceFilter resourceFilter = new RequestResourceFilter("HDFS", null, null); - ExecuteActionRequest actionRequest = new ExecuteActionRequest("c1", "NON_EXISTENT_CHECK", params); + ExecuteActionRequest actionRequest = new ExecuteActionRequest("c1", "NON_EXISTENT_CHECK", params, false); actionRequest.getResourceFilters().add(resourceFilter); Map<String, String> requestProperties = new HashMap<String, String>(); @@ -4185,7 +4185,7 @@ public class AmbariManagementControllerTest { //actionRequest = new ExecuteActionRequest("c1", "NON_EXISTENT_SERVICE_CHECK", "HDFS", params); //expectActionCreationErrorWithMessage(actionRequest, requestProperties, "Unsupported action"); - actionRequest = new ExecuteActionRequest("c1", "DECOMMISSION_DATANODE", params); + actionRequest = new ExecuteActionRequest("c1", "DECOMMISSION_DATANODE", params, false); actionRequest.getResourceFilters().add(resourceFilter); expectActionCreationErrorWithMessage(actionRequest, requestProperties, @@ -4198,7 +4198,7 @@ public class AmbariManagementControllerTest { List<RequestResourceFilter> resourceFilters = new ArrayList<RequestResourceFilter>(); resourceFilters.add(resourceFilter); - actionRequest = new ExecuteActionRequest("c1", "DECOMMISSION", null, resourceFilters, null, params); + actionRequest = new ExecuteActionRequest("c1", "DECOMMISSION", null, resourceFilters, null, params, false); expectActionCreationErrorWithMessage(actionRequest, requestProperties, "Unsupported action DECOMMISSION for Service: HDFS and Component: HDFS_CLIENT"); @@ -4206,7 +4206,7 @@ public class AmbariManagementControllerTest { resourceFilters.clear(); resourceFilter = new RequestResourceFilter("HDFS", null, null); resourceFilters.add(resourceFilter); - actionRequest = new ExecuteActionRequest("c1", null, "DECOMMISSION_DATANODE", resourceFilters, null, params); + actionRequest = new ExecuteActionRequest("c1", null, 
"DECOMMISSION_DATANODE", resourceFilters, null, params, false); expectActionCreationErrorWithMessage(actionRequest, requestProperties, "Action DECOMMISSION_DATANODE does not exist"); @@ -4214,7 +4214,7 @@ public class AmbariManagementControllerTest { resourceFilter = new RequestResourceFilter("YARN", "RESOURCEMANAGER", null); resourceFilters.add(resourceFilter); - actionRequest = new ExecuteActionRequest("c1", "DECOMMISSION", null, resourceFilters, null, params); + actionRequest = new ExecuteActionRequest("c1", "DECOMMISSION", null, resourceFilters, null, params, false); expectActionCreationErrorWithMessage(actionRequest, requestProperties, "Service not found, clusterName=c1, serviceName=YARN"); @@ -4227,7 +4227,7 @@ public class AmbariManagementControllerTest { resourceFilter = new RequestResourceFilter("HDFS", "NAMENODE", null); resourceFilters.add(resourceFilter); - actionRequest = new ExecuteActionRequest("c1", "DECOMMISSION", null, resourceFilters, null, params2); + actionRequest = new ExecuteActionRequest("c1", "DECOMMISSION", null, resourceFilters, null, params2, false); expectActionCreationErrorWithMessage(actionRequest, requestProperties, "Same host cannot be specified for inclusion as well as exclusion. 
Hosts: [h1]"); @@ -4240,7 +4240,7 @@ public class AmbariManagementControllerTest { resourceFilter = new RequestResourceFilter("HDFS", "NAMENODE", null); resourceFilters.add(resourceFilter); - actionRequest = new ExecuteActionRequest("c1", "DECOMMISSION", null, resourceFilters, null, params2); + actionRequest = new ExecuteActionRequest("c1", "DECOMMISSION", null, resourceFilters, null, params2, false); expectActionCreationErrorWithMessage(actionRequest, requestProperties, "Component HDFS_CLIENT is not supported for decommissioning."); @@ -4250,7 +4250,7 @@ public class AmbariManagementControllerTest { resourceFilter = new RequestResourceFilter("HDFS", "NAMENODE", hosts); resourceFilters.add(resourceFilter); - actionRequest = new ExecuteActionRequest("c1", "DECOMMISSION", null, resourceFilters, null, params2); + actionRequest = new ExecuteActionRequest("c1", "DECOMMISSION", null, resourceFilters, null, params2, false); expectActionCreationErrorWithMessage(actionRequest, requestProperties, "Decommission command cannot be issued with target host(s) specified."); @@ -4262,7 +4262,7 @@ public class AmbariManagementControllerTest { resourceFilter = new RequestResourceFilter("HDFS", "NAMENODE", null); resourceFilters.add(resourceFilter); - actionRequest = new ExecuteActionRequest("c1", "DECOMMISSION", null, resourceFilters, null, params2); + actionRequest = new ExecuteActionRequest("c1", "DECOMMISSION", null, resourceFilters, null, params2, false); expectActionCreationErrorWithMessage(actionRequest, requestProperties, "Component DATANODE on host h1 cannot be decommissioned as its not in STARTED state"); @@ -4270,7 +4270,7 @@ public class AmbariManagementControllerTest { put("excluded_hosts", "h1 "); put("mark_draining_only", "true"); }}; - actionRequest = new ExecuteActionRequest("c1", "DECOMMISSION", null, resourceFilters, null, params2); + actionRequest = new ExecuteActionRequest("c1", "DECOMMISSION", null, resourceFilters, null, params2, false); 
expectActionCreationErrorWithMessage(actionRequest, requestProperties, "mark_draining_only is not a valid parameter for NAMENODE"); @@ -4290,16 +4290,16 @@ public class AmbariManagementControllerTest { "a4", ActionType.SYSTEM, "", "HIVE", "", "Does file exist", TargetHostType.ANY, Short.valueOf("100"))); - actionRequest = new ExecuteActionRequest("c1", null, "a1", null, null, null); + actionRequest = new ExecuteActionRequest("c1", null, "a1", null, null, null, false); expectActionCreationErrorWithMessage(actionRequest, requestProperties, "Action a1 requires input 'test' that is not provided"); - actionRequest = new ExecuteActionRequest("c1", null, "a1", null, null, params); + actionRequest = new ExecuteActionRequest("c1", null, "a1", null, null, params, false); expectActionCreationErrorWithMessage(actionRequest, requestProperties, "Action a1 requires input 'dirName' that is not provided"); params.put("dirName", "dirName"); - actionRequest = new ExecuteActionRequest("c1", null, "a1", null, null, params); + actionRequest = new ExecuteActionRequest("c1", null, "a1", null, null, params, false); expectActionCreationErrorWithMessage(actionRequest, requestProperties, "Action a1 requires explicit target host(s)"); @@ -4307,7 +4307,7 @@ public class AmbariManagementControllerTest { resourceFilter = new RequestResourceFilter("HIVE", null, null); resourceFilters.add(resourceFilter); - actionRequest = new ExecuteActionRequest("c1", null, "a2", resourceFilters, null, params); + actionRequest = new ExecuteActionRequest("c1", null, "a2", resourceFilters, null, params, false); expectActionCreationErrorWithMessage(actionRequest, requestProperties, "Action a2 targets service HIVE that does not match with expected HDFS"); @@ -4315,7 +4315,7 @@ public class AmbariManagementControllerTest { resourceFilter = new RequestResourceFilter("HDFS", "HDFS_CLIENT", null); resourceFilters.add(resourceFilter); - actionRequest = new ExecuteActionRequest("c1", null, "a2", resourceFilters, null, 
params); + actionRequest = new ExecuteActionRequest("c1", null, "a2", resourceFilters, null, params, false); expectActionCreationErrorWithMessage(actionRequest, requestProperties, "Action a2 targets component HDFS_CLIENT that does not match with expected DATANODE"); @@ -4323,7 +4323,7 @@ public class AmbariManagementControllerTest { resourceFilter = new RequestResourceFilter("HDFS2", "HDFS_CLIENT", null); resourceFilters.add(resourceFilter); - actionRequest = new ExecuteActionRequest("c1", null, "a1", resourceFilters, null, params); + actionRequest = new ExecuteActionRequest("c1", null, "a1", resourceFilters, null, params, false); expectActionCreationErrorWithMessage(actionRequest, requestProperties, "Action a1 targets service HDFS2 that does not exist"); @@ -4331,7 +4331,7 @@ public class AmbariManagementControllerTest { resourceFilter = new RequestResourceFilter("HDFS", "HDFS_CLIENT2", null); resourceFilters.add(resourceFilter); - actionRequest = new ExecuteActionRequest("c1", null, "a1", resourceFilters, null, params); + actionRequest = new ExecuteActionRequest("c1", null, "a1", resourceFilters, null, params, false); expectActionCreationErrorWithMessage(actionRequest, requestProperties, "Action a1 targets component HDFS_CLIENT2 that does not exist"); @@ -4339,7 +4339,7 @@ public class AmbariManagementControllerTest { resourceFilter = new RequestResourceFilter("", "HDFS_CLIENT2", null); resourceFilters.add(resourceFilter); - actionRequest = new ExecuteActionRequest("c1", null, "a1", resourceFilters, null, params); + actionRequest = new ExecuteActionRequest("c1", null, "a1", resourceFilters, null, params, false); expectActionCreationErrorWithMessage(actionRequest, requestProperties, "Action a1 targets component HDFS_CLIENT2 without specifying the target service"); @@ -4348,7 +4348,7 @@ public class AmbariManagementControllerTest { resourceFilters.add(resourceFilter); // targets a service that is not a member of the stack (e.g. 
MR not in HDP-2) - actionRequest = new ExecuteActionRequest("c1", null, "a3", resourceFilters, null, params); + actionRequest = new ExecuteActionRequest("c1", null, "a3", resourceFilters, null, params, false); expectActionCreationErrorWithMessage(actionRequest, requestProperties, "Action a3 targets service MAPREDUCE that does not exist"); @@ -4358,7 +4358,7 @@ public class AmbariManagementControllerTest { resourceFilter = new RequestResourceFilter("", "", hosts); resourceFilters.add(resourceFilter); - actionRequest = new ExecuteActionRequest("c1", null, "a2", resourceFilters, null, params); + actionRequest = new ExecuteActionRequest("c1", null, "a2", resourceFilters, null, params, false); expectActionCreationErrorWithMessage(actionRequest, requestProperties, "Request specifies host h6 but its not a valid host based on the target service=HDFS and component=DATANODE"); @@ -4366,7 +4366,7 @@ public class AmbariManagementControllerTest { resourceFilter = new RequestResourceFilter("HIVE", "", null); resourceFilters.add(resourceFilter); - actionRequest = new ExecuteActionRequest("c1", null, "a4", resourceFilters, null, params); + actionRequest = new ExecuteActionRequest("c1", null, "a4", resourceFilters, null, params, false); expectActionCreationErrorWithMessage(actionRequest, requestProperties, "Suitable hosts not found, component=, service=HIVE, cluster=c1, actionName=a4"); @@ -4428,7 +4428,7 @@ public class AmbariManagementControllerTest { Map<String, String> params = new HashMap<String, String>() {{ put("test", "test"); }}; - ExecuteActionRequest actionRequest = new ExecuteActionRequest("c1", Role.HDFS_SERVICE_CHECK.name(), params); + ExecuteActionRequest actionRequest = new ExecuteActionRequest("c1", Role.HDFS_SERVICE_CHECK.name(), params, false); RequestResourceFilter resourceFilter = new RequestResourceFilter("HDFS", null, null); actionRequest.getResourceFilters().add(resourceFilter); @@ -4470,7 +4470,7 @@ public class AmbariManagementControllerTest { 
assertEquals(2, hostRoleCommand.getExecutionCommandWrapper().getExecutionCommand().getConfigurations().size()); assertEquals(requestProperties.get(REQUEST_CONTEXT_PROPERTY), stage.getRequestContext()); - actionRequest = new ExecuteActionRequest("c1", Role.MAPREDUCE_SERVICE_CHECK.name(), null); + actionRequest = new ExecuteActionRequest("c1", Role.MAPREDUCE_SERVICE_CHECK.name(), null, false); resourceFilter = new RequestResourceFilter("MAPREDUCE", null, null); actionRequest.getResourceFilters().add(resourceFilter); @@ -5700,7 +5700,7 @@ public class AmbariManagementControllerTest { Assert.assertEquals("Expect only one service check.", 1, commandCount); RequestResourceFilter resourceFilter = new RequestResourceFilter("HDFS", null, null); - ExecuteActionRequest actionRequest = new ExecuteActionRequest("foo1", Role.HDFS_SERVICE_CHECK.name(), null); + ExecuteActionRequest actionRequest = new ExecuteActionRequest("foo1", Role.HDFS_SERVICE_CHECK.name(), null, false); actionRequest.getResourceFilters().add(resourceFilter); Map<String, String> requestProperties = new HashMap<String, String>(); @@ -6056,7 +6056,7 @@ public class AmbariManagementControllerTest { ArrayList<RequestResourceFilter> filters = new ArrayList<RequestResourceFilter>(); filters.add(resourceFilter); ExecuteActionRequest request = new ExecuteActionRequest(clusterName, - "DECOMMISSION", null, filters, level, params); + "DECOMMISSION", null, filters, level, params, false); Map<String, String> requestProperties = new HashMap<String, String>(); requestProperties.put(REQUEST_CONTEXT_PROPERTY, "Called from a test"); @@ -6088,7 +6088,7 @@ public class AmbariManagementControllerTest { filters.add(resourceFilter); request = new ExecuteActionRequest(clusterName, "DECOMMISSION", - null, filters, level, params); + null, filters, level, params, false); response = controller.createAction(request, requestProperties); @@ -6130,7 +6130,7 @@ public class AmbariManagementControllerTest { filters = new 
ArrayList<RequestResourceFilter>(); filters.add(resourceFilter); request = new ExecuteActionRequest(clusterName, "DECOMMISSION", null, - filters, level, params); + filters, level, params, false); response = controller.createAction(request, requestProperties); @@ -6230,7 +6230,7 @@ public class AmbariManagementControllerTest { resourceFilters.add(resourceFilter1); resourceFilters.add(resourceFilter2); - ExecuteActionRequest actionRequest = new ExecuteActionRequest("c1", null, "a1", resourceFilters, null, params); + ExecuteActionRequest actionRequest = new ExecuteActionRequest("c1", null, "a1", resourceFilters, null, params, false); RequestStatusResponse response = null; try { response = controller.createAction(actionRequest, requestProperties); @@ -6240,7 +6240,7 @@ public class AmbariManagementControllerTest { "allows one resource filter to be specified")); } resourceFilters.remove(resourceFilter1); - actionRequest = new ExecuteActionRequest("c1", null, "a1", resourceFilters, null, params); + actionRequest = new ExecuteActionRequest("c1", null, "a1", resourceFilters, null, params, false); response = controller.createAction(actionRequest, requestProperties); assertEquals(1, response.getTasks().size()); @@ -6328,7 +6328,7 @@ public class AmbariManagementControllerTest { resourceFilters.add(resourceFilter); ExecuteActionRequest request = new ExecuteActionRequest("c1", - "RESTART", null, resourceFilters, null, params); + "RESTART", null, resourceFilters, null, params, false); RequestStatusResponse response = controller.createAction(request, requestProperties); Assert.assertEquals(3, response.getTasks().size()); @@ -6359,7 +6359,7 @@ public class AmbariManagementControllerTest { new ArrayList<String>() {{ add("h1"); }}); resourceFilters.add(resourceFilter); request = new ExecuteActionRequest("c1", Role.HDFS_SERVICE_CHECK.name(), - null, resourceFilters, null, null); + null, resourceFilters, null, null, false); response = controller.createAction(request, 
requestProperties); Assert.assertEquals(1, response.getTasks().size()); @@ -6837,7 +6837,7 @@ public class AmbariManagementControllerTest { put("excluded_hosts", " h1 "); }}; RequestResourceFilter resourceFilter = new RequestResourceFilter("HDFS", "NAMENODE", null); - ExecuteActionRequest request = new ExecuteActionRequest(clusterName, "DECOMMISSION", params); + ExecuteActionRequest request = new ExecuteActionRequest(clusterName, "DECOMMISSION", params, false); request.getResourceFilters().add(resourceFilter); Map<String, String> requestProperties = new HashMap<String, String>(); @@ -8798,7 +8798,7 @@ public class AmbariManagementControllerTest { amc.createHostComponents(componentHostRequests); RequestResourceFilter resourceFilter = new RequestResourceFilter("HDFS", null, null); - ExecuteActionRequest ar = new ExecuteActionRequest(CLUSTER_NAME, Role.HDFS_SERVICE_CHECK.name(), null); + ExecuteActionRequest ar = new ExecuteActionRequest(CLUSTER_NAME, Role.HDFS_SERVICE_CHECK.name(), null, false); ar.getResourceFilters().add(resourceFilter); amc.createAction(ar, null); @@ -10080,7 +10080,7 @@ public class AmbariManagementControllerTest { resourceFilters.add(resourceFilter); ExecuteActionRequest actionRequest = new ExecuteActionRequest(null, null, - "a1", resourceFilters, null, requestParams); + "a1", resourceFilters, null, requestParams, false); RequestStatusResponse response = controller.createAction(actionRequest, requestProperties); assertEquals(1, response.getTasks().size()); ShortTaskStatus taskStatus = response.getTasks().get(0); @@ -10112,7 +10112,7 @@ public class AmbariManagementControllerTest { resourceFilters.add(resourceFilter); actionRequest = new ExecuteActionRequest(null, null, - "a1", resourceFilters, null, requestParams); + "a1", resourceFilters, null, requestParams, false); response = controller.createAction(actionRequest, requestProperties); assertEquals(2, response.getTasks().size()); boolean host1Found = false; 
http://git-wip-us.apache.org/repos/asf/ambari/blob/64577264/ambari-server/src/test/java/org/apache/ambari/server/controller/BackgroundCustomCommandExecutionTest.java ---------------------------------------------------------------------- diff --git a/ambari-server/src/test/java/org/apache/ambari/server/controller/BackgroundCustomCommandExecutionTest.java b/ambari-server/src/test/java/org/apache/ambari/server/controller/BackgroundCustomCommandExecutionTest.java index 76b9fbc..2b00f40 100644 --- a/ambari-server/src/test/java/org/apache/ambari/server/controller/BackgroundCustomCommandExecutionTest.java +++ b/ambari-server/src/test/java/org/apache/ambari/server/controller/BackgroundCustomCommandExecutionTest.java @@ -118,7 +118,7 @@ public class BackgroundCustomCommandExecutionTest { }; ExecuteActionRequest actionRequest = new ExecuteActionRequest("c1", - "REBALANCEHDFS", new HashMap<String, String>()); + "REBALANCEHDFS", new HashMap<String, String>(), false); actionRequest.getResourceFilters().add(new RequestResourceFilter("HDFS", "NAMENODE",Collections.singletonList("c6401"))); controller.createAction(actionRequest, requestProperties);
