Github user revans2 commented on a diff in the pull request:
https://github.com/apache/storm/pull/2442#discussion_r154194548
--- Diff:
storm-server/src/main/java/org/apache/storm/scheduler/resource/strategies/scheduling/ConstraintSolverStrategy.java
---
@@ -0,0 +1,623 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.storm.scheduler.resource.strategies.scheduling;
+
+import com.google.common.annotations.VisibleForTesting;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Comparator;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.LinkedHashMap;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Map;
+import java.util.NavigableMap;
+import java.util.Set;
+import java.util.Stack;
+import java.util.TreeMap;
+import java.util.TreeSet;
+import org.apache.storm.Config;
+import org.apache.storm.scheduler.Cluster;
+import org.apache.storm.scheduler.ExecutorDetails;
+import org.apache.storm.scheduler.SchedulerAssignment;
+import org.apache.storm.scheduler.TopologyDetails;
+import org.apache.storm.scheduler.WorkerSlot;
+import org.apache.storm.scheduler.resource.RAS_Node;
+import org.apache.storm.scheduler.resource.RAS_Nodes;
+import org.apache.storm.scheduler.resource.SchedulingResult;
+import org.apache.storm.scheduler.resource.SchedulingStatus;
+import org.apache.storm.utils.ObjectReader;
+import org.apache.storm.utils.Time;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class ConstraintSolverStrategy extends BaseResourceAwareStrategy {
+ private static final Logger LOG =
LoggerFactory.getLogger(ConstraintSolverStrategy.class);
+
+ protected static class SolverResult { // Immutable summary of one solver run: the outcome plus the search metrics.
+ private final int statesSearched; // copied from state.getStatesSearched()
+ private final boolean success; // outcome flag supplied by the caller
+ private final long timeTakenMillis; // Time.currentTimeMillis() at construction minus state.startTimeMillis
+ private final int backtracked; // copied from state.numBacktrack
+
+ public SolverResult(SearcherState state, boolean success) { // Snapshots the metrics of state at construction time.
+ this.statesSearched = state.getStatesSearched();
+ this.success = success;
+ timeTakenMillis = Time.currentTimeMillis() -
state.startTimeMillis;
+ backtracked = state.numBacktrack;
+ }
+
+ public SchedulingResult asSchedulingResult() { // Converts this summary into a SchedulingResult whose message embeds the metrics.
+ if (success) {
+ return SchedulingResult.success("Fully Scheduled by
ConstraintSolverStrategy (" + statesSearched
+ + " states traversed in " + timeTakenMillis + "ms,
backtracked " + backtracked + " times)");
+ } // every unsuccessful run is reported below as FAIL_NOT_ENOUGH_RESOURCES
+ return
SchedulingResult.failure(SchedulingStatus.FAIL_NOT_ENOUGH_RESOURCES,
+ "Cannot find scheduling that satisfies all constraints ("
+ statesSearched
+ + " states traversed in " + timeTakenMillis + "ms,
backtracked " + backtracked + " times)");
+ }
+ }
+
+ protected static class SearcherState {
+ // Metrics
+ // How many states searched so far.
+ private int statesSearched = 0;
+ // Number of times we had to backtrack.
+ private int numBacktrack = 0;
+ final long startTimeMillis;
+ private final long maxEndTimeMs;
+
+ // Current state
+ // The current executor we are trying to schedule
+ private int execIndex = 0;
+ // A map of the worker to the components in the worker to be able
to enforce constraints.
+ private final Map<WorkerSlot, Set<String>> workerCompAssignment;
+ private final boolean[] okToRemoveFromWorker;
+ // for the currently tested assignment a Map of the node to the
components on it to be able to enforce constraints
+ private final Map<RAS_Node, Set<String>> nodeCompAssignment;
+ private final boolean[] okToRemoveFromNode;
+
+ // Static State
+ // The list of all executors (preferably sorted to make
assignments simpler).
+ private final List<ExecutorDetails> execs;
+ // The maximum number of states to search before stopping.
+ private final int maxStatesSearched;
+ //The topology we are scheduling
+ private final TopologyDetails td;
+
+ /**
+ * Creates the search state for one scheduling attempt.
+ *
+ * @param workerCompAssignment map of worker slot to the components placed in that worker
+ * @param nodeCompAssignment map of node to the components placed on that node
+ * @param maxStatesSearched maximum number of states to search before stopping
+ * @param maxTimeMs time budget in milliseconds; a value of zero or less means no time limit
+ * @param execs the executors to schedule; must be non-null and non-empty
+ * @param td the topology being scheduled
+ */
+ private SearcherState(Map<WorkerSlot, Set<String>> workerCompAssignment,
+ Map<RAS_Node, Set<String>> nodeCompAssignment,
+ int maxStatesSearched, long maxTimeMs,
+ List<ExecutorDetails> execs, TopologyDetails td) {
+ // Check for null first: the emptiness check dereferences execs, so in the
+ // opposite order a null list throws NullPointerException and the null
+ // assertion can never fire.
+ assert execs != null;
+ assert !execs.isEmpty();
+
+ this.workerCompAssignment = workerCompAssignment;
+ this.nodeCompAssignment = nodeCompAssignment;
+ this.maxStatesSearched = maxStatesSearched;
+ this.execs = execs;
+ // One flag per executor, indexed like execs.
+ okToRemoveFromWorker = new boolean[execs.size()];
+ okToRemoveFromNode = new boolean[execs.size()];
+ this.td = td;
+ startTimeMillis = Time.currentTimeMillis();
+ if (maxTimeMs <= 0) {
+ // No time limit requested: use a deadline that is never reached.
+ maxEndTimeMs = Long.MAX_VALUE;
+ } else {
+ maxEndTimeMs = startTimeMillis + maxTimeMs;
+ }
+ }
+
+ public void incStatesSearched() { // Records that one more state has been searched.
+ statesSearched++;
+ if (LOG.isDebugEnabled() && statesSearched % 1_000 == 0) { // progress report every 1,000 states, only when debug logging is on
+ LOG.debug("States Searched: {}", statesSearched);
+ LOG.debug("backtrack: {}", numBacktrack);
+ }
+ }
+
+ public int getStatesSearched() { // Returns the number of states searched so far.
+ return statesSearched;
+ }
+
+ public boolean areSearchLimitsExceeded() { // True once the state budget or the time budget has been exceeded.
+ return statesSearched > maxStatesSearched ||
Time.currentTimeMillis() > maxEndTimeMs; // both comparisons are strict, so reaching a limit exactly does not count as exceeding it
+ }
+
+ public SearcherState nextExecutor() { // Advances to the next executor and returns this same state object.
+ execIndex++;
+ if (execIndex >= execs.size()) { // stepped past the last executor; execIndex stays incremented when this throws
+ throw new IllegalStateException("Exceeded the exec limit "
+ execIndex + " >= " + execs.size());
+ }
+ return this;
+ }
+
+ public boolean areAllExecsScheduled() { // True when execIndex points at the last entry of execs.
+ return execIndex == execs.size() - 1; // NOTE(review): true while the last executor is still current; confirm callers check this only after placing it
+ }
+
+ public ExecutorDetails currentExec() { // Returns the executor at the current index of execs.
+ return execs.get(execIndex);
+ }
+
+ public void tryToSchedule(String comp, RAS_Node node, WorkerSlot
workerSlot) {
--- End diff --
Instead of passing in the comp, we may want to pass in the mapping and look
it up ourselves, just because I would want to be sure that the state matches
correctly.
---