This is an automated email from the ASF dual-hosted git repository.
benyoka pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/ambari.git
The following commit(s) were added to refs/heads/trunk by this push:
new 799e487 AMBARI-24964 stack advisor layout recommendation for add
service request (benyoka) (#2662)
799e487 is described below
commit 799e487c960a567bd541322f5d544e3548eab0c9
Author: benyoka <[email protected]>
AuthorDate: Thu Nov 29 13:18:41 2018 +0100
AMBARI-24964 stack advisor layout recommendation for add service request
(benyoka) (#2662)
* AMBARI-24964 stack advisor layout recommendation for add service request
(benyoka)
* AMBARI-24964 fix review comments (benyoka)
* AMBARI-24964 fix checkstyle (benyoka)
---
.../services/stackadvisor/StackAdvisorRequest.java | 5 +
.../recommendations/RecommendationResponse.java | 61 +++++
.../validations/ValidationResponse.java | 15 ++
.../ambari/server/configuration/Configuration.java | 19 ++
.../server/controller/AddServiceRequest.java | 8 +
.../ambari/server/controller/internal/Stack.java | 7 +-
.../server/topology/addservice/AddServiceInfo.java | 4 +
.../addservice/AddServiceOrchestrator.java | 35 ++-
.../topology/addservice/AutoHostgroupStrategy.java | 49 ++++
.../addservice/GroupByComponentsStrategy.java | 50 ++++
.../addservice/HostGroupForEachHostStrategy.java | 34 +++
.../topology/addservice/HostGroupStrategy.java | 29 +++
.../topology/addservice/StackAdvisorAdapter.java | 213 ++++++++++++++++
.../RecommendationResponseTest.java | 103 ++++++++
.../BlueprintConfigurationProcessorTest.java | 4 +-
.../topology/addservice/HostGroupStrategyTest.java | 79 ++++++
.../addservice/StackAdvisorAdapterTest.java | 275 +++++++++++++++++++++
17 files changed, 974 insertions(+), 16 deletions(-)
diff --git
a/ambari-server/src/main/java/org/apache/ambari/server/api/services/stackadvisor/StackAdvisorRequest.java
b/ambari-server/src/main/java/org/apache/ambari/server/api/services/stackadvisor/StackAdvisorRequest.java
index 98c75f4..10baa33 100644
---
a/ambari-server/src/main/java/org/apache/ambari/server/api/services/stackadvisor/StackAdvisorRequest.java
+++
b/ambari-server/src/main/java/org/apache/ambari/server/api/services/stackadvisor/StackAdvisorRequest.java
@@ -29,6 +29,7 @@ import java.util.Set;
import
org.apache.ambari.server.api.services.stackadvisor.recommendations.RecommendationResponse;
import org.apache.ambari.server.state.ChangedConfigInfo;
+import org.apache.ambari.server.state.StackId;
import org.apache.commons.lang.StringUtils;
import com.google.common.base.Preconditions;
@@ -157,6 +158,10 @@ public class StackAdvisorRequest {
this.instance = new StackAdvisorRequest(stackName, stackVersion);
}
+ public static StackAdvisorRequestBuilder forStack(StackId stackId) {
+ return forStack(stackId.getStackName(), stackId.getStackVersion());
+ }
+
public static StackAdvisorRequestBuilder forStack(String stackName, String
stackVersion) {
return new StackAdvisorRequestBuilder(stackName, stackVersion);
}
diff --git
a/ambari-server/src/main/java/org/apache/ambari/server/api/services/stackadvisor/recommendations/RecommendationResponse.java
b/ambari-server/src/main/java/org/apache/ambari/server/api/services/stackadvisor/recommendations/RecommendationResponse.java
index d0d8601..99e9ab2 100644
---
a/ambari-server/src/main/java/org/apache/ambari/server/api/services/stackadvisor/recommendations/RecommendationResponse.java
+++
b/ambari-server/src/main/java/org/apache/ambari/server/api/services/stackadvisor/recommendations/RecommendationResponse.java
@@ -18,6 +18,11 @@
package org.apache.ambari.server.api.services.stackadvisor.recommendations;
+import static java.util.stream.Collectors.groupingBy;
+import static java.util.stream.Collectors.mapping;
+import static java.util.stream.Collectors.toMap;
+import static java.util.stream.Collectors.toSet;
+
import java.util.HashMap;
import java.util.List;
import java.util.Map;
@@ -26,9 +31,13 @@ import java.util.Set;
import org.apache.ambari.server.api.services.stackadvisor.StackAdvisorResponse;
import org.apache.ambari.server.state.ValueAttributesInfo;
+import org.apache.commons.lang3.tuple.Pair;
+import org.codehaus.jackson.annotate.JsonIgnore;
import org.codehaus.jackson.annotate.JsonProperty;
import org.codehaus.jackson.map.annotate.JsonSerialize;
+import com.google.common.collect.ImmutableMap;
+
/**
* Recommendation response POJO.
*/
@@ -125,6 +134,12 @@ public class RecommendationResponse extends
StackAdvisorResponse {
public void setHostGroups(Set<HostGroup> hostGroups) {
this.hostGroups = hostGroups;
}
+
+ /**
+ * Derived view, not part of the JSON model: maps each host group name to the names of its components.
+ * Host groups without components do not appear in the result.
+ */
+ @JsonIgnore
+ public Map<String, Set<String>> getHostgroupComponentMap() {
+ return hostGroups.stream()
+ .flatMap(hg -> hg.getComponentNames().stream().map(comp ->
Pair.of(hg.getName(), comp)))
+ .collect(groupingBy(Pair::getKey, mapping(Pair::getValue, toSet())));
+ }
}
public static class BlueprintConfigurations {
@@ -202,6 +217,25 @@ public class RecommendationResponse extends
StackAdvisorResponse {
public void setComponents(Set<Map<String, String>> components) {
this.components = components;
}
+
+ @JsonIgnore
+ public Set<String> getComponentNames() {
+ return components.stream().map(comp ->
comp.get("name")).collect(toSet());
+ }
+
+ public static Set<HostGroup> fromHostGroupComponents(Map<String,
Set<String>> hostGroupComponents) {
+ return hostGroupComponents.entrySet().stream()
+ .map(entry -> create(entry.getKey(), entry.getValue()))
+ .collect(toSet());
+ }
+
+ public static HostGroup create(String name, Set<String> componentNames) {
+ HostGroup group = new HostGroup();
+ group.setName(name);
+ Set<Map<String, String>> components = componentNames.stream().map(comp
-> ImmutableMap.of("name", comp)).collect(toSet());
+ group.setComponents(components);
+ return group;
+ }
}
public static class BlueprintClusterBinding {
@@ -215,6 +249,20 @@ public class RecommendationResponse extends
StackAdvisorResponse {
public void setHostGroups(Set<BindingHostGroup> hostGroups) {
this.hostGroups = hostGroups;
}
+
+ @JsonIgnore
+ public Map<String, Set<String>> getHostgroupHostMap() {
+ return hostGroups.stream().collect(toMap(BindingHostGroup::getName,
BindingHostGroup::getHostNames));
+ }
+
+ public static BlueprintClusterBinding fromHostGroupHostMap(Map<String,
Set<String>> hostGroupHosts) {
+ Set<BindingHostGroup> hostGroups = hostGroupHosts.entrySet().stream()
+ .map(entry -> BindingHostGroup.create(entry.getKey(),
entry.getValue()))
+ .collect(toSet());
+ BlueprintClusterBinding binding = new BlueprintClusterBinding();
+ binding.setHostGroups(hostGroups);
+ return binding;
+ }
}
public static class BindingHostGroup {
@@ -239,6 +287,19 @@ public class RecommendationResponse extends
StackAdvisorResponse {
public void setHosts(Set<Map<String, String>> hosts) {
this.hosts = hosts;
}
+
+ @JsonIgnore
+ public Set<String> getHostNames() {
+ return hosts.stream().map(host -> host.get("fqdn")).collect(toSet());
+ }
+
+ public static BindingHostGroup create(String name, Set<String> hostNames) {
+ BindingHostGroup hostGroup = new BindingHostGroup();
+ hostGroup.setName(name);
+ Set<Map<String, String>> hosts = hostNames.stream().map(hostName ->
ImmutableMap.of("fqdn", hostName)).collect(toSet());
+ hostGroup.setHosts(hosts);
+ return hostGroup;
+ }
}
public static class ConfigGroup {
diff --git
a/ambari-server/src/main/java/org/apache/ambari/server/api/services/stackadvisor/validations/ValidationResponse.java
b/ambari-server/src/main/java/org/apache/ambari/server/api/services/stackadvisor/validations/ValidationResponse.java
index f51428a..faf7ced 100644
---
a/ambari-server/src/main/java/org/apache/ambari/server/api/services/stackadvisor/validations/ValidationResponse.java
+++
b/ambari-server/src/main/java/org/apache/ambari/server/api/services/stackadvisor/validations/ValidationResponse.java
@@ -23,6 +23,8 @@ import java.util.Set;
import org.apache.ambari.server.api.services.stackadvisor.StackAdvisorResponse;
import org.codehaus.jackson.annotate.JsonProperty;
+import com.google.common.base.MoreObjects;
+
/**
* Validation response POJO.
*/
@@ -116,6 +118,19 @@ public class ValidationResponse extends
StackAdvisorResponse {
public void setConfigName(String configName) {
this.configName = configName;
}
+
+ @Override
+ public String toString() {
+ return MoreObjects.toStringHelper(this)
+ .add("type", type)
+ .add("level", level)
+ .add("message", message)
+ .add("componentName", componentName)
+ .add("host", host)
+ .add("configType", configType)
+ .add("configName", configName)
+ .toString();
+ }
}
}
diff --git
a/ambari-server/src/main/java/org/apache/ambari/server/configuration/Configuration.java
b/ambari-server/src/main/java/org/apache/ambari/server/configuration/Configuration.java
index 18ba439..a4af46c 100644
---
a/ambari-server/src/main/java/org/apache/ambari/server/configuration/Configuration.java
+++
b/ambari-server/src/main/java/org/apache/ambari/server/configuration/Configuration.java
@@ -68,6 +68,8 @@ import
org.apache.ambari.server.security.encryption.CredentialProvider;
import org.apache.ambari.server.state.services.MetricsRetrievalService;
import org.apache.ambari.server.state.services.RetryUpgradeActionService;
import org.apache.ambari.server.state.stack.OsFamily;
+import org.apache.ambari.server.topology.addservice.GroupByComponentsStrategy;
+import org.apache.ambari.server.topology.addservice.HostGroupStrategy;
import org.apache.ambari.server.upgrade.AbstractUpgradeCatalog;
import org.apache.ambari.server.utils.AmbariPath;
import org.apache.ambari.server.utils.DateUtils;
@@ -2603,6 +2605,14 @@ public class Configuration {
public static final ConfigurationProperty<Integer>
DEFAULT_MAX_DEGREE_OF_PARALLELISM_FOR_UPGRADES = new ConfigurationProperty<>(
"stack.upgrade.default.parallelism", 100);
+ /**
+ * Fully qualified class name of the strategy used to form host groups for
add service request layout recommendation.
+ */
+ @Markdown(description = "Fully qualified class name of the strategy used to
form host groups for add service request layout recommendation.")
+ public static final ConfigurationProperty<String>
ADD_SERVICE_HOST_GROUP_STRATEGY = new ConfigurationProperty<>(
+ "addservice.hostgroup.strategy",
GroupByComponentsStrategy.class.getName());
+
+
private static final Logger LOG = LoggerFactory.getLogger(
Configuration.class);
@@ -5537,6 +5547,15 @@ public class Configuration {
}
/**
+ * @return The class of the host group strategy for add service requests.
+ * @throws ClassNotFoundException if the specified class is not found
+ * @throws ClassCastException if the specified class is not a subclass of
{@link HostGroupStrategy}
+ */
+ public Class<? extends HostGroupStrategy>
getAddServiceHostGroupStrategyClass() throws ClassNotFoundException {
+ return
Class.forName(getProperty(ADD_SERVICE_HOST_GROUP_STRATEGY)).asSubclass(HostGroupStrategy.class);
+ }
+
+ /**
* Generates a markdown table which includes:
* <ul>
* <li>Property key name</li>
diff --git
a/ambari-server/src/main/java/org/apache/ambari/server/controller/AddServiceRequest.java
b/ambari-server/src/main/java/org/apache/ambari/server/controller/AddServiceRequest.java
index f3217d1..9c0ccb5 100644
---
a/ambari-server/src/main/java/org/apache/ambari/server/controller/AddServiceRequest.java
+++
b/ambari-server/src/main/java/org/apache/ambari/server/controller/AddServiceRequest.java
@@ -40,6 +40,7 @@ import java.util.function.Function;
import org.apache.ambari.annotations.ApiIgnore;
import org.apache.ambari.server.controller.internal.ProvisionAction;
+import org.apache.ambari.server.state.StackId;
import org.apache.ambari.server.topology.ConfigRecommendationStrategy;
import org.apache.ambari.server.topology.ConfigurableHelper;
import org.apache.ambari.server.topology.Configuration;
@@ -172,6 +173,13 @@ public final class AddServiceRequest {
return stackVersion;
}
+ @JsonIgnore
+ @ApiIgnore
+ public Optional<StackId> getStackId() {
+ return null != stackName && null != stackVersion
+ ? Optional.of(new StackId(stackName, stackVersion)) : Optional.empty();
+ }
+
@JsonProperty(SERVICES)
@ApiModelProperty(name = SERVICES)
public Set<Service> getServices() {
diff --git
a/ambari-server/src/main/java/org/apache/ambari/server/controller/internal/Stack.java
b/ambari-server/src/main/java/org/apache/ambari/server/controller/internal/Stack.java
index 02e8b4e..b4fec0c 100644
---
a/ambari-server/src/main/java/org/apache/ambari/server/controller/internal/Stack.java
+++
b/ambari-server/src/main/java/org/apache/ambari/server/controller/internal/Stack.java
@@ -206,6 +206,9 @@ public class Stack {
return version;
}
+ public StackId getStackId() {
+ return new StackId(name, version);
+ }
Map<DependencyInfo, String> getDependencyConditionalServiceMap() {
return dependencyConditionalServiceMap;
@@ -214,9 +217,9 @@ public class Stack {
/**
* Get services contained in the stack.
*
- * @return collection of all services for the stack
+ * @return set of all services for the stack
*/
- public Collection<String> getServices() {
+ public Set<String> getServices() {
return serviceComponents.keySet();
}
diff --git
a/ambari-server/src/main/java/org/apache/ambari/server/topology/addservice/AddServiceInfo.java
b/ambari-server/src/main/java/org/apache/ambari/server/topology/addservice/AddServiceInfo.java
index 15c3b1f..f08ad65 100644
---
a/ambari-server/src/main/java/org/apache/ambari/server/topology/addservice/AddServiceInfo.java
+++
b/ambari-server/src/main/java/org/apache/ambari/server/topology/addservice/AddServiceInfo.java
@@ -48,6 +48,10 @@ public final class AddServiceInfo {
this.config = config;
}
+ public AddServiceInfo withNewServices(Map<String, Map<String, Set<String>>>
services) {
+ return new AddServiceInfo(request, clusterName, stack, config, stages,
services);
+ }
+
@Override
public String toString() {
return "AddServiceRequest(" + stages.getId() + ")";
diff --git
a/ambari-server/src/main/java/org/apache/ambari/server/topology/addservice/AddServiceOrchestrator.java
b/ambari-server/src/main/java/org/apache/ambari/server/topology/addservice/AddServiceOrchestrator.java
index a669b94..f4bd08a 100644
---
a/ambari-server/src/main/java/org/apache/ambari/server/topology/addservice/AddServiceOrchestrator.java
+++
b/ambari-server/src/main/java/org/apache/ambari/server/topology/addservice/AddServiceOrchestrator.java
@@ -72,6 +72,9 @@ public class AddServiceOrchestrator {
@Inject
private ConfigHelper configHelper;
+ @Inject
+ private StackAdvisorAdapter stackAdvisorAdapter;
+
public RequestStatusResponse processAddServiceRequest(Cluster cluster,
AddServiceRequest request) {
LOG.info("Received {} request for {}: {}", request.getOperationType(),
cluster.getClusterName(), request);
@@ -108,18 +111,19 @@ public class AddServiceOrchestrator {
try {
stack = new Stack(stackId, controller);
Set<String> existingServices = cluster.getServices().keySet();
+ // process service declarations
+ for (AddServiceRequest.Service service : request.getServices()) {
+ checkAndLog(!stack.getServices().contains(service.getName()),
+ "Unknown service %s in stack %s", service, stack.getStackId());
+ newServices.computeIfAbsent(service.getName(), __ -> new HashMap<>());
+ }
+ // process component declarations
for (AddServiceRequest.Component requestedComponent :
request.getComponents()) {
String serviceName =
stack.getServiceForComponent(requestedComponent.getName());
- if (serviceName == null) {
- String msg = String.format("No service found for component %s in
stack %s", requestedComponent.getName(), stackId);
- LOG.error(msg);
- throw new IllegalArgumentException(msg);
- }
- if (existingServices.contains(serviceName)) {
- String msg = String.format("Service %s already exists in cluster
%s", serviceName, cluster.getClusterName());
- LOG.error(msg);
- throw new IllegalArgumentException(msg);
- }
+ checkAndLog( serviceName == null,
+ "No service found for component %s in stack %s",
requestedComponent.getName(), stackId);
+ checkAndLog( existingServices.contains(serviceName),
+ "Service %s already exists in cluster %s", serviceName,
cluster.getClusterName());
newServices.computeIfAbsent(serviceName, __ -> new HashMap<>())
.computeIfAbsent(requestedComponent.getName(), __ -> new HashSet<>())
@@ -145,6 +149,14 @@ public class AddServiceOrchestrator {
return validatedRequest;
}
+ private static void checkAndLog(boolean errorCondition, String errorMessage,
Object... messageParams) {
+ if (errorCondition) {
+ String msg = String.format(errorMessage, messageParams);
+ LOG.error(msg);
+ throw new IllegalArgumentException(msg);
+ }
+ }
+
/**
* Requests layout recommendation from the stack advisor.
* @return new request, updated based on the recommended layout
@@ -152,8 +164,7 @@ public class AddServiceOrchestrator {
*/
private AddServiceInfo recommendLayout(AddServiceInfo request) {
LOG.info("Recommending layout for {}", request);
- // TODO implement
- return request;
+ return stackAdvisorAdapter.recommendLayout(request);
}
/**
diff --git
a/ambari-server/src/main/java/org/apache/ambari/server/topology/addservice/AutoHostgroupStrategy.java
b/ambari-server/src/main/java/org/apache/ambari/server/topology/addservice/AutoHostgroupStrategy.java
new file mode 100644
index 0000000..4eaaa90
--- /dev/null
+++
b/ambari-server/src/main/java/org/apache/ambari/server/topology/addservice/AutoHostgroupStrategy.java
@@ -0,0 +1,49 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ambari.server.topology.addservice;
+
+import java.util.Map;
+import java.util.Set;
+
+/**
+ * A strategy that chooses another strategy based on cluster size.
+ */
+public class AutoHostgroupStrategy implements HostGroupStrategy {
+
+ private final int largeClusterThreshold;
+
+ public AutoHostgroupStrategy() {
+ this(10);
+ }
+
+ public AutoHostgroupStrategy(int largeClusterThreshold) {
+ this.largeClusterThreshold = largeClusterThreshold;
+ }
+
+ @Override
+ public Map<String, Set<String>> calculateHostGroups(Map<String, Set<String>>
hostComponentMap) {
+ HostGroupStrategy strategy =
+ hostComponentMap.size() <= largeClusterThreshold ? new
HostGroupForEachHostStrategy() : new GroupByComponentsStrategy();
+ return strategy.calculateHostGroups(hostComponentMap);
+ }
+
+ public int getLargeClusterThreshold() {
+ return largeClusterThreshold;
+ }
+}
diff --git
a/ambari-server/src/main/java/org/apache/ambari/server/topology/addservice/GroupByComponentsStrategy.java
b/ambari-server/src/main/java/org/apache/ambari/server/topology/addservice/GroupByComponentsStrategy.java
new file mode 100644
index 0000000..1f5d6bc
--- /dev/null
+++
b/ambari-server/src/main/java/org/apache/ambari/server/topology/addservice/GroupByComponentsStrategy.java
@@ -0,0 +1,50 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ambari.server.topology.addservice;
+
+import static com.google.common.collect.Lists.newArrayList;
+import static java.util.Comparator.comparing;
+import static java.util.stream.Collectors.groupingBy;
+import static java.util.stream.Collectors.mapping;
+import static java.util.stream.Collectors.toCollection;
+import static java.util.stream.Collectors.toMap;
+
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.TreeSet;
+import java.util.stream.IntStream;
+
+
+public class GroupByComponentsStrategy implements HostGroupStrategy {
+ @Override
+ public Map<String, Set<String>> calculateHostGroups(Map<String, Set<String>>
hostComponentMap) {
+ // create components -> hosts map, the values will be the host groups
+ List<Set<String>> hostGroups = newArrayList(
+ hostComponentMap.entrySet().stream().collect(
+ groupingBy(Map.Entry::getValue, mapping(Map.Entry::getKey,
toCollection(TreeSet::new))) ).values()); // hosts names are sorted in the host
group
+ hostGroups.sort(comparing(hosts -> hosts.iterator().next())); //
alphabetical order by the first hostname in the group to have consistent outcome
+
+ // give a name to each host group and add to a map
+ Map<String, Set<String>> hostgroupMap = IntStream.range(0,
hostGroups.size()).boxed()
+ .collect(toMap(i -> "host_group_" + (i + 1), i -> hostGroups.get(i)));
+
+ return hostgroupMap;
+ }
+}
diff --git
a/ambari-server/src/main/java/org/apache/ambari/server/topology/addservice/HostGroupForEachHostStrategy.java
b/ambari-server/src/main/java/org/apache/ambari/server/topology/addservice/HostGroupForEachHostStrategy.java
new file mode 100644
index 0000000..1cba73b
--- /dev/null
+++
b/ambari-server/src/main/java/org/apache/ambari/server/topology/addservice/HostGroupForEachHostStrategy.java
@@ -0,0 +1,34 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ambari.server.topology.addservice;
+
+import static java.util.stream.Collectors.toMap;
+
+import java.util.Map;
+import java.util.Set;
+
+import com.google.common.collect.ImmutableSet;
+
+public class HostGroupForEachHostStrategy implements HostGroupStrategy {
+ @Override
+ public Map<String, Set<String>> calculateHostGroups(Map<String, Set<String>>
hostComponentMap) {
+ return hostComponentMap.keySet().stream()
+ .collect(toMap(host -> "host_group_" + host, host ->
ImmutableSet.of(host)));
+ }
+}
diff --git
a/ambari-server/src/main/java/org/apache/ambari/server/topology/addservice/HostGroupStrategy.java
b/ambari-server/src/main/java/org/apache/ambari/server/topology/addservice/HostGroupStrategy.java
new file mode 100644
index 0000000..b541f88
--- /dev/null
+++
b/ambari-server/src/main/java/org/apache/ambari/server/topology/addservice/HostGroupStrategy.java
@@ -0,0 +1,29 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ambari.server.topology.addservice;
+
+import java.util.Map;
+import java.util.Set;
+
+/**
+ * Base interface for host group creation strategies
+ */
+public interface HostGroupStrategy {
+ Map<String, Set<String>> calculateHostGroups(Map<String, Set<String>>
hostComponentMap);
+}
diff --git
a/ambari-server/src/main/java/org/apache/ambari/server/topology/addservice/StackAdvisorAdapter.java
b/ambari-server/src/main/java/org/apache/ambari/server/topology/addservice/StackAdvisorAdapter.java
new file mode 100644
index 0000000..45f0e59
--- /dev/null
+++
b/ambari-server/src/main/java/org/apache/ambari/server/topology/addservice/StackAdvisorAdapter.java
@@ -0,0 +1,213 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ambari.server.topology.addservice;
+
+import static com.google.common.base.Preconditions.checkArgument;
+import static com.google.common.collect.Maps.transformValues;
+import static java.util.stream.Collectors.groupingBy;
+import static java.util.stream.Collectors.mapping;
+import static java.util.stream.Collectors.toMap;
+import static java.util.stream.Collectors.toSet;
+
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.function.Function;
+
+import javax.inject.Inject;
+
+import org.apache.ambari.server.AmbariException;
+import
org.apache.ambari.server.api.services.stackadvisor.StackAdvisorException;
+import org.apache.ambari.server.api.services.stackadvisor.StackAdvisorHelper;
+import org.apache.ambari.server.api.services.stackadvisor.StackAdvisorRequest;
+import
org.apache.ambari.server.api.services.stackadvisor.recommendations.RecommendationResponse;
+import
org.apache.ambari.server.api.services.stackadvisor.validations.ValidationResponse;
+import org.apache.ambari.server.configuration.Configuration;
+import org.apache.ambari.server.controller.AmbariManagementController;
+import org.apache.ambari.server.state.Cluster;
+import org.apache.ambari.server.state.StackId;
+import org.apache.commons.lang3.tuple.Pair;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.google.common.base.Joiner;
+import com.google.common.collect.ImmutableList;
+import com.google.common.collect.Sets;
+import com.google.inject.ConfigurationException;
+import com.google.inject.Injector;
+import com.google.inject.ProvisionException;
+
+public class StackAdvisorAdapter {
+
+ @Inject
+ private AmbariManagementController managementController;
+
+ @Inject
+ private StackAdvisorHelper stackAdvisorHelper;
+
+ @Inject
+ private Configuration serverConfig;
+
+ @Inject
+ private Injector injector;
+
+ private static final Logger LOG =
LoggerFactory.getLogger(StackAdvisorAdapter.class);
+
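+ /**
+ * Recommends component layout for the new services to add. If the request
contains explicit layout for some components
+ * this will be added to the stack advisor input.
+ * @param info the add service request info holding the cluster name, the stack and the new services with any explicitly requested layout
+ * @return a copy of {@code info} whose new services carry the recommended component layout
+ * @throws IllegalArgumentException if the cluster lookup or the stack advisor call fails
+ */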
+ AddServiceInfo recommendLayout(AddServiceInfo info) {
+ try {
+ Cluster cluster =
managementController.getClusters().getCluster(info.clusterName());
+ Map<String, Map<String, Set<String>>> clusterServices = transformValues(
+ cluster.getServices(),
+ service -> transformValues(service.getServiceComponents(), component
-> component.getServiceComponentsHosts()));
+
+ // Requested component layout will be added to the StackAdvisor input in
addition to existing
+ // component layout.
+ Map<String, Map<String, Set<String>>> allServices =
mergeDisjunctMaps(clusterServices, info.newServices());
+
+ Map<String, Set<String>> componentsToHosts =
getComponentHostMap(allServices);
+
+ Map<String, Set<String>> hostsToComponents =
getHostComponentMap(componentsToHosts);
+ List<String> hosts = ImmutableList.copyOf(cluster.getHostNames());
+ hosts.forEach( host -> hostsToComponents.putIfAbsent(host, new
HashSet<>())); // just in case there are hosts that have no components
+
+ Map<String, Set<String>> hostGroups =
getHostGroupStrategy().calculateHostGroups(hostsToComponents);
+
+ StackAdvisorRequest request =
StackAdvisorRequest.StackAdvisorRequestBuilder
+ .forStack(info.getStack().getStackId())
+ .ofType(StackAdvisorRequest.StackAdvisorRequestType.HOST_GROUPS)
+ .forHosts(hosts)
+ .forServices(allServices.keySet())
+ .forHostComponents(hostsToComponents)
+ .forHostsGroupBindings(hostGroups)
+ .withComponentHostsMap(componentsToHosts)
+ .withGPLLicenseAccepted(serverConfig.getGplLicenseAccepted())
+ .build();
+ RecommendationResponse response = stackAdvisorHelper.recommend(request);
+
+ Map<String, Map<String, Set<String>>> recommendedLayout =
getRecommendedLayout(
+
response.getRecommendations().getBlueprintClusterBinding().getHostgroupHostMap(),
+
response.getRecommendations().getBlueprint().getHostgroupComponentMap(),
+ info.getStack()::getServiceForComponent);
+
+ Set<ValidationResponse.ValidationItem> validationItems =
validateRecommendedLayout(info.getStack().getStackId(),
+ recommendedLayout,
+
response.getRecommendations().getBlueprintClusterBinding().getHostgroupHostMap());
+ if (!validationItems.isEmpty()) {
+ LOG.warn("Issues found during recommended topology validation:\n{}",
Joiner.on('\n').join(validationItems));
+ }
+
+ // Keep the recommendations for new services only
+ keepNewServicesOnly(recommendedLayout, info.newServices());
+
+ return info.withNewServices(recommendedLayout);
+ }
+ catch (AmbariException|StackAdvisorException ex) {
+ throw new IllegalArgumentException("Layout recommendation failed.", ex);
+ }
+ }
+
+ Set<ValidationResponse.ValidationItem> validateRecommendedLayout(StackId
stackId,
+
Map<String,Map<String,Set<String>>> recommendedLayout,
+ Map<String,
Set<String>> recommendedHostgroups) throws StackAdvisorException {
+ Map<String, Set<String>> componentsToHosts =
getComponentHostMap(recommendedLayout);
+ Map<String, Set<String>> hostsToComponents =
getHostComponentMap(componentsToHosts);
+ List<String> hosts = ImmutableList.copyOf(hostsToComponents.keySet());
+
+ StackAdvisorRequest request =
StackAdvisorRequest.StackAdvisorRequestBuilder
+ .forStack(stackId)
+ .ofType(StackAdvisorRequest.StackAdvisorRequestType.HOST_GROUPS)
+ .forHosts(hosts)
+ .forServices(recommendedLayout.keySet())
+ .forHostComponents(hostsToComponents)
+ .forHostsGroupBindings(recommendedHostgroups)
+ .withComponentHostsMap(componentsToHosts)
+ .withGPLLicenseAccepted(serverConfig.getGplLicenseAccepted())
+ .build();
+ ValidationResponse response = stackAdvisorHelper.validate(request);
+
+ return response.getItems();
+ }
+
+ static void keepNewServicesOnly(Map<String,Map<String,Set<String>>>
recommendedLayout, Map<String,Map<String,Set<String>>> newServices) {
+ recommendedLayout.keySet().retainAll(newServices.keySet());
+ }
+
+ static Map<String, Map<String, Set<String>>>
getRecommendedLayout(Map<String, Set<String>> hostGroupHosts,
+
Map<String, Set<String>> hostGroupComponents,
+
Function<String, String> componentToService) {
+ Map<String, Set<String>> componentHostMap =
hostGroupComponents.entrySet().stream()
+ .flatMap(entry -> entry.getValue().stream().map(comp -> Pair.of(comp,
entry.getKey()))) // component -> hostgroup
+ .flatMap(cmpHg -> hostGroupHosts.get(cmpHg.getValue()).stream().map(host
-> Pair.of(cmpHg.getKey(), host))) // component -> host
+ .collect(groupingBy(Pair::getKey, mapping(Pair::getValue, toSet())));//
group by component
+
+ return componentHostMap.entrySet().stream().collect(
+ groupingBy(
+ cmpHost -> componentToService.apply(cmpHost.getKey()),
+ toMap(Map.Entry::getKey, Map.Entry::getValue)));
+ }
+
+ /**
+ * Transform a map of component -> hosts to a map of hosts -> components
+ * @param componentHostMap the map to transform
+ * @return the transformed map
+ */
+ static Map<String, Set<String>> getHostComponentMap(Map<String, Set<String>>
componentHostMap) {
+ return componentHostMap.entrySet().stream()
+ .flatMap(compHosts -> compHosts.getValue().stream().map(host ->
Pair.of(host, compHosts.getKey())))
+ .collect(groupingBy(Pair::getKey, mapping(Pair::getValue, toSet())));
+ }
+
+ /**
+ * Extracts a [component -> hosts] map from the [service -> component ->
hosts] map. Services
+ * with empty component map will be ignored
+ * @param serviceComponentHostMap the input map
+ * @return the extracted map
+ */
+ static Map<String, Set<String>> getComponentHostMap(Map<String, Map<String,
Set<String>>> serviceComponentHostMap) {
+ return serviceComponentHostMap.values().stream()
+ .reduce(StackAdvisorAdapter::mergeDisjunctMaps)
+ .orElse(new HashMap<>());
+ }
+
+ static <S, T> Map<S, T> mergeDisjunctMaps(Map<? extends S, ? extends T>
map1, Map<? extends S, ? extends T> map2) {
+ Sets.SetView<? extends S> commonKeys = Sets.intersection(map1.keySet(),
map2.keySet());
+ checkArgument(commonKeys.isEmpty(), "Maps must be disjunct. Common keys:
%s", commonKeys);
+ Map<S, T> merged = new HashMap<>(map1);
+ merged.putAll(map2);
+ return merged;
+ }
+
+ HostGroupStrategy getHostGroupStrategy() {
+ try {
+ return
injector.getInstance(serverConfig.getAddServiceHostGroupStrategyClass());
+ }
+ catch (ClassNotFoundException | ClassCastException |ConfigurationException
| ProvisionException ex) {
+ throw new IllegalStateException("Cannot load host group strategy", ex);
+ }
+ }
+
+}
diff --git
a/ambari-server/src/test/java/org/apache/ambari/server/api/services/stackadvisor/recommendations/RecommendationResponseTest.java
b/ambari-server/src/test/java/org/apache/ambari/server/api/services/stackadvisor/recommendations/RecommendationResponseTest.java
new file mode 100644
index 0000000..80e25e5
--- /dev/null
+++
b/ambari-server/src/test/java/org/apache/ambari/server/api/services/stackadvisor/recommendations/RecommendationResponseTest.java
@@ -0,0 +1,103 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ambari.server.api.services.stackadvisor.recommendations;
+
+import static java.util.function.Function.identity;
+import static java.util.stream.Collectors.toMap;
+import static java.util.stream.Collectors.toSet;
+import static org.junit.Assert.assertEquals;
+
+import java.util.Arrays;
+import java.util.Map;
+import java.util.Set;
+
+import org.junit.Before;
+import org.junit.Test;
+
+import com.google.common.collect.ImmutableMap;
+import com.google.common.collect.ImmutableSet;
+
+/**
+ * Tests the convenience accessors of {@link RecommendationResponse} that expose the contents
+ * of host groups: component names per host group and host names per host group.
+ */
+public class RecommendationResponseTest {
+
+  private final RecommendationResponse response = new RecommendationResponse();
+
+  /**
+   * Fills {@link #response} with a blueprint of two host groups and a cluster binding that
+   * assigns hosts to those two groups.
+   */
+  @Before
+  public void setUp() {
+    RecommendationResponse.Blueprint bp = new RecommendationResponse.Blueprint();
+    bp.setHostGroups(ImmutableSet.of(
+      hostGroup("host_group_1", "NAMENODE", "ZOOKEEPER_SERVER"),
+      hostGroup("host_group_2", "DATANODE", "HDFS_CLIENT", "ZOOKEEPER_CLIENT")));
+
+    RecommendationResponse.BlueprintClusterBinding binding =
+      new RecommendationResponse.BlueprintClusterBinding();
+    binding.setHostGroups(ImmutableSet.of(
+      hostGroupBinding("host_group_1", "c7401", "c7402"),
+      hostGroupBinding("host_group_2", "c7403", "c7404", "c7405")));
+
+    RecommendationResponse.Recommendation rec = new RecommendationResponse.Recommendation();
+    rec.setBlueprint(bp);
+    rec.setBlueprintClusterBinding(binding);
+
+    response.setRecommendations(rec);
+  }
+
+  /** The blueprint must report the component names of each host group, keyed by group name. */
+  @Test
+  public void blueprint_getHostgroupComponentMap() {
+    Map<String, Set<String>> expected = ImmutableMap.of(
+      "host_group_1", ImmutableSet.of("NAMENODE", "ZOOKEEPER_SERVER"),
+      "host_group_2", ImmutableSet.of("DATANODE", "HDFS_CLIENT", "ZOOKEEPER_CLIENT"));
+
+    assertEquals(expected, response.getRecommendations().getBlueprint().getHostgroupComponentMap());
+  }
+
+  /** Each individual host group must report the names of its own components. */
+  @Test
+  public void hostgGroup_getComponentNames() {
+    Map<String, RecommendationResponse.HostGroup> groupsByName =
+      response.getRecommendations().getBlueprint().getHostGroups().stream()
+        .collect(toMap(RecommendationResponse.HostGroup::getName, identity()));
+
+    RecommendationResponse.HostGroup first = groupsByName.get("host_group_1");
+    RecommendationResponse.HostGroup second = groupsByName.get("host_group_2");
+
+    assertEquals(ImmutableSet.of("NAMENODE", "ZOOKEEPER_SERVER"), first.getComponentNames());
+    assertEquals(ImmutableSet.of("DATANODE", "HDFS_CLIENT", "ZOOKEEPER_CLIENT"), second.getComponentNames());
+  }
+
+  /** The cluster binding must report the host names of each host group, keyed by group name. */
+  @Test
+  public void blueprintClusterBinding_getHostgroupHostMap() {
+    Map<String, Set<String>> expected = ImmutableMap.of(
+      "host_group_1", ImmutableSet.of("c7401", "c7402"),
+      "host_group_2", ImmutableSet.of("c7403", "c7404", "c7405"));
+
+    assertEquals(expected, response.getRecommendations().getBlueprintClusterBinding().getHostgroupHostMap());
+  }
+
+  /** Builds a blueprint host group whose components are given as {@code {"name": component}} maps. */
+  private static RecommendationResponse.HostGroup hostGroup(String name, String... components) {
+    RecommendationResponse.HostGroup group = new RecommendationResponse.HostGroup();
+    group.setName(name);
+    group.setComponents(singleEntryMaps("name", components));
+    return group;
+  }
+
+  /** Builds a binding host group whose hosts are given as {@code {"fqdn": host}} maps. */
+  private static RecommendationResponse.BindingHostGroup hostGroupBinding(String name, String... hosts) {
+    RecommendationResponse.BindingHostGroup group = new RecommendationResponse.BindingHostGroup();
+    group.setName(name);
+    group.setHosts(singleEntryMaps("fqdn", hosts));
+    return group;
+  }
+
+  /** Wraps every value into a one-entry map under the given key and collects the maps into a set. */
+  private static Set<Map<String, String>> singleEntryMaps(String key, String... values) {
+    return Arrays.stream(values).map(value -> ImmutableMap.of(key, value)).collect(toSet());
+  }
+}
\ No newline at end of file
diff --git
a/ambari-server/src/test/java/org/apache/ambari/server/controller/internal/BlueprintConfigurationProcessorTest.java
b/ambari-server/src/test/java/org/apache/ambari/server/controller/internal/BlueprintConfigurationProcessorTest.java
index e7cd271..d1a22f0 100644
---
a/ambari-server/src/test/java/org/apache/ambari/server/controller/internal/BlueprintConfigurationProcessorTest.java
+++
b/ambari-server/src/test/java/org/apache/ambari/server/controller/internal/BlueprintConfigurationProcessorTest.java
@@ -10336,7 +10336,7 @@ public class BlueprintConfigurationProcessorTest
extends EasyMockSupport {
ClusterTopology topology = createNiceMock(ClusterTopology.class);
Stack stack = createNiceMock(Stack.class);
- Collection<String> services = ImmutableList.of("HDFS");
+ Set<String> services = ImmutableSet.of("HDFS");
expect(stack.getServices()).andReturn(services).anyTimes();
expect(stack.getConfiguration()).andReturn(stackConfig).anyTimes();
expect(topology.getConfiguration()).andReturn(clusterConfig).anyTimes();
@@ -10360,7 +10360,7 @@ public class BlueprintConfigurationProcessorTest
extends EasyMockSupport {
ClusterTopology topology = createNiceMock(ClusterTopology.class);
Stack stack = createNiceMock(Stack.class);
- Collection<String> services = ImmutableList.of("HDFS");
+ Set<String> services = ImmutableSet.of("HDFS");
expect(stack.getServices()).andReturn(services).anyTimes();
expect(stack.getConfiguration()).andReturn(stackConfig).anyTimes();
expect(topology.getConfiguration()).andReturn(clusterConfig).anyTimes();
diff --git
a/ambari-server/src/test/java/org/apache/ambari/server/topology/addservice/HostGroupStrategyTest.java
b/ambari-server/src/test/java/org/apache/ambari/server/topology/addservice/HostGroupStrategyTest.java
new file mode 100644
index 0000000..2732ffa
--- /dev/null
+++
b/ambari-server/src/test/java/org/apache/ambari/server/topology/addservice/HostGroupStrategyTest.java
@@ -0,0 +1,79 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ambari.server.topology.addservice;
+
+
+import static org.junit.Assert.assertEquals;
+
+import java.util.Map;
+import java.util.Set;
+
+import org.junit.Test;
+
+import com.google.common.collect.ImmutableMap;
+import com.google.common.collect.ImmutableSet;
+
+/**
+ * Tests the {@link HostGroupStrategy} implementations against one shared
+ * [host -> components] fixture of eight hosts.
+ */
+public class HostGroupStrategyTest {
+
+  /**
+   * Input fixture: host name -> names of the components on that host. Every component name
+   * is a separate set element, so the sets really model multi-component hosts.
+   */
+  public static final Map<String, Set<String>> HOST_COMPONENTS = ImmutableMap.<String, Set<String>>builder()
+    .put("c7401", ImmutableSet.of("ZOOKEEPER_SERVER", "NAMENODE", "HDFS_CLIENT"))
+    .put("c7402", ImmutableSet.of("ZOOKEEPER_SERVER", "NAMENODE", "HDFS_CLIENT"))
+    .put("c7403", ImmutableSet.of("ZOOKEEPER_SERVER", "NAMENODE", "HDFS_CLIENT"))
+    .put("c7404", ImmutableSet.of("ZOOKEEPER_SERVER", "NAMENODE", "HDFS_CLIENT", "SECONDARY_NAMENODE"))
+    .put("c7405", ImmutableSet.of("HIVE_SERVER", "KAFKA_BROKER", "ZOOKEEPER_CLIENT"))
+    .put("c7406", ImmutableSet.of("DATANODE", "HDFS_CLIENT", "ZOOKEEPER_CLIENT"))
+    .put("c7407", ImmutableSet.of("DATANODE", "HDFS_CLIENT", "ZOOKEEPER_CLIENT"))
+    .put("c7408", ImmutableSet.of("DATANODE", "HDFS_CLIENT", "ZOOKEEPER_CLIENT"))
+    .build();
+
+  /** Expected result when every host gets a host group of its own. */
+  static final Map<String, Set<String>> HOST_GROUPS_FOR_EACH_HOST = ImmutableMap.<String, Set<String>>builder()
+    .put("host_group_c7401", ImmutableSet.of("c7401"))
+    .put("host_group_c7402", ImmutableSet.of("c7402"))
+    .put("host_group_c7403", ImmutableSet.of("c7403"))
+    .put("host_group_c7404", ImmutableSet.of("c7404"))
+    .put("host_group_c7405", ImmutableSet.of("c7405"))
+    .put("host_group_c7406", ImmutableSet.of("c7406"))
+    .put("host_group_c7407", ImmutableSet.of("c7407"))
+    .put("host_group_c7408", ImmutableSet.of("c7408"))
+    .build();
+
+  /** Expected result when hosts that have identical component sets share a host group. */
+  static final Map<String, Set<String>> HOST_GROUPS_BY_COMPONENTS = ImmutableMap.of(
+    "host_group_1", ImmutableSet.of("c7401", "c7402", "c7403"),
+    "host_group_2", ImmutableSet.of("c7404"),
+    "host_group_3", ImmutableSet.of("c7405"),
+    "host_group_4", ImmutableSet.of("c7406", "c7407", "c7408")
+  );
+
+  @Test
+  public void hostGroupForEachHostStrategy() {
+    assertEquals(HOST_GROUPS_FOR_EACH_HOST,
+      new HostGroupForEachHostStrategy().calculateHostGroups(HOST_COMPONENTS));
+  }
+
+  @Test
+  public void groupByComponentsStrategy() {
+    assertEquals(HOST_GROUPS_BY_COMPONENTS,
+      new GroupByComponentsStrategy().calculateHostGroups(HOST_COMPONENTS));
+  }
+
+  @Test
+  public void autoHostgroupStrategy() {
+    // With the default constructor the eight-host fixture yields one group per host.
+    assertEquals(HOST_GROUPS_FOR_EACH_HOST,
+      new AutoHostgroupStrategy().calculateHostGroups(HOST_COMPONENTS));
+    // NOTE(review): the constructor argument is presumably a host-count threshold; with 7
+    // (below the fixture's eight hosts) grouping by components is expected -- confirm.
+    assertEquals(HOST_GROUPS_BY_COMPONENTS,
+      new AutoHostgroupStrategy(7).calculateHostGroups(HOST_COMPONENTS));
+  }
+}
\ No newline at end of file
diff --git
a/ambari-server/src/test/java/org/apache/ambari/server/topology/addservice/StackAdvisorAdapterTest.java
b/ambari-server/src/test/java/org/apache/ambari/server/topology/addservice/StackAdvisorAdapterTest.java
new file mode 100644
index 0000000..e652f78
--- /dev/null
+++
b/ambari-server/src/test/java/org/apache/ambari/server/topology/addservice/StackAdvisorAdapterTest.java
@@ -0,0 +1,275 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ambari.server.topology.addservice;
+
+import static java.util.Collections.emptyMap;
+import static java.util.Collections.emptySet;
+import static java.util.stream.Collectors.toMap;
+import static org.easymock.EasyMock.anyObject;
+import static org.easymock.EasyMock.anyString;
+import static org.easymock.EasyMock.expect;
+import static org.easymock.EasyMock.getCurrentArguments;
+import static org.easymock.EasyMock.mock;
+import static org.easymock.EasyMock.replay;
+import static org.junit.Assert.assertEquals;
+
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Set;
+
+import org.apache.ambari.server.api.services.stackadvisor.StackAdvisorHelper;
+import
org.apache.ambari.server.api.services.stackadvisor.recommendations.RecommendationResponse;
+import
org.apache.ambari.server.api.services.stackadvisor.validations.ValidationResponse;
+import org.apache.ambari.server.configuration.Configuration;
+import org.apache.ambari.server.controller.AmbariManagementController;
+import org.apache.ambari.server.controller.internal.Stack;
+import org.apache.ambari.server.state.Cluster;
+import org.apache.ambari.server.state.Clusters;
+import org.apache.ambari.server.state.Service;
+import org.apache.ambari.server.state.ServiceComponent;
+import org.apache.ambari.server.state.StackId;
+import org.apache.commons.lang3.tuple.Pair;
+import org.easymock.EasyMockRunner;
+import org.easymock.Mock;
+import org.easymock.TestSubject;
+import org.junit.Before;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+
+import com.google.common.collect.ImmutableMap;
+import com.google.common.collect.ImmutableSet;
+import com.google.inject.Injector;
+
+/**
+ * Tests {@link StackAdvisorAdapter}: the static map-transformation helpers are exercised
+ * directly with fixed fixtures, and {@code recommendLayout} is exercised against mocked
+ * collaborators that are recorded in {@link #setUp()}.
+ */
+@RunWith(EasyMockRunner.class)
+public class StackAdvisorAdapterTest {
+
+  @Mock
+  private AmbariManagementController managementController;
+
+  @Mock
+  private StackAdvisorHelper stackAdvisorHelper;
+
+  @Mock
+  private Configuration serverConfig;
+
+  @Mock
+  private Injector injector;
+
+  @Mock
+  private Stack stack;
+
+  @TestSubject
+  private StackAdvisorAdapter adapter = new StackAdvisorAdapter();
+
+  /** Fixture: component name -> hosts. */
+  private static final Map<String, Set<String>> COMPONENT_HOST_MAP = ImmutableMap.<String, Set<String>>builder()
+    .put("NAMENODE", ImmutableSet.of("c7401", "c7402"))
+    .put("DATANODE", ImmutableSet.of("c7403", "c7404", "c7405", "c7406"))
+    .put("HDFS_CLIENT", ImmutableSet.of("c7403", "c7404", "c7405", "c7406"))
+    .put("ZOOKEEPER_SERVER", ImmutableSet.of("c7401", "c7402"))
+    .put("ZOOKEEPER_CLIENT", ImmutableSet.of("c7401", "c7402", "c7403", "c7404", "c7405", "c7406"))
+    .build();
+
+  /** Fixture: service name -> component name -> hosts; the same placement as {@link #COMPONENT_HOST_MAP}. */
+  private static final Map<String, Map<String, Set<String>>> SERVICE_COMPONENT_HOST_MAP_1 = ImmutableMap.of(
+    "HDFS", ImmutableMap.of(
+      "NAMENODE", ImmutableSet.of("c7401", "c7402"),
+      "DATANODE", ImmutableSet.of("c7403", "c7404", "c7405", "c7406"),
+      "HDFS_CLIENT", ImmutableSet.of("c7403", "c7404", "c7405", "c7406")),
+    "ZOOKEEPER", ImmutableMap.of(
+      "ZOOKEEPER_SERVER", ImmutableSet.of("c7401", "c7402"),
+      "ZOOKEEPER_CLIENT", ImmutableSet.of("c7401", "c7402", "c7403", "c7404", "c7405", "c7406")));
+
+  /** Fixture: {@link #SERVICE_COMPONENT_HOST_MAP_1} plus two services that have no components. */
+  private static final Map<String, Map<String, Set<String>>> SERVICE_COMPONENT_HOST_MAP_2 =
+    ImmutableMap.<String, Map<String, Set<String>>>builder()
+      .putAll(SERVICE_COMPONENT_HOST_MAP_1)
+      .put("HIVE", emptyMap())
+      .put("SPARK2", emptyMap())
+      .build();
+
+  /** Fixture: host name -> components; the inverse of {@link #COMPONENT_HOST_MAP}. */
+  private static final Map<String, Set<String>> HOST_COMPONENT_MAP = ImmutableMap.<String, Set<String>>builder()
+    .put("c7401", ImmutableSet.of("NAMENODE", "ZOOKEEPER_SERVER", "ZOOKEEPER_CLIENT"))
+    .put("c7402", ImmutableSet.of("NAMENODE", "ZOOKEEPER_SERVER", "ZOOKEEPER_CLIENT"))
+    .put("c7403", ImmutableSet.of("DATANODE", "HDFS_CLIENT", "ZOOKEEPER_CLIENT"))
+    .put("c7404", ImmutableSet.of("DATANODE", "HDFS_CLIENT", "ZOOKEEPER_CLIENT"))
+    .put("c7405", ImmutableSet.of("DATANODE", "HDFS_CLIENT", "ZOOKEEPER_CLIENT"))
+    .put("c7406", ImmutableSet.of("DATANODE", "HDFS_CLIENT", "ZOOKEEPER_CLIENT"))
+    .build();
+
+  /**
+   * Records the mocks used by {@link #recommendLayout()}: a two-host cluster running HDFS and
+   * ZOOKEEPER, a server configuration selecting {@link GroupByComponentsStrategy}, and a stack
+   * advisor whose recommendation places KAFKA_BROKER into the host group bound to c7402.
+   */
+  @Before
+  public void setUp() throws Exception {
+    Cluster cluster = mock(Cluster.class);
+    expect(cluster.getHostNames()).andReturn(ImmutableSet.of("c7401", "c7402"));
+    expect(cluster.getServices()).andReturn(ImmutableMap.of(
+      "HDFS",
+      service("HDFS", ImmutableMap.of(
+        "NAMENODE", ImmutableSet.of("c7401"),
+        "HDFS_CLIENT", ImmutableSet.of("c7401", "c7402"))),
+      "ZOOKEEPER",
+      service("ZOOKEEPER", ImmutableMap.of(
+        "ZOOKEEPER_SERVER", ImmutableSet.of("c7401"),
+        "ZOOKEEPER_CLIENT", ImmutableSet.of("c7401", "c7402")))));
+    Clusters clusters = mock(Clusters.class);
+    expect(clusters.getCluster(anyString())).andReturn(cluster).anyTimes();
+    expect(managementController.getClusters()).andReturn(clusters).anyTimes();
+    replay(clusters, cluster, managementController);
+
+    expect(serverConfig.getGplLicenseAccepted()).andReturn(Boolean.FALSE).anyTimes();
+    // NOTE(review): the raw Class cast is presumably required because the getter returns a
+    // wildcard-typed Class -- confirm before replacing it with a parameterized cast.
+    expect(serverConfig.getAddServiceHostGroupStrategyClass())
+      .andReturn((Class)GroupByComponentsStrategy.class).anyTimes();
+    replay(serverConfig);
+
+    expect(injector.getInstance(GroupByComponentsStrategy.class))
+      .andReturn(new GroupByComponentsStrategy()).anyTimes();
+    replay(injector);
+
+    RecommendationResponse response = new RecommendationResponse();
+    RecommendationResponse.Recommendation recommendation = new RecommendationResponse.Recommendation();
+    response.setRecommendations(recommendation);
+    RecommendationResponse.BlueprintClusterBinding binding =
+      RecommendationResponse.BlueprintClusterBinding.fromHostGroupHostMap(
+        ImmutableMap.of(
+          "hostgroup-1", ImmutableSet.of("c7401"),
+          "hostgroup-2", ImmutableSet.of("c7402")));
+    recommendation.setBlueprintClusterBinding(binding);
+    RecommendationResponse.Blueprint blueprint = new RecommendationResponse.Blueprint();
+    blueprint.setHostGroups(RecommendationResponse.HostGroup.fromHostGroupComponents(
+      ImmutableMap.of(
+        "hostgroup-1", ImmutableSet.of("NAMENODE", "HDFS_CLIENT", "ZOOKEEPER_SERVER", "ZOOKEEPER_CLIENT"),
+        "hostgroup-2", ImmutableSet.of("HDFS_CLIENT", "ZOOKEEPER_CLIENT", "KAFKA_BROKER"))
+    ));
+    recommendation.setBlueprint(blueprint);
+    expect(stackAdvisorHelper.recommend(anyObject())).andReturn(response);
+
+    ValidationResponse validationResponse = new ValidationResponse();
+    validationResponse.setItems(emptySet());
+    expect(stackAdvisorHelper.validate(anyObject())).andReturn(validationResponse);
+    replay(stackAdvisorHelper);
+
+    expect(stack.getStackId()).andReturn(new StackId("HDP", "3.0")).anyTimes();
+    // lookup table keyed by component name, giving the owning service
+    ImmutableMap<String, String> componentToService = ImmutableMap.<String, String>builder()
+      .put("KAFKA_BROKER", "KAFKA")
+      .put("NAMENODE", "HDFS")
+      .put("HDFS_CLIENT", "HDFS")
+      .put("ZOOKEEPER_SERVER", "ZOOKEEPER")
+      .put("ZOOKEEPER_CLIENT", "ZOOKEEPER")
+      .build();
+    expect(stack.getServiceForComponent(anyString()))
+      .andAnswer(() -> componentToService.get(getCurrentArguments()[0])).anyTimes();
+    replay(stack);
+  }
+
+  /**
+   * Creates a replayed {@link Service} mock with the given name whose components are replayed
+   * {@link ServiceComponent} mocks reporting the given hosts.
+   *
+   * @param name the service name
+   * @param componentHostMap component name -> hosts of that component
+   */
+  private static Service service(String name, ImmutableMap<String,ImmutableSet<String>> componentHostMap) {
+    Service service = mock(Service.class);
+    expect(service.getName()).andReturn(name).anyTimes();
+    Map<String, ServiceComponent> serviceComponents = componentHostMap.entrySet().stream()
+      .map(entry -> {
+        ServiceComponent component = mock(ServiceComponent.class);
+        expect(component.getName()).andReturn(entry.getKey()).anyTimes();
+        expect(component.getServiceComponentsHosts()).andReturn(entry.getValue()).anyTimes();
+        replay(component);
+        return Pair.of(entry.getKey(), component);
+      })
+      .collect(toMap(Pair::getKey, Pair::getValue));
+    expect(service.getServiceComponents()).andReturn(serviceComponents).anyTimes();
+    replay(service);
+    return service;
+  }
+
+  @Test
+  public void getHostComponentMap() {
+    assertEquals(HOST_COMPONENT_MAP, StackAdvisorAdapter.getHostComponentMap(COMPONENT_HOST_MAP));
+  }
+
+  /** Services without components (HIVE, SPARK2 in the fixture) must not affect the result. */
+  @Test
+  public void getComponentHostMap() {
+    assertEquals(COMPONENT_HOST_MAP, StackAdvisorAdapter.getComponentHostMap(SERVICE_COMPONENT_HOST_MAP_2));
+  }
+
+  @Test
+  public void getRecommendedLayout() {
+    Map<String, Set<String>> hostGroups = ImmutableMap.of(
+      "host_group1", ImmutableSet.of("c7401", "c7402"),
+      "host_group2", ImmutableSet.of("c7403", "c7404", "c7405", "c7406"));
+
+    Map<String, Set<String>> hostGroupComponents = ImmutableMap.of(
+      "host_group1", ImmutableSet.of("NAMENODE", "ZOOKEEPER_SERVER", "ZOOKEEPER_CLIENT"),
+      "host_group2", ImmutableSet.of("DATANODE", "HDFS_CLIENT", "ZOOKEEPER_CLIENT"));
+
+    // lookup table keyed by component name, giving the owning service
+    Map<String, String> componentToService = ImmutableMap.<String, String>builder()
+      .put("NAMENODE", "HDFS")
+      .put("DATANODE", "HDFS")
+      .put("HDFS_CLIENT", "HDFS")
+      .put("ZOOKEEPER_SERVER", "ZOOKEEPER")
+      .put("ZOOKEEPER_CLIENT", "ZOOKEEPER")
+      .build();
+
+    assertEquals(SERVICE_COMPONENT_HOST_MAP_1,
+      StackAdvisorAdapter.getRecommendedLayout(hostGroups, hostGroupComponents, componentToService::get));
+  }
+
+  @Test
+  public void mergeDisjunctMaps() {
+    Map<String, String> map1 = ImmutableMap.of("key1", "value1", "key2", "value2");
+    Map<String, String> map2 = ImmutableMap.of("key3", "value3", "key4", "value4");
+    assertEquals(
+      ImmutableMap.of("key1", "value1", "key2", "value2", "key3", "value3", "key4", "value4"),
+      StackAdvisorAdapter.mergeDisjunctMaps(map1, map2));
+  }
+
+  /** A key present in both maps ("key2") must be rejected. */
+  @Test(expected = IllegalArgumentException.class)
+  public void mergeDisjunctMaps_invalidInput() {
+    Map<String, String> map1 = ImmutableMap.of("key1", "value1", "key2", "value2");
+    Map<String, String> map2 = ImmutableMap.of("key2", "value2", "key3", "value3");
+    StackAdvisorAdapter.mergeDisjunctMaps(map1, map2);
+  }
+
+  /** Only the recommendations for the requested new services (KAFKA, PIG) must be left in the map. */
+  @Test
+  public void keepNewServicesOnly() {
+    Map<String, Map<String, Set<String>>> newServices = ImmutableMap.of(
+      "KAFKA", emptyMap(),
+      "PIG", emptyMap());
+
+    Map<String, Map<String, Set<String>>> recommendationForNewServices = ImmutableMap.of(
+      "KAFKA", ImmutableMap.of("KAFKA_BROKER", ImmutableSet.of("c7405")),
+      "PIG", ImmutableMap.of("PIG_CLIENT", ImmutableSet.of("c7405", "c7406")));
+
+    // mutable copy, as the method under test is expected to modify its first argument
+    Map<String, Map<String, Set<String>>> recommendations = new HashMap<>(SERVICE_COMPONENT_HOST_MAP_1);
+    recommendations.putAll(recommendationForNewServices);
+
+    StackAdvisorAdapter.keepNewServicesOnly(recommendations, newServices);
+    assertEquals(recommendationForNewServices, recommendations);
+  }
+
+  /**
+   * KAFKA_BROKER is requested without hosts; the mocked recommendation puts it into the host
+   * group bound to c7402, so that host is expected in the resulting layout.
+   */
+  @Test
+  public void recommendLayout() {
+    Map<String, Map<String, Set<String>>> newServices = ImmutableMap.of(
+      "KAFKA",
+      ImmutableMap.of("KAFKA_BROKER", emptySet()));
+
+    AddServiceInfo info = new AddServiceInfo(null, "c1", stack,
+      org.apache.ambari.server.topology.Configuration.newEmpty(), null, newServices);
+    AddServiceInfo infoWithRecommendations = adapter.recommendLayout(info);
+
+    Map<String, Map<String, Set<String>>> expectedNewLayout = ImmutableMap.of(
+      "KAFKA",
+      ImmutableMap.of("KAFKA_BROKER", ImmutableSet.of("c7402"))
+    );
+
+    assertEquals(expectedNewLayout, infoWithRecommendations.newServices());
+  }
+
+}
\ No newline at end of file