http://git-wip-us.apache.org/repos/asf/storm/blob/f1c0bcbe/storm-server/src/main/java/org/apache/storm/pacemaker/codec/ThriftNettyServerCodec.java ---------------------------------------------------------------------- diff --git a/storm-server/src/main/java/org/apache/storm/pacemaker/codec/ThriftNettyServerCodec.java b/storm-server/src/main/java/org/apache/storm/pacemaker/codec/ThriftNettyServerCodec.java index a6ceade..3fa40e6 100644 --- a/storm-server/src/main/java/org/apache/storm/pacemaker/codec/ThriftNettyServerCodec.java +++ b/storm-server/src/main/java/org/apache/storm/pacemaker/codec/ThriftNettyServerCodec.java @@ -1,22 +1,20 @@ /** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at + * Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. The ASF licenses this file to you under the Apache License, Version + * 2.0 (the "License"); you may not use this file except in compliance with the License. You may obtain a copy of the License at * * http://www.apache.org/licenses/LICENSE-2.0 * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. + * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the specific language governing permissions + * and limitations under the License. */ + package org.apache.storm.pacemaker.codec; +import java.io.IOException; +import java.util.ArrayList; +import java.util.Map; import org.apache.storm.DaemonConfig; import org.apache.storm.messaging.netty.ISaslServer; import org.apache.storm.messaging.netty.IServer; @@ -24,9 +22,6 @@ import org.apache.storm.messaging.netty.KerberosSaslServerHandler; import org.apache.storm.messaging.netty.SaslStormServerHandler; import org.apache.storm.messaging.netty.StormServerHandler; import org.apache.storm.security.auth.AuthUtils; -import java.io.IOException; -import java.util.ArrayList; -import java.util.Map; import org.jboss.netty.channel.ChannelPipeline; import org.jboss.netty.channel.ChannelPipelineFactory; import org.jboss.netty.channel.Channels; @@ -37,20 +32,14 @@ public class ThriftNettyServerCodec { public static final String SASL_HANDLER = "sasl-handler"; public static final String KERBEROS_HANDLER = "kerberos-handler"; - - public enum AuthMethod { - DIGEST, - KERBEROS, - NONE - }; - + private static final Logger LOG = LoggerFactory + .getLogger(ThriftNettyServerCodec.class); + + ; + private final int thriftMessageMaxSize; private IServer server; private AuthMethod authMethod; private Map<String, Object> topoConf; - private final int thriftMessageMaxSize; - - private static final Logger LOG = LoggerFactory - .getLogger(ThriftNettyServerCodec.class); public ThriftNettyServerCodec(IServer server, Map<String, Object> topoConf, AuthMethod authMethod, int thriftMessageMaxSizeBytes) { @@ -67,30 +56,26 @@ public class ThriftNettyServerCodec { ChannelPipeline pipeline = Channels.pipeline(); pipeline.addLast("encoder", new ThriftEncoder()); pipeline.addLast("decoder", new ThriftDecoder(thriftMessageMaxSize)); - if(authMethod == AuthMethod.DIGEST) { + if (authMethod == AuthMethod.DIGEST) { try { LOG.debug("Adding SaslStormServerHandler to pacemaker server pipeline."); - pipeline.addLast(SASL_HANDLER, new SaslStormServerHandler((ISaslServer)server)); - } - catch (IOException e) { + pipeline.addLast(SASL_HANDLER, new SaslStormServerHandler((ISaslServer) server)); + } catch (IOException e) { throw new RuntimeException(e); } - } - else if(authMethod == AuthMethod.KERBEROS) { + } else if (authMethod == AuthMethod.KERBEROS) { try { LOG.debug("Adding KerberosSaslServerHandler to pacemaker server pipeline."); ArrayList<String> authorizedUsers = new ArrayList(1); - authorizedUsers.add((String)topoConf.get(DaemonConfig.NIMBUS_DAEMON_USER)); - pipeline.addLast(KERBEROS_HANDLER, new KerberosSaslServerHandler((ISaslServer)server, + authorizedUsers.add((String) topoConf.get(DaemonConfig.NIMBUS_DAEMON_USER)); + pipeline.addLast(KERBEROS_HANDLER, new KerberosSaslServerHandler((ISaslServer) server, topoConf, AuthUtils.LOGIN_CONTEXT_PACEMAKER_SERVER, authorizedUsers)); - } - catch (IOException e) { + } catch (IOException e) { throw new RuntimeException(e); } - } - else if(authMethod == AuthMethod.NONE) { + } else if (authMethod == AuthMethod.NONE) { LOG.debug("Not authenticating any clients. AuthMethod is NONE"); } @@ -99,4 +84,10 @@ public class ThriftNettyServerCodec { } }; } + + public enum AuthMethod { + DIGEST, + KERBEROS, + NONE + } }
http://git-wip-us.apache.org/repos/asf/storm/blob/f1c0bcbe/storm-server/src/main/java/org/apache/storm/scheduler/Cluster.java ---------------------------------------------------------------------- diff --git a/storm-server/src/main/java/org/apache/storm/scheduler/Cluster.java b/storm-server/src/main/java/org/apache/storm/scheduler/Cluster.java index c5a5ee5..bc08579 100644 --- a/storm-server/src/main/java/org/apache/storm/scheduler/Cluster.java +++ b/storm-server/src/main/java/org/apache/storm/scheduler/Cluster.java @@ -29,7 +29,6 @@ import java.util.Map; import java.util.Map.Entry; import java.util.Set; import java.util.stream.Collectors; - import org.apache.storm.Config; import org.apache.storm.Constants; import org.apache.storm.daemon.nimbus.TopologyResources; @@ -49,40 +48,33 @@ import org.slf4j.LoggerFactory; public class Cluster implements ISchedulingState { private static final Logger LOG = LoggerFactory.getLogger(Cluster.class); - private SchedulerAssignmentImpl assignment; - /** * key: supervisor id, value: supervisor details. */ private final Map<String, SupervisorDetails> supervisors = new HashMap<>(); - /** * key: rack, value: nodes in that rack. */ private final Map<String, List<String>> networkTopography = new HashMap<>(); - /** * key: topologyId, value: topology's current assignments. */ private final Map<String, SchedulerAssignmentImpl> assignments = new HashMap<>(); - /** * key topologyId, Value: scheduler's status. */ private final Map<String, String> status = new HashMap<>(); - /** * A map from hostname to supervisor ids. */ private final Map<String, List<String>> hostToId = new HashMap<>(); - private final Map<String, Object> conf; - - private Set<String> blackListedHosts = new HashSet<>(); - private INimbus inimbus; private final Topologies topologies; private final Map<String, Map<WorkerSlot, NormalizedResourceRequest>> nodeToScheduledResourcesCache; private final Map<String, Set<WorkerSlot>> nodeToUsedSlotsCache; + private SchedulerAssignmentImpl assignment; + private Set<String> blackListedHosts = new HashSet<>(); + private INimbus inimbus; public Cluster( INimbus nimbus, @@ -111,7 +103,7 @@ public class Cluster implements ISchedulingState { /** * Testing Constructor that takes an existing cluster and replaces the topologies in it. * - * @param src the original cluster + * @param src the original cluster * @param topologies the new topolgoies to use */ @VisibleForTesting @@ -195,8 +187,61 @@ public class Cluster implements ISchedulingState { } /** - * Check if the given topology is allowed for modification right now. If not throw an - * IllegalArgumentException else go on. + * Get heap memory usage for a worker's main process and logwriter process. + * + * @param topConf - the topology config + * @return the assigned memory (in MB) + */ + public static double getAssignedMemoryForSlot(final Map<String, Object> topConf) { + double totalWorkerMemory = 0.0; + final Integer topologyWorkerDefaultMemoryAllocation = 768; + + List<String> topologyWorkerGcChildopts = ConfigUtils.getValueAsList( + Config.TOPOLOGY_WORKER_GC_CHILDOPTS, topConf); + List<String> workerGcChildopts = ConfigUtils.getValueAsList( + Config.WORKER_GC_CHILDOPTS, topConf); + Double memGcChildopts = null; + memGcChildopts = Utils.parseJvmHeapMemByChildOpts( + topologyWorkerGcChildopts, null); + if (memGcChildopts == null) { + memGcChildopts = Utils.parseJvmHeapMemByChildOpts( + workerGcChildopts, null); + } + + List<String> topologyWorkerChildopts = ConfigUtils.getValueAsList( + Config.TOPOLOGY_WORKER_CHILDOPTS, topConf); + Double memTopologyWorkerChildopts = Utils.parseJvmHeapMemByChildOpts( + topologyWorkerChildopts, null); + + List<String> workerChildopts = ConfigUtils.getValueAsList( + Config.WORKER_CHILDOPTS, topConf); + Double memWorkerChildopts = Utils.parseJvmHeapMemByChildOpts( + workerChildopts, null); + + if (memGcChildopts != null) { + totalWorkerMemory += memGcChildopts; + } else if (memTopologyWorkerChildopts != null) { + totalWorkerMemory += memTopologyWorkerChildopts; + } else if (memWorkerChildopts != null) { + totalWorkerMemory += memWorkerChildopts; + } else { + Object workerHeapMemoryMb = topConf.get( + Config.WORKER_HEAP_MEMORY_MB); + totalWorkerMemory += ObjectReader.getInt( + workerHeapMemoryMb, topologyWorkerDefaultMemoryAllocation); + } + + List<String> topoWorkerLwChildopts = ConfigUtils.getValueAsList( + Config.TOPOLOGY_WORKER_LOGWRITER_CHILDOPTS, topConf); + if (topoWorkerLwChildopts != null) { + totalWorkerMemory += Utils.parseJvmHeapMemByChildOpts( + topoWorkerLwChildopts, 0.0); + } + return totalWorkerMemory; + } + + /** + * Check if the given topology is allowed for modification right now. If not throw an IllegalArgumentException else go on. * * @param topologyId the id of the topology to check */ @@ -204,6 +249,16 @@ public class Cluster implements ISchedulingState { //NOOP } + @Override + public Topologies getTopologies() { + return topologies; + } + + @Override + public Set<String> getBlacklistedHosts() { + return blackListedHosts; + } + /** * Set the list of hosts that are blacklisted. * @@ -218,16 +273,6 @@ public class Cluster implements ISchedulingState { blackListedHosts.addAll(hosts); } - @Override - public Topologies getTopologies() { - return topologies; - } - - @Override - public Set<String> getBlacklistedHosts() { - return blackListedHosts; - } - public void blacklistHost(String host) { blackListedHosts.add(host); } @@ -304,7 +349,8 @@ public class Cluster implements ISchedulingState { @Override public Set<Integer> getUsedPorts(SupervisorDetails supervisor) { - return nodeToUsedSlotsCache.computeIfAbsent(supervisor.getId(), (x) -> new HashSet<>()).stream().map(WorkerSlot::getPort).collect(Collectors.toSet()); + return nodeToUsedSlotsCache.computeIfAbsent(supervisor.getId(), (x) -> new HashSet<>()).stream().map(WorkerSlot::getPort) + .collect(Collectors.toSet()); } @Override @@ -424,12 +470,12 @@ public class Cluster implements ISchedulingState { totalResources.addOnHeap(shared.get_off_heap_worker()); addResource( - sharedTotalResources, - Constants.COMMON_OFFHEAP_MEMORY_RESOURCE_NAME, shared.get_off_heap_worker() + sharedTotalResources, + Constants.COMMON_OFFHEAP_MEMORY_RESOURCE_NAME, shared.get_off_heap_worker() ); addResource( - sharedTotalResources, - Constants.COMMON_ONHEAP_MEMORY_RESOURCE_NAME, shared.get_on_heap() + sharedTotalResources, + Constants.COMMON_ONHEAP_MEMORY_RESOURCE_NAME, shared.get_on_heap() ); } sharedTotalResources = NormalizedResources.RESOURCE_NAME_NORMALIZER.normalizedResourceMap(sharedTotalResources); @@ -441,12 +487,12 @@ public class Cluster implements ISchedulingState { ret.set_mem_off_heap(totalResources.getOffHeapMemoryMb()); ret.set_mem_on_heap(totalResources.getOnHeapMemoryMb()); ret.set_shared_mem_off_heap( - sharedTotalResources.getOrDefault( - Constants.COMMON_OFFHEAP_MEMORY_RESOURCE_NAME, 0.0) + sharedTotalResources.getOrDefault( + Constants.COMMON_OFFHEAP_MEMORY_RESOURCE_NAME, 0.0) ); ret.set_shared_mem_on_heap( - sharedTotalResources.getOrDefault( - Constants.COMMON_ONHEAP_MEMORY_RESOURCE_NAME, 0.0) + sharedTotalResources.getOrDefault( + Constants.COMMON_ONHEAP_MEMORY_RESOURCE_NAME, 0.0) ); return ret; } @@ -493,22 +539,22 @@ public class Cluster implements ISchedulingState { if (memoryAdded > memoryAvailable) { if (LOG.isTraceEnabled()) { LOG.trace("Could not schedule {}:{} on {} not enough Mem {} > {}", - td.getName(), - exec, - ws, - memoryAdded, - memoryAvailable); + td.getName(), + exec, + ws, + memoryAdded, + memoryAvailable); } return false; } if (afterOnHeap > maxHeap) { if (LOG.isTraceEnabled()) { LOG.trace("Could not schedule {}:{} on {} HEAP would be too large {} > {}", - td.getName(), - exec, - ws, - afterOnHeap, - maxHeap); + td.getName(), + exec, + ws, + afterOnHeap, + maxHeap); } return false; } @@ -531,9 +577,9 @@ public class Cluster implements ISchedulingState { if (td == null) { throw new IllegalArgumentException( "Trying to schedule for topo " - + topologyId - + " but that is not a known topology " - + topologies.getAllIds()); + + topologyId + + " but that is not a known topology " + + topologies.getAllIds()); } WorkerResources resources = calculateWorkerResources(td, executors); SchedulerAssignmentImpl assignment = assignments.get(topologyId); @@ -545,14 +591,14 @@ public class Cluster implements ISchedulingState { if (assignment.isExecutorAssigned(executor)) { throw new RuntimeException( "Attempting to assign executor: " - + executor - + " of topology: " - + topologyId - + " to workerslot: " - + slot - + ". The executor is already assigned to workerslot: " - + assignment.getExecutorToSlot().get(executor) - + ". The executor must unassigned before it can be assigned to another slot!"); + + executor + + " of topology: " + + topologyId + + " to workerslot: " + + slot + + ". The executor is already assigned to workerslot: " + + assignment.getExecutorToSlot().get(executor) + + ". The executor must unassigned before it can be assigned to another slot!"); } } } @@ -587,7 +633,7 @@ public class Cluster implements ISchedulingState { /** * Calculate the amount of shared off heap memory on a given nodes with the given assignment. * - * @param nodeId the id of the node + * @param nodeId the id of the node * @param assignment the current assignment * @return the amount of shared off heap memory for that node in MB */ @@ -759,59 +805,6 @@ public class Cluster implements ISchedulingState { } /** - * Get heap memory usage for a worker's main process and logwriter process. - * @param topConf - the topology config - * @return the assigned memory (in MB) - */ - public static double getAssignedMemoryForSlot(final Map<String, Object> topConf) { - double totalWorkerMemory = 0.0; - final Integer topologyWorkerDefaultMemoryAllocation = 768; - - List<String> topologyWorkerGcChildopts = ConfigUtils.getValueAsList( - Config.TOPOLOGY_WORKER_GC_CHILDOPTS, topConf); - List<String> workerGcChildopts = ConfigUtils.getValueAsList( - Config.WORKER_GC_CHILDOPTS, topConf); - Double memGcChildopts = null; - memGcChildopts = Utils.parseJvmHeapMemByChildOpts( - topologyWorkerGcChildopts, null); - if (memGcChildopts == null) { - memGcChildopts = Utils.parseJvmHeapMemByChildOpts( - workerGcChildopts, null); - } - - List<String> topologyWorkerChildopts = ConfigUtils.getValueAsList( - Config.TOPOLOGY_WORKER_CHILDOPTS, topConf); - Double memTopologyWorkerChildopts = Utils.parseJvmHeapMemByChildOpts( - topologyWorkerChildopts, null); - - List<String> workerChildopts = ConfigUtils.getValueAsList( - Config.WORKER_CHILDOPTS, topConf); - Double memWorkerChildopts = Utils.parseJvmHeapMemByChildOpts( - workerChildopts, null); - - if (memGcChildopts != null) { - totalWorkerMemory += memGcChildopts; - } else if (memTopologyWorkerChildopts != null) { - totalWorkerMemory += memTopologyWorkerChildopts; - } else if (memWorkerChildopts != null) { - totalWorkerMemory += memWorkerChildopts; - } else { - Object workerHeapMemoryMb = topConf.get( - Config.WORKER_HEAP_MEMORY_MB); - totalWorkerMemory += ObjectReader.getInt( - workerHeapMemoryMb, topologyWorkerDefaultMemoryAllocation); - } - - List<String> topoWorkerLwChildopts = ConfigUtils.getValueAsList( - Config.TOPOLOGY_WORKER_LOGWRITER_CHILDOPTS, topConf); - if (topoWorkerLwChildopts != null) { - totalWorkerMemory += Utils.parseJvmHeapMemByChildOpts( - topoWorkerLwChildopts, 0.0); - } - return totalWorkerMemory; - } - - /** * set scheduler status for a topology. */ public void setStatus(TopologyDetails td, String statusMessage) { @@ -837,10 +830,6 @@ public class Cluster implements ISchedulingState { return status; } - public String getStatus(String topoId) { - return status.get(topoId); - } - /** * set scheduler status map. */ @@ -858,6 +847,10 @@ public class Cluster implements ISchedulingState { this.status.putAll(statusMap); } + public String getStatus(String topoId) { + return status.get(topoId); + } + @Override public Map<String, TopologyResources> getTopologyResourcesMap() { Map<String, TopologyResources> ret = new HashMap<>(assignments.size()); @@ -925,7 +918,7 @@ public class Cluster implements ISchedulingState { /** - * This medhod updates ScheduledResources and UsedSlots cache for given workerSlot + * This medhod updates ScheduledResources and UsedSlots cache for given workerSlot * * @param workerSlot * @param workerResources http://git-wip-us.apache.org/repos/asf/storm/blob/f1c0bcbe/storm-server/src/main/java/org/apache/storm/scheduler/Component.java ---------------------------------------------------------------------- diff --git a/storm-server/src/main/java/org/apache/storm/scheduler/Component.java b/storm-server/src/main/java/org/apache/storm/scheduler/Component.java index 6a0de72..c4a28c3 100644 --- a/storm-server/src/main/java/org/apache/storm/scheduler/Component.java +++ b/storm-server/src/main/java/org/apache/storm/scheduler/Component.java @@ -21,7 +21,6 @@ package org.apache.storm.scheduler; import java.util.HashSet; import java.util.List; import java.util.Set; - import org.apache.storm.generated.ComponentType; public class Component { @@ -33,9 +32,10 @@ public class Component { /** * Create a new component. - * @param type the type of component this is + * + * @param type the type of component this is * @param compId the id of the component - * @param execs the executors for this component. + * @param execs the executors for this component. */ public Component(ComponentType type, String compId, List<ExecutorDetails> execs) { this.type = type; @@ -76,13 +76,13 @@ public class Component { @Override public String toString() { return "{id: " - + getId() - + " Parents: " - + getParents() - + " Children: " - + getChildren() - + " Execs: " - + getExecs() - + "}"; + + getId() + + " Parents: " + + getParents() + + " Children: " + + getChildren() + + " Execs: " + + getExecs() + + "}"; } } http://git-wip-us.apache.org/repos/asf/storm/blob/f1c0bcbe/storm-server/src/main/java/org/apache/storm/scheduler/DefaultScheduler.java ---------------------------------------------------------------------- diff --git a/storm-server/src/main/java/org/apache/storm/scheduler/DefaultScheduler.java b/storm-server/src/main/java/org/apache/storm/scheduler/DefaultScheduler.java index 82630ca..84498a3 100644 --- a/storm-server/src/main/java/org/apache/storm/scheduler/DefaultScheduler.java +++ b/storm-server/src/main/java/org/apache/storm/scheduler/DefaultScheduler.java @@ -15,6 +15,7 @@ * See the License for the specific language governing permissions and * limitations under the License. */ + package org.apache.storm.scheduler; import java.util.HashMap; @@ -23,7 +24,6 @@ import java.util.List; import java.util.Map; import java.util.Map.Entry; import java.util.Set; - import org.apache.storm.utils.Utils; public class DefaultScheduler implements IScheduler { @@ -44,7 +44,7 @@ public class DefaultScheduler implements IScheduler { } for (WorkerSlot slot : slots) { - existingSlots.remove(slot); + existingSlots.remove(slot); } return existingSlots.keySet(); @@ -74,7 +74,8 @@ public class DefaultScheduler implements IScheduler { List<WorkerSlot> availableSlots = cluster.getAvailableSlots(); Set<ExecutorDetails> allExecutors = topology.getExecutors(); - Map<WorkerSlot, List<ExecutorDetails>> aliveAssigned = EvenScheduler.getAliveAssignedWorkerSlotExecutors(cluster, topology.getId()); + Map<WorkerSlot, List<ExecutorDetails>> aliveAssigned = + EvenScheduler.getAliveAssignedWorkerSlotExecutors(cluster, topology.getId()); Set<ExecutorDetails> aliveExecutors = new HashSet<ExecutorDetails>(); for (List<ExecutorDetails> list : aliveAssigned.values()) { aliveExecutors.addAll(list); @@ -85,7 +86,7 @@ public class DefaultScheduler implements IScheduler { Set<WorkerSlot> badSlots = null; if (totalSlotsToUse > aliveAssigned.size() || !allExecutors.equals(aliveExecutors)) { - badSlots = badSlots(aliveAssigned, allExecutors.size(), totalSlotsToUse); + badSlots = badSlots(aliveAssigned, allExecutors.size(), totalSlotsToUse); } if (badSlots != null) { cluster.freeSlots(badSlots); http://git-wip-us.apache.org/repos/asf/storm/blob/f1c0bcbe/storm-server/src/main/java/org/apache/storm/scheduler/EvenScheduler.java ---------------------------------------------------------------------- diff --git a/storm-server/src/main/java/org/apache/storm/scheduler/EvenScheduler.java b/storm-server/src/main/java/org/apache/storm/scheduler/EvenScheduler.java index 9cbce18..f4ac258 100644 --- a/storm-server/src/main/java/org/apache/storm/scheduler/EvenScheduler.java +++ b/storm-server/src/main/java/org/apache/storm/scheduler/EvenScheduler.java @@ -20,7 +20,6 @@ package org.apache.storm.scheduler; import com.google.common.annotations.VisibleForTesting; import com.google.common.collect.Sets; - import java.util.ArrayList; import java.util.Collections; import java.util.Comparator; @@ -30,7 +29,6 @@ import java.util.List; import java.util.Map; import java.util.Set; import java.util.TreeMap; - import org.apache.storm.utils.ServerUtils; import org.apache.storm.utils.Utils; import org.slf4j.Logger; @@ -116,7 +114,7 @@ public class EvenScheduler implements IScheduler { //allow requesting slots number bigger than available slots int toIndex = (totalSlotsToUse - aliveAssigned.size()) - > sortedList.size() ? sortedList.size() : (totalSlotsToUse - aliveAssigned.size()); + > sortedList.size() ? sortedList.size() : (totalSlotsToUse - aliveAssigned.size()); List<WorkerSlot> reassignSlots = sortedList.subList(0, toIndex); Set<ExecutorDetails> aliveExecutors = new HashSet<ExecutorDetails>(); http://git-wip-us.apache.org/repos/asf/storm/blob/f1c0bcbe/storm-server/src/main/java/org/apache/storm/scheduler/ExecutorDetails.java ---------------------------------------------------------------------- diff --git a/storm-server/src/main/java/org/apache/storm/scheduler/ExecutorDetails.java b/storm-server/src/main/java/org/apache/storm/scheduler/ExecutorDetails.java index bbbbf3f..855cc96 100644 --- a/storm-server/src/main/java/org/apache/storm/scheduler/ExecutorDetails.java +++ b/storm-server/src/main/java/org/apache/storm/scheduler/ExecutorDetails.java @@ -40,8 +40,8 @@ public class ExecutorDetails { if (other == null || !(other instanceof ExecutorDetails)) { return false; } - - ExecutorDetails executor = (ExecutorDetails)other; + + ExecutorDetails executor = (ExecutorDetails) other; return (this.startTask == executor.startTask) && (this.endTask == executor.endTask); } @@ -49,7 +49,7 @@ public class ExecutorDetails { public int hashCode() { return this.startTask + 13 * this.endTask; } - + @Override public String toString() { return "[" + this.startTask + ", " + this.endTask + "]"; http://git-wip-us.apache.org/repos/asf/storm/blob/f1c0bcbe/storm-server/src/main/java/org/apache/storm/scheduler/INimbus.java ---------------------------------------------------------------------- diff --git a/storm-server/src/main/java/org/apache/storm/scheduler/INimbus.java b/storm-server/src/main/java/org/apache/storm/scheduler/INimbus.java index db2fed9..4e26708 100644 --- a/storm-server/src/main/java/org/apache/storm/scheduler/INimbus.java +++ b/storm-server/src/main/java/org/apache/storm/scheduler/INimbus.java @@ -1,19 +1,13 @@ /** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at + * Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. The ASF licenses this file to you under the Apache License, Version + * 2.0 (the "License"); you may not use this file except in compliance with the License. You may obtain a copy of the License at * * http://www.apache.org/licenses/LICENSE-2.0 * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. + * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the specific language governing permissions + * and limitations under the License. */ package org.apache.storm.scheduler; @@ -42,6 +36,6 @@ public interface INimbus { * map from node id to supervisor details. */ String getHostName(Map<String, SupervisorDetails> existingSupervisors, String nodeId); - - IScheduler getForcedScheduler(); + + IScheduler getForcedScheduler(); } http://git-wip-us.apache.org/repos/asf/storm/blob/f1c0bcbe/storm-server/src/main/java/org/apache/storm/scheduler/IScheduler.java ---------------------------------------------------------------------- diff --git a/storm-server/src/main/java/org/apache/storm/scheduler/IScheduler.java b/storm-server/src/main/java/org/apache/storm/scheduler/IScheduler.java index 5b79f11..18e9afd 100644 --- a/storm-server/src/main/java/org/apache/storm/scheduler/IScheduler.java +++ b/storm-server/src/main/java/org/apache/storm/scheduler/IScheduler.java @@ -1,19 +1,13 @@ /** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at + * Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. The ASF licenses this file to you under the Apache License, Version + * 2.0 (the "License"); you may not use this file except in compliance with the License. You may obtain a copy of the License at * * http://www.apache.org/licenses/LICENSE-2.0 * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. + * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the specific language governing permissions + * and limitations under the License. */ package org.apache.storm.scheduler; @@ -21,9 +15,9 @@ package org.apache.storm.scheduler; import java.util.Map; public interface IScheduler { - + void prepare(Map<String, Object> conf); - + /** * Set assignments for the topologies which needs scheduling. The new assignments is available * through `cluster.getAssignments()` http://git-wip-us.apache.org/repos/asf/storm/blob/f1c0bcbe/storm-server/src/main/java/org/apache/storm/scheduler/ISchedulingState.java ---------------------------------------------------------------------- diff --git a/storm-server/src/main/java/org/apache/storm/scheduler/ISchedulingState.java b/storm-server/src/main/java/org/apache/storm/scheduler/ISchedulingState.java index 187a03c..b0d94b5 100644 --- a/storm-server/src/main/java/org/apache/storm/scheduler/ISchedulingState.java +++ b/storm-server/src/main/java/org/apache/storm/scheduler/ISchedulingState.java @@ -27,17 +27,21 @@ import org.apache.storm.generated.WorkerResources; import org.apache.storm.scheduler.resource.normalization.NormalizedResourceOffer; import org.apache.storm.scheduler.resource.normalization.NormalizedResourceRequest; -/** An interface that provides access to the current scheduling state. */ +/** + * An interface that provides access to the current scheduling state. + */ public interface ISchedulingState { /** * Get all of the topologies. + * * @return all of the topologies that are a part of the cluster. */ Topologies getTopologies(); /** * Get all of the topologies that need scheduling. + * * @return all of the topologies that are not fully scheduled. */ List<TopologyDetails> needsSchedulingTopologies(); @@ -48,16 +52,16 @@ public interface ISchedulingState { * <p>A topology needs scheduling if one of the following conditions holds: * * <ul> - * <li>Although the topology is assigned slots, but is squeezed. i.e. the topology is assigned - * less slots than desired. - * <li>There are unassigned executors in this topology + * <li>Although the topology is assigned slots, but is squeezed. i.e. the topology is assigned + * less slots than desired. + * <li>There are unassigned executors in this topology * </ul> */ boolean needsScheduling(TopologyDetails topology); /** - * Like {@link #needsScheduling(TopologyDetails)} but does not take into account the number of - * workers requested. This is because the number of workers is ignored in RAS + * Like {@link #needsScheduling(TopologyDetails)} but does not take into account the number of workers requested. This is because the + * number of workers is ignored in RAS * * @param topology the topology to check * @return true if the topology needs scheduling else false. @@ -66,6 +70,7 @@ public interface ISchedulingState { /** * Get all of the hosts that are blacklisted. + * * @return all of the hosts that are blacklisted */ Set<String> getBlacklistedHosts(); @@ -104,6 +109,7 @@ public interface ISchedulingState { /** * Get the executor to component name map for executors that need to be scheduled. + * * @param topology the topology this is for * @return a executor -> component-id map which needs scheduling in this topology. */ @@ -111,16 +117,21 @@ public interface ISchedulingState { /** * Get the component name to executor list for executors that need to be scheduled. + * * @param topology the topology this is for * @return a component-id -> executors map which needs scheduling in this topology. */ Map<String, List<ExecutorDetails>> getNeedsSchedulingComponentToExecutors( TopologyDetails topology); - /** Get all the used ports of this supervisor. */ + /** + * Get all the used ports of this supervisor. + */ Set<Integer> getUsedPorts(SupervisorDetails supervisor); - /** Return the available ports of this supervisor. */ + /** + * Return the available ports of this supervisor. + */ Set<Integer> getAvailablePorts(SupervisorDetails supervisor); /** @@ -131,10 +142,14 @@ public interface ISchedulingState { */ Set<Integer> getAssignablePorts(SupervisorDetails supervisor); - /** Return all the available slots on this supervisor. */ + /** + * Return all the available slots on this supervisor. + */ List<WorkerSlot> getAvailableSlots(SupervisorDetails supervisor); - /** Get all the available worker slots in the cluster. */ + /** + * Get all the available worker slots in the cluster. + */ List<WorkerSlot> getAvailableSlots(); /** @@ -145,14 +160,19 @@ public interface ISchedulingState { */ List<WorkerSlot> getAssignableSlots(SupervisorDetails supervisor); - /** Get all non-blacklisted slots in the cluster. */ + /** + * Get all non-blacklisted slots in the cluster. + */ List<WorkerSlot> getAssignableSlots(); - /** Get all currently occupied slots. */ + /** + * Get all currently occupied slots. + */ Collection<WorkerSlot> getUsedSlots(); /** * Check if a slot is occupied or not. + * * @param slot the slot be to checked. * @return true if the specified slot is occupied. */ @@ -160,20 +180,20 @@ public interface ISchedulingState { /** * Get the number of workers assigned to a topology. + * * @param topology the topology this is for * @return the number of workers assigned to this topology. */ int getAssignedNumWorkers(TopologyDetails topology); /** - * Would scheduling exec on ws fit? With a heap <= maxHeap total memory added <= memoryAvailable - * and cpu added <= cpuAvailable. + * Would scheduling exec on ws fit? With a heap <= maxHeap total memory added <= memoryAvailable and cpu added <= cpuAvailable. * - * @param ws the slot to put it in - * @param exec the executor to investigate - * @param td the topology detains for this executor + * @param ws the slot to put it in + * @param exec the executor to investigate + * @param td the topology detains for this executor * @param resourcesAvailable all the available resources - * @param maxHeap the maximum heap size for ws + * @param maxHeap the maximum heap size for ws * @return true it fits else false */ boolean wouldFit( @@ -183,13 +203,19 @@ public interface ISchedulingState { NormalizedResourceOffer resourcesAvailable, double maxHeap); - /** get the current assignment for the topology. */ + /** + * get the current assignment for the topology. + */ SchedulerAssignment getAssignmentById(String topologyId); - /** get slots used by a topology. */ + /** + * get slots used by a topology. + */ Collection<WorkerSlot> getUsedSlotsByTopologyId(String topologyId); - /** Get a specific supervisor with the <code>nodeId</code>. */ + /** + * Get a specific supervisor with the <code>nodeId</code>. + */ SupervisorDetails getSupervisorById(String nodeId); /** @@ -200,45 +226,55 @@ public interface ISchedulingState { */ List<SupervisorDetails> getSupervisorsByHost(String host); - /** Get all the assignments. */ + /** + * Get all the assignments. + */ Map<String, SchedulerAssignment> getAssignments(); - /** Get all the supervisors. */ + /** + * Get all the supervisors. + */ Map<String, SupervisorDetails> getSupervisors(); - /** Get all scheduled resources for node. **/ + /** + * Get all scheduled resources for node. + **/ NormalizedResourceRequest getAllScheduledResourcesForNode(String nodeId); - /** Get the total amount of CPU resources in cluster. */ + /** + * Get the total amount of CPU resources in cluster. + */ double getClusterTotalCpuResource(); - /** Get the total amount of memory resources in cluster. */ + /** + * Get the total amount of memory resources in cluster. + */ double getClusterTotalMemoryResource(); - /** Get the network topography (rackId -> nodes in the rack). */ + /** + * Get the network topography (rackId -> nodes in the rack). + */ Map<String, List<String>> getNetworkTopography(); - /** Get all topology scheduler statuses. */ + /** + * Get all topology scheduler statuses. + */ Map<String, String> getStatusMap(); /** - * Get the amount of resources used by topologies. Used for displaying resource information on the - * UI. + * Get the amount of resources used by topologies. Used for displaying resource information on the UI. * - * @return a map that contains multiple topologies and the resources the topology requested and - * assigned. Key: topology id Value: an array that describes the resources the topology - * requested and assigned in the following format: {requestedMemOnHeap, requestedMemOffHeap, - * requestedCpu, assignedMemOnHeap, assignedMemOffHeap, assignedCpu} + * @return a map that contains multiple topologies and the resources the topology requested and assigned. Key: topology id Value: an + * array that describes the resources the topology requested and assigned in the following format: {requestedMemOnHeap, + * requestedMemOffHeap, requestedCpu, assignedMemOnHeap, assignedMemOffHeap, assignedCpu} */ Map<String, TopologyResources> getTopologyResourcesMap(); /** - * Get the amount of used and free resources on a supervisor. Used for displaying resource - * information on the UI + * Get the amount of used and free resources on a supervisor. Used for displaying resource information on the UI * - * @return a map where the key is the supervisor id and the value is a map that represents - * resource usage for a supervisor in the following format: {totalMem, totalCpu, usedMem, - * usedCpu} + * @return a map where the key is the supervisor id and the value is a map that represents resource usage for a supervisor in the + * following format: {totalMem, totalCpu, usedMem, usedCpu} */ Map<String, SupervisorResources> getSupervisorsResourcesMap(); @@ -273,6 +309,8 @@ public interface ISchedulingState { */ double getScheduledCpuForNode(String nodeId); - /** Get the nimbus configuration. */ + /** + * Get the nimbus configuration. + */ Map<String, Object> getConf(); } http://git-wip-us.apache.org/repos/asf/storm/blob/f1c0bcbe/storm-server/src/main/java/org/apache/storm/scheduler/ISupervisor.java ---------------------------------------------------------------------- diff --git a/storm-server/src/main/java/org/apache/storm/scheduler/ISupervisor.java b/storm-server/src/main/java/org/apache/storm/scheduler/ISupervisor.java index 4ecf28e..08d1229 100644 --- a/storm-server/src/main/java/org/apache/storm/scheduler/ISupervisor.java +++ b/storm-server/src/main/java/org/apache/storm/scheduler/ISupervisor.java @@ -1,19 +1,13 @@ /** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at + * Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. The ASF licenses this file to you under the Apache License, Version + * 2.0 (the "License"); you may not use this file except in compliance with the License. You may obtain a copy of the License at * * http://www.apache.org/licenses/LICENSE-2.0 * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. + * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the specific language governing permissions + * and limitations under the License. */ package org.apache.storm.scheduler; @@ -25,6 +19,7 @@ public interface ISupervisor { void prepare(Map<String, Object> topoConf, String schedulerLocalDir); // for mesos, this is {hostname}-{topologyid} + /** * The id used for writing metadata into ZK. */ @@ -39,7 +34,7 @@ public interface ISupervisor { String getAssignmentId(); Object getMetadata(); - + boolean confirmAssigned(int port); // calls this before actually killing the worker locally... http://git-wip-us.apache.org/repos/asf/storm/blob/f1c0bcbe/storm-server/src/main/java/org/apache/storm/scheduler/IsolationScheduler.java ---------------------------------------------------------------------- diff --git a/storm-server/src/main/java/org/apache/storm/scheduler/IsolationScheduler.java b/storm-server/src/main/java/org/apache/storm/scheduler/IsolationScheduler.java index 3201a40..03c0c6a 100644 --- a/storm-server/src/main/java/org/apache/storm/scheduler/IsolationScheduler.java +++ b/storm-server/src/main/java/org/apache/storm/scheduler/IsolationScheduler.java @@ -1,20 +1,15 @@ /** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at + * Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. The ASF licenses this file to you under the Apache License, Version + * 2.0 (the "License"); you may not use this file except in compliance with the License. You may obtain a copy of the License at * * http://www.apache.org/licenses/LICENSE-2.0 * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. + * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the specific language governing permissions + * and limitations under the License. */ + package org.apache.storm.scheduler; import java.util.ArrayList; @@ -28,7 +23,6 @@ import java.util.List; import java.util.Map; import java.util.Set; import java.util.TreeMap; - import org.apache.commons.lang.Validate; import org.apache.storm.DaemonConfig; import org.apache.storm.utils.Utils; @@ -63,7 +57,8 @@ public class IsolationScheduler implements IScheduler { // get host -> all assignable worker slots for non-blacklisted machines (assigned or not assigned) // will then have a list of machines that need to be assigned (machine -> [topology, list of list of executors]) - // match each spec to a machine (who has the right number of workers), free everything else on that machine and assign those slots (do one topology at a time) + // match each spec to a machine (who has the right number of workers), free everything else on that machine and assign those slots + // (do one topology at a time) // blacklist all machines who had production slots defined // log isolated topologies who weren't able to get enough slots / machines // run default scheduler on isolated topologies that didn't have enough slots + non-isolated topologies on remaining machines @@ -85,9 +80,9 @@ public class IsolationScheduler implements IScheduler { int numWorkers = assignments.size(); if (isoIds.contains(topologyId) - && checkAssignmentTopology(assignments, topologyId) - && distribution.containsKey(numWorkers) - && checkAssignmentWorkerSpecs(assignments, workerSpecs)) { + && checkAssignmentTopology(assignments, topologyId) + && distribution.containsKey(numWorkers) + && checkAssignmentWorkerSpecs(assignments, workerSpecs)) { decrementDistribution(distribution, numWorkers); for (AssignmentInfo ass : assignments) { workerSpecs.remove(ass.getExecutors()); @@ -127,9 +122,9 @@ public class IsolationScheduler implements IScheduler { List<String> failedTopologyIds = extractFailedTopologyIds(topologyWorkerSpecs); if (failedTopologyIds.size() > 0) { LOG.warn("Unable to isolate topologies " + failedTopologyIds - + ". No machine had enough worker slots to run the remaining workers for these topologies. " - + "Clearing all other resources and will wait for enough resources for " - + "isolated topologies before allocating any other resources."); + + ". No machine had enough worker slots to run the remaining workers for these topologies. " + + "Clearing all other resources and will wait for enough resources for " + + "isolated topologies before allocating any other resources."); // clear workers off all hosts that are not blacklisted Map<String, Set<WorkerSlot>> usedSlots = hostToUsedSlots(cluster); Set<Map.Entry<String, Set<WorkerSlot>>> entries = usedSlots.entrySet(); @@ -176,7 +171,7 @@ public class IsolationScheduler implements IScheduler { private List<String> extractFailedTopologyIds(Map<String, Set<Set<ExecutorDetails>>> isoTopologyWorkerSpecs) { List<String> failedTopologyIds = new ArrayList<String>(); - for (Map.Entry<String, Set<Set<ExecutorDetails>>> topoWorkerSpecsEntry : isoTopologyWorkerSpecs.entrySet()){ + for (Map.Entry<String, Set<Set<ExecutorDetails>>> topoWorkerSpecsEntry : isoTopologyWorkerSpecs.entrySet()) { Set<Set<ExecutorDetails>> workerSpecs = topoWorkerSpecsEntry.getValue(); if (workerSpecs != null && !workerSpecs.isEmpty()) { failedTopologyIds.add(topoWorkerSpecsEntry.getKey()); @@ -195,7 +190,7 @@ public class IsolationScheduler implements IScheduler { } private Map<String, List<AssignmentInfo>> hostAssignments(Cluster cluster) { - Collection<SchedulerAssignment> assignmentValues = cluster.getAssignments().values(); + Collection<SchedulerAssignment> assignmentValues = cluster.getAssignments().values(); Map<String, List<AssignmentInfo>> hostAssignments = new HashMap<String, List<AssignmentInfo>>(); for (SchedulerAssignment sa : assignmentValues) { @@ -237,7 +232,7 @@ public class IsolationScheduler implements IScheduler { bucketExecutors.put(bucketIndex, executors); } executors.add(executor); - bucketIndex = (bucketIndex+1) % numWorkers; + bucketIndex = (bucketIndex + 1) % numWorkers; } return new HashSet<Set<ExecutorDetails>>(bucketExecutors.values()); @@ -325,11 +320,11 @@ public class IsolationScheduler implements IScheduler { sortHostAssignSlots.add(new HostAssignableSlots(entry.getKey(), entry.getValue())); } Collections.sort(sortHostAssignSlots, new Comparator<HostAssignableSlots>() { - @Override - public int compare(HostAssignableSlots o1, HostAssignableSlots o2) { - return o2.getWorkerSlots().size() - o1.getWorkerSlots().size(); - } - }); + @Override + public int compare(HostAssignableSlots o1, HostAssignableSlots o2) { + return o2.getWorkerSlots().size() - o1.getWorkerSlots().size(); + } + }); Collections.shuffle(sortHostAssignSlots); return new LinkedList<HostAssignableSlots>(sortHostAssignSlots); http://git-wip-us.apache.org/repos/asf/storm/blob/f1c0bcbe/storm-server/src/main/java/org/apache/storm/scheduler/SchedulerAssignment.java ---------------------------------------------------------------------- diff --git a/storm-server/src/main/java/org/apache/storm/scheduler/SchedulerAssignment.java b/storm-server/src/main/java/org/apache/storm/scheduler/SchedulerAssignment.java index c16a67f..347c95f 100644 --- a/storm-server/src/main/java/org/apache/storm/scheduler/SchedulerAssignment.java +++ b/storm-server/src/main/java/org/apache/storm/scheduler/SchedulerAssignment.java @@ -21,13 +21,12 @@ package org.apache.storm.scheduler; import java.util.Collection; import java.util.Map; import java.util.Set; - -import org.apache.storm.generated.Assignment; import org.apache.storm.generated.WorkerResources; public interface SchedulerAssignment { /** * Is this slot part of this assignment or not. + * * @param slot the slot to check. * @return true if the slot is occupied by this assignment else false. */ @@ -35,50 +34,57 @@ public interface SchedulerAssignment { /** * Is the executor assigned or not. - * + * * @param executor the executor to check it if is assigned. * @return true if it is assigned else false */ public boolean isExecutorAssigned(ExecutorDetails executor); - + /** * Return the ID of the topology. + * * @return the topology-id this assignment is for. */ public String getTopologyId(); /** * Get the map of executor to WorkerSlot. + * * @return the executor -> slot map. */ public Map<ExecutorDetails, WorkerSlot> getExecutorToSlot(); /** * Get the set of all executors. - * @return the executors covered by this assignments + * + * @return the executors covered by this assignments */ public Set<ExecutorDetails> getExecutors(); /** * Get the set of all slots that are a part of this. + * * @return the set of all slots. */ public Set<WorkerSlot> getSlots(); /** * Get the mapping of slot to executors on that slot. + * * @return the slot to the executors assigned to that slot. */ public Map<WorkerSlot, Collection<ExecutorDetails>> getSlotToExecutors(); - + /** * Get the slot to resource mapping. + * * @return The slot to resource mapping */ public Map<WorkerSlot, WorkerResources> getScheduledResources(); - + /** * Get the total shared off heap memory mapping. + * * @return host to total shared off heap memory mapping. */ public Map<String, Double> getNodeIdToTotalSharedOffHeapMemory(); http://git-wip-us.apache.org/repos/asf/storm/blob/f1c0bcbe/storm-server/src/main/java/org/apache/storm/scheduler/SchedulerAssignmentImpl.java ---------------------------------------------------------------------- diff --git a/storm-server/src/main/java/org/apache/storm/scheduler/SchedulerAssignmentImpl.java b/storm-server/src/main/java/org/apache/storm/scheduler/SchedulerAssignmentImpl.java index f92c3d1..dcb2ebc 100644 --- a/storm-server/src/main/java/org/apache/storm/scheduler/SchedulerAssignmentImpl.java +++ b/storm-server/src/main/java/org/apache/storm/scheduler/SchedulerAssignmentImpl.java @@ -26,7 +26,6 @@ import java.util.LinkedList; import java.util.List; import java.util.Map; import java.util.Set; - import org.apache.storm.generated.WorkerResources; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -50,14 +49,15 @@ public class SchedulerAssignmentImpl implements SchedulerAssignment { /** * Create a new assignment. - * @param topologyId the id of the topology the assignment is for. - * @param executorToSlot the executor to slot mapping for the assignment. Can be null and set through other methods later. - * @param resources the resources for the current assignments. Can be null and set through other methods later. + * + * @param topologyId the id of the topology the assignment is for. + * @param executorToSlot the executor to slot mapping for the assignment. Can be null and set through other methods later. + * @param resources the resources for the current assignments. Can be null and set through other methods later. * @param nodeIdToTotalSharedOffHeap the shared memory for this assignment can be null and set through other methods later. */ public SchedulerAssignmentImpl(String topologyId, Map<ExecutorDetails, WorkerSlot> executorToSlot, - Map<WorkerSlot, WorkerResources> resources, Map<String, Double> nodeIdToTotalSharedOffHeap) { - this.topologyId = topologyId; + Map<WorkerSlot, WorkerResources> resources, Map<String, Double> nodeIdToTotalSharedOffHeap) { + this.topologyId = topologyId; if (executorToSlot != null) { if (executorToSlot.entrySet().stream().anyMatch((entry) -> entry.getKey() == null || entry.getValue() == null)) { throw new RuntimeException("Cannot create a scheduling with a null in it " + executorToSlot); @@ -83,8 +83,8 @@ public class SchedulerAssignmentImpl implements SchedulerAssignment { } public SchedulerAssignmentImpl(SchedulerAssignment assignment) { - this(assignment.getTopologyId(), assignment.getExecutorToSlot(), - assignment.getScheduledResources(), assignment.getNodeIdToTotalSharedOffHeapMemory()); + this(assignment.getTopologyId(), assignment.getExecutorToSlot(), + assignment.getScheduledResources(), assignment.getNodeIdToTotalSharedOffHeapMemory()); } @Override @@ -94,6 +94,7 @@ public class SchedulerAssignmentImpl implements SchedulerAssignment { /** * Like the equals command, but ignores the resources. + * * @param other the object to check for equality against. * @return true if they are equal, ignoring resources, else false. */ @@ -105,11 +106,11 @@ public class SchedulerAssignmentImpl implements SchedulerAssignment { return false; } SchedulerAssignmentImpl o = (SchedulerAssignmentImpl) other; - + return topologyId.equals(o.topologyId) - && executorToSlot.equals(o.executorToSlot); + && executorToSlot.equals(o.executorToSlot); } - + @Override public int hashCode() { final int prime = 31; @@ -118,7 +119,7 @@ public class SchedulerAssignmentImpl implements SchedulerAssignment { result = prime * result + ((executorToSlot == null) ? 0 : executorToSlot.hashCode()); return result; } - + @Override public boolean equals(Object other) { if (!equalsIgnoreResources(other)) { @@ -127,13 +128,13 @@ public class SchedulerAssignmentImpl implements SchedulerAssignment { SchedulerAssignmentImpl o = (SchedulerAssignmentImpl) other; return resources.equals(o.resources) - && nodeIdToTotalSharedOffHeap.equals(o.nodeIdToTotalSharedOffHeap); + && nodeIdToTotalSharedOffHeap.equals(o.nodeIdToTotalSharedOffHeap); } - + @Override public Set<WorkerSlot> getSlots() { return new HashSet<>(executorToSlot.values()); - } + } @Deprecated public void assign(WorkerSlot slot, Collection<ExecutorDetails> executors) { @@ -180,7 +181,7 @@ public class SchedulerAssignmentImpl implements SchedulerAssignment { String node = slot.getNodeId(); boolean isFound = false; - for (WorkerSlot ws: executorToSlot.values()) { + for (WorkerSlot ws : executorToSlot.values()) { if (node.equals(ws.getNodeId())) { isFound = true; break; @@ -243,7 +244,7 @@ public class SchedulerAssignmentImpl implements SchedulerAssignment { public void setTotalSharedOffHeapMemory(String node, double value) { nodeIdToTotalSharedOffHeap.put(node, value); } - + @Override public Map<String, Double> getNodeIdToTotalSharedOffHeapMemory() { return nodeIdToTotalSharedOffHeap; http://git-wip-us.apache.org/repos/asf/storm/blob/f1c0bcbe/storm-server/src/main/java/org/apache/storm/scheduler/SingleTopologyCluster.java ---------------------------------------------------------------------- diff --git a/storm-server/src/main/java/org/apache/storm/scheduler/SingleTopologyCluster.java b/storm-server/src/main/java/org/apache/storm/scheduler/SingleTopologyCluster.java index ee8ef00..ff020a4 100644 --- a/storm-server/src/main/java/org/apache/storm/scheduler/SingleTopologyCluster.java +++ b/storm-server/src/main/java/org/apache/storm/scheduler/SingleTopologyCluster.java @@ -27,7 +27,7 @@ public class SingleTopologyCluster extends Cluster { /** * Create a new cluster that only allows modifications to a single topology. * - * @param other the current cluster to base this off of + * @param other the current cluster to base this off of * @param topologyId the topology that is allowed to be modified. */ public SingleTopologyCluster(Cluster other, String topologyId) { http://git-wip-us.apache.org/repos/asf/storm/blob/f1c0bcbe/storm-server/src/main/java/org/apache/storm/scheduler/SupervisorDetails.java ---------------------------------------------------------------------- diff --git a/storm-server/src/main/java/org/apache/storm/scheduler/SupervisorDetails.java b/storm-server/src/main/java/org/apache/storm/scheduler/SupervisorDetails.java index 00b8ea0..4ac1057 100644 --- a/storm-server/src/main/java/org/apache/storm/scheduler/SupervisorDetails.java +++ b/storm-server/src/main/java/org/apache/storm/scheduler/SupervisorDetails.java @@ -1,19 +1,13 @@ /** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at + * Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. The ASF licenses this file to you under the Apache License, Version + * 2.0 (the "License"); you may not use this file except in compliance with the License. You may obtain a copy of the License at * * http://www.apache.org/licenses/LICENSE-2.0 * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. + * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the specific language governing permissions + * and limitations under the License. */ package org.apache.storm.scheduler; @@ -22,7 +16,6 @@ import java.util.Collection; import java.util.HashSet; import java.util.Map; import java.util.Set; -import org.apache.storm.Constants; import org.apache.storm.scheduler.resource.normalization.NormalizedResourceOffer; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -45,13 +38,13 @@ public class SupervisorDetails { */ private final Object schedulerMeta; /** - * all the ports of the supervisor. - */ - private Set<Integer> allPorts; - /** * Map containing a manifest of resources for the node the supervisor resides. */ private final NormalizedResourceOffer totalResources; + /** + * all the ports of the supervisor. + */ + private Set<Integer> allPorts; /** * Create the details of a new supervisor. @@ -105,20 +98,11 @@ public class SupervisorDetails { Collection<? extends Number> allPorts, Map<String, Double> totalResources) { this(id, serverPort, host, null, schedulerMeta, allPorts, totalResources); } - + @Override public String toString() { return getClass().getSimpleName() + " ID: " + id + " HOST: " + host + " META: " + meta - + " SCHED_META: " + schedulerMeta + " PORTS: " + allPorts; - } - - private void setAllPorts(Collection<? extends Number> allPorts) { - this.allPorts = new HashSet<>(); - if (allPorts != null) { - for (Number n: allPorts) { - this.allPorts.add(n.intValue()); - } - } + + " SCHED_META: " + schedulerMeta + " PORTS: " + allPorts; } public String getId() { @@ -136,11 +120,20 @@ public class SupervisorDetails { public Object getMeta() { return meta; } - + public Set<Integer> getAllPorts() { return allPorts; } + private void setAllPorts(Collection<? extends Number> allPorts) { + this.allPorts = new HashSet<>(); + if (allPorts != null) { + for (Number n : allPorts) { + this.allPorts.add(n.intValue()); + } + } + } + public Object getSchedulerMeta() { return this.schedulerMeta; } http://git-wip-us.apache.org/repos/asf/storm/blob/f1c0bcbe/storm-server/src/main/java/org/apache/storm/scheduler/SupervisorResources.java ---------------------------------------------------------------------- diff --git a/storm-server/src/main/java/org/apache/storm/scheduler/SupervisorResources.java b/storm-server/src/main/java/org/apache/storm/scheduler/SupervisorResources.java index ac2a506..8ee686f 100644 --- a/storm-server/src/main/java/org/apache/storm/scheduler/SupervisorResources.java +++ b/storm-server/src/main/java/org/apache/storm/scheduler/SupervisorResources.java @@ -31,8 +31,8 @@ public class SupervisorResources { * * @param totalMem the total mem on the supervisor * @param totalCpu the total CPU on the supervisor - * @param usedMem the used mem on the supervisor - * @param usedCpu the used CPU on the supervisor + * @param usedMem the used mem on the supervisor + * @param usedCpu the used CPU on the supervisor */ public SupervisorResources(double totalMem, double totalCpu, double usedMem, double usedCpu) { this.totalMem = totalMem; http://git-wip-us.apache.org/repos/asf/storm/blob/f1c0bcbe/storm-server/src/main/java/org/apache/storm/scheduler/Topologies.java ---------------------------------------------------------------------- diff --git a/storm-server/src/main/java/org/apache/storm/scheduler/Topologies.java b/storm-server/src/main/java/org/apache/storm/scheduler/Topologies.java index 16d230a..b222916 100644 --- a/storm-server/src/main/java/org/apache/storm/scheduler/Topologies.java +++ b/storm-server/src/main/java/org/apache/storm/scheduler/Topologies.java @@ -30,19 +30,9 @@ public class Topologies implements Iterable<TopologyDetails> { Map<String, String> nameToId; Map<String, Map<String, Component>> allComponents; - private static Map<String, TopologyDetails> mkMap(TopologyDetails[] details) { - Map<String, TopologyDetails> ret = new HashMap<>(); - for (TopologyDetails td : details) { - if (ret.put(td.getId(), td) != null) { - throw new IllegalArgumentException( - "Cannot have multiple topologies with the id " + td.getId()); - } - } - return ret; - } - /** * Create a new Topologies from a list of TopologyDetails. + * * @param details the list of details to use. * @throws IllegalArgumentException if duplicate topology ids are found. */ @@ -52,6 +42,7 @@ public class Topologies implements Iterable<TopologyDetails> { /** * Create a new Topologies from a map of id to topology + * * @param topologies a map of topology id to topology details. */ public Topologies(Map<String, TopologyDetails> topologies) { @@ -74,12 +65,24 @@ public class Topologies implements Iterable<TopologyDetails> { this(src.topologies); } + private static Map<String, TopologyDetails> mkMap(TopologyDetails[] details) { + Map<String, TopologyDetails> ret = new HashMap<>(); + for (TopologyDetails td : details) { + if (ret.put(td.getId(), td) != null) { + throw new IllegalArgumentException( + "Cannot have multiple topologies with the id " + td.getId()); + } + } + return ret; + } + public Collection<String> getAllIds() { return topologies.keySet(); } /** * Get a topology given an ID + * * @param topologyId the id of the topology to get * @return the topology or null if it is not found. */ @@ -88,8 +91,8 @@ public class Topologies implements Iterable<TopologyDetails> { } /** - * Get a topology given a topology name. Nimbus prevents multiple topologies - * from having the same name, so this assumes it is true. + * Get a topology given a topology name. Nimbus prevents multiple topologies from having the same name, so this assumes it is true. + * * @param topologyName the name of the topology to look for * @return the a topology with the given name. */ @@ -109,6 +112,7 @@ public class Topologies implements Iterable<TopologyDetails> { /** * Get all topologies submitted/owned by a given user. + * * @param user the name of the user * @return all of the topologies submitted by this user. */ http://git-wip-us.apache.org/repos/asf/storm/blob/f1c0bcbe/storm-server/src/main/java/org/apache/storm/scheduler/TopologyDetails.java ---------------------------------------------------------------------- diff --git a/storm-server/src/main/java/org/apache/storm/scheduler/TopologyDetails.java b/storm-server/src/main/java/org/apache/storm/scheduler/TopologyDetails.java index 41e7edf..c43f94a 100644 --- a/storm-server/src/main/java/org/apache/storm/scheduler/TopologyDetails.java +++ b/storm-server/src/main/java/org/apache/storm/scheduler/TopologyDetails.java @@ -43,31 +43,30 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; public class TopologyDetails { + private static final Logger LOG = LoggerFactory.getLogger(TopologyDetails.class); private final String topologyId; private final Map<String, Object> topologyConf; private final StormTopology topology; private final Map<ExecutorDetails, String> executorToComponent; private final int numWorkers; + //when topology was launched + private final int launchTime; + private final String owner; + private final String topoName; //<ExecutorDetails - Task, Map<String - Type of resource, Map<String - type of that resource, Double - amount>>> private Map<ExecutorDetails, NormalizedResourceRequest> resourceList; //Max heap size for a worker used by topology private Double topologyWorkerMaxHeapSize; //topology priority private Integer topologyPriority; - //when topology was launched - private final int launchTime; - private final String owner; - private final String topoName; - - private static final Logger LOG = LoggerFactory.getLogger(TopologyDetails.class); public TopologyDetails(String topologyId, Map<String, Object> topologyConf, StormTopology topology, int numWorkers, String owner) { - this(topologyId, topologyConf, topology, numWorkers, null, 0, owner); + this(topologyId, topologyConf, topology, numWorkers, null, 0, owner); } public TopologyDetails(String topologyId, Map<String, Object> topologyConf, StormTopology topology, int numWorkers, Map<ExecutorDetails, String> executorToComponents, String owner) { - this(topologyId, topologyConf, topology, numWorkers, executorToComponents, 0, owner); + this(topologyId, topologyConf, topology, numWorkers, executorToComponents, 0, owner); } public TopologyDetails(String topologyId, Map<String, Object> topologyConf, StormTopology topology, int numWorkers, @@ -168,8 +167,8 @@ public class TopologyDetails { getExecutorToComponent().get(exec), exec, topologyConf.get(Config.TOPOLOGY_COMPONENT_RESOURCES_ONHEAP_MEMORY_MB), - resourceList.get(exec) - ); + resourceList.get(exec) + ); addDefaultResforExec(exec); } } @@ -194,9 +193,8 @@ public class TopologyDetails { } /** - * Returns a representation of the non-system components of the topology graph Each Component - * object in the returning map is populated with the list of its parents, children and execs - * assigned to that component. + * Returns a representation of the non-system components of the topology graph Each Component object in the returning map is populated + * with the list of its parents, children and execs assigned to that component. * * @return a map of components */ @@ -248,6 +246,7 @@ public class TopologyDetails { /** * Gets the on heap memory requirement for a certain task within a topology. + * * @param exec the executor the inquiry is concerning. * @return Double the amount of on heap memory requirement for this exec in topology topoId. */ @@ -255,13 +254,15 @@ public class TopologyDetails { Double ret = null; if (hasExecInTopo(exec)) { ret = resourceList - .get(exec).getOnHeapMemoryMb();; + .get(exec).getOnHeapMemoryMb(); + ; } return ret; } /** * Gets the off heap memory requirement for a certain task within a topology. + * * @param exec the executor the inquiry is concerning. * @return Double the amount of off heap memory requirement for this exec in topology topoId. */ @@ -269,32 +270,33 @@ public class TopologyDetails { Double ret = null; if (hasExecInTopo(exec)) { ret = resourceList - .get(exec).getOffHeapMemoryMb(); + .get(exec).getOffHeapMemoryMb(); } return ret; } /** * Gets the total memory requirement for a task. + * * @param exec the executor the inquiry is concerning. * @return Double the total memory requirement for this exec in topology topoId. */ public Double getTotalMemReqTask(ExecutorDetails exec) { if (hasExecInTopo(exec)) { return getOffHeapMemoryRequirement(exec) - + getOnHeapMemoryRequirement(exec); + + getOnHeapMemoryRequirement(exec); } return null; } /** * Gets the total memory resource list for a set of tasks that is part of a topology. + * * @param executors all executors for a topology - * @return Map<ExecutorDetails, Double> , - * a map of the total memory requirement for all tasks in topology topoId. + * @return Map<ExecutorDetails , Double> , a map of the total memory requirement for all tasks in topology topoId. */ public Set<SharedMemory> getSharedMemoryRequests( - Collection<ExecutorDetails> executors + Collection<ExecutorDetails> executors ) { Set<String> components = new HashSet<>(); for (ExecutorDetails exec : executors) { @@ -324,6 +326,7 @@ public class TopologyDetails { /** * Get the total resource requirement for an executor. + * * @param exec the executor to get the resources for. * @return Double the total about of cpu requirement for executor */ @@ -336,20 +339,20 @@ public class TopologyDetails { /** * Get the total CPU requirement for executor. + * * @param exec - * @return Map<String, Double> generic resource mapping requirement for the executor + * @return Map<String , Double> generic resource mapping requirement for the executor */ public Double getTotalCpuReqTask(ExecutorDetails exec) { if (hasExecInTopo(exec)) { return resourceList - .get(exec).getTotalCpu(); + .get(exec).getTotalCpu(); } return null; } /** - * Note: The public API relevant to resource aware scheduling is unstable as of May 2015. - * We reserve the right to change them. + * Note: The public API relevant to resource aware scheduling is unstable as of May 2015. We reserve the right to change them. * * @return the total on-heap memory requested for this topology */ @@ -379,8 +382,7 @@ public class TopologyDetails { } /** - * Note: The public API relevant to resource aware scheduling is unstable as of May 2015. - * We reserve the right to change them. + * Note: The public API relevant to resource aware scheduling is unstable as of May 2015. We reserve the right to change them. * * @return the total off-heap memory requested for this topology */ @@ -410,8 +412,7 @@ public class TopologyDetails { } /** - * Note: The public API relevant to resource aware scheduling is unstable as of May 2015. - * We reserve the right to change them. + * Note: The public API relevant to resource aware scheduling is unstable as of May 2015. We reserve the right to change them. * * @return the total cpu requested for this topology */ @@ -428,6 +429,7 @@ public class TopologyDetails { /** * get the resources requirements for a executor. + * * @param exec * @return a map containing the resource requirements for this exec */ @@ -440,6 +442,7 @@ public class TopologyDetails { /** * Checks if a executor is part of this topology. + * * @return Boolean whether or not a certain ExecutorDetail is included in the resourceList. */ public boolean hasExecInTopo(ExecutorDetails exec) { @@ -472,28 +475,30 @@ public class TopologyDetails { if (component.equals(Acker.ACKER_COMPONENT_ID)) { if (topologyConf.containsKey(Config.TOPOLOGY_ACKER_RESOURCES_ONHEAP_MEMORY_MB)) { resourceListForExec.put(Config.TOPOLOGY_COMPONENT_RESOURCES_ONHEAP_MEMORY_MB, - ObjectReader.getDouble(topologyConf.get(Config.TOPOLOGY_ACKER_RESOURCES_ONHEAP_MEMORY_MB))); + ObjectReader.getDouble(topologyConf.get(Config.TOPOLOGY_ACKER_RESOURCES_ONHEAP_MEMORY_MB))); } if (topologyConf.containsKey(Config.TOPOLOGY_ACKER_RESOURCES_OFFHEAP_MEMORY_MB)) { resourceListForExec.put(Config.TOPOLOGY_COMPONENT_RESOURCES_OFFHEAP_MEMORY_MB, - ObjectReader.getDouble(topologyConf.get(Config.TOPOLOGY_ACKER_RESOURCES_OFFHEAP_MEMORY_MB))); + ObjectReader.getDouble(topologyConf.get(Config.TOPOLOGY_ACKER_RESOURCES_OFFHEAP_MEMORY_MB))); } if (topologyConf.containsKey(Config.TOPOLOGY_ACKER_CPU_PCORE_PERCENT)) { resourceListForExec.put(Config.TOPOLOGY_COMPONENT_CPU_PCORE_PERCENT, - ObjectReader.getDouble(topologyConf.get(Config.TOPOLOGY_ACKER_CPU_PCORE_PERCENT))); + ObjectReader.getDouble(topologyConf.get(Config.TOPOLOGY_ACKER_CPU_PCORE_PERCENT))); } } else if (component.startsWith(Constants.METRICS_COMPONENT_ID_PREFIX)) { if (topologyConf.containsKey(Config.TOPOLOGY_METRICS_CONSUMER_RESOURCES_ONHEAP_MEMORY_MB)) { resourceListForExec.put(Config.TOPOLOGY_COMPONENT_RESOURCES_ONHEAP_MEMORY_MB, - ObjectReader.getDouble(topologyConf.get(Config.TOPOLOGY_METRICS_CONSUMER_RESOURCES_ONHEAP_MEMORY_MB))); + ObjectReader + .getDouble(topologyConf.get(Config.TOPOLOGY_METRICS_CONSUMER_RESOURCES_ONHEAP_MEMORY_MB))); } if (topologyConf.containsKey(Config.TOPOLOGY_METRICS_CONSUMER_RESOURCES_OFFHEAP_MEMORY_MB)) { resourceListForExec.put(Config.TOPOLOGY_COMPONENT_RESOURCES_OFFHEAP_MEMORY_MB, - ObjectReader.getDouble(topologyConf.get(Config.TOPOLOGY_METRICS_CONSUMER_RESOURCES_OFFHEAP_MEMORY_MB))); + ObjectReader + .getDouble(topologyConf.get(Config.TOPOLOGY_METRICS_CONSUMER_RESOURCES_OFFHEAP_MEMORY_MB))); } if (topologyConf.containsKey(Config.TOPOLOGY_METRICS_CONSUMER_CPU_PCORE_PERCENT)) { resourceListForExec.put(Config.TOPOLOGY_COMPONENT_CPU_PCORE_PERCENT, - ObjectReader.getDouble(topologyConf.get(Config.TOPOLOGY_METRICS_CONSUMER_CPU_PCORE_PERCENT))); + ObjectReader.getDouble(topologyConf.get(Config.TOPOLOGY_METRICS_CONSUMER_CPU_PCORE_PERCENT))); } } } @@ -514,6 +519,7 @@ public class TopologyDetails { /** * Get the max heap size for a worker used by this topology. + * * @return the worker max heap size */ public Double getTopologyWorkerMaxHeapSize() { @@ -551,17 +557,17 @@ public class TopologyDetails { @Override public String toString() { return "Name: " - + getName() - + " id: " - + getId() - + " Priority: " - + getTopologyPriority() - + " Uptime: " - + getUpTime() - + " CPU: " - + getTotalRequestedCpu() - + " Memory: " - + (getTotalRequestedMemOffHeap() + getTotalRequestedMemOnHeap()); + + getName() + + " id: " + + getId() + + " Priority: " + + getTopologyPriority() + + " Uptime: " + + getUpTime() + + " CPU: " + + getTotalRequestedCpu() + + " Memory: " + + (getTotalRequestedMemOffHeap() + getTotalRequestedMemOnHeap()); } @Override
