Github user revans2 commented on a diff in the pull request:

    https://github.com/apache/storm/pull/2442#discussion_r154194548
  
    --- Diff: 
storm-server/src/main/java/org/apache/storm/scheduler/resource/strategies/scheduling/ConstraintSolverStrategy.java
 ---
    @@ -0,0 +1,623 @@
    +/**
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + * <p>
    + * http://www.apache.org/licenses/LICENSE-2.0
    + * <p>
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.storm.scheduler.resource.strategies.scheduling;
    +
    +import com.google.common.annotations.VisibleForTesting;
    +import java.util.ArrayList;
    +import java.util.Collection;
    +import java.util.Comparator;
    +import java.util.HashMap;
    +import java.util.HashSet;
    +import java.util.LinkedHashMap;
    +import java.util.LinkedList;
    +import java.util.List;
    +import java.util.Map;
    +import java.util.NavigableMap;
    +import java.util.Set;
    +import java.util.Stack;
    +import java.util.TreeMap;
    +import java.util.TreeSet;
    +import org.apache.storm.Config;
    +import org.apache.storm.scheduler.Cluster;
    +import org.apache.storm.scheduler.ExecutorDetails;
    +import org.apache.storm.scheduler.SchedulerAssignment;
    +import org.apache.storm.scheduler.TopologyDetails;
    +import org.apache.storm.scheduler.WorkerSlot;
    +import org.apache.storm.scheduler.resource.RAS_Node;
    +import org.apache.storm.scheduler.resource.RAS_Nodes;
    +import org.apache.storm.scheduler.resource.SchedulingResult;
    +import org.apache.storm.scheduler.resource.SchedulingStatus;
    +import org.apache.storm.utils.ObjectReader;
    +import org.apache.storm.utils.Time;
    +import org.slf4j.Logger;
    +import org.slf4j.LoggerFactory;
    +
    +public class ConstraintSolverStrategy extends BaseResourceAwareStrategy {
    +    private static final Logger LOG = 
LoggerFactory.getLogger(ConstraintSolverStrategy.class);
    +
    +    protected static class SolverResult {
    +        private final int statesSearched;
    +        private final boolean success;
    +        private final long timeTakenMillis;
    +        private final int backtracked;
    +
    +        public SolverResult(SearcherState state, boolean success) {
    +            this.statesSearched = state.getStatesSearched();
    +            this.success = success;
    +            timeTakenMillis = Time.currentTimeMillis() - 
state.startTimeMillis;
    +            backtracked = state.numBacktrack;
    +        }
    +
    +        public SchedulingResult asSchedulingResult() {
    +            if (success) {
    +                return SchedulingResult.success("Fully Scheduled by 
ConstraintSolverStrategy (" + statesSearched
    +                    + " states traversed in " + timeTakenMillis + "ms, 
backtracked " + backtracked + " times)");
    +            }
    +            return 
SchedulingResult.failure(SchedulingStatus.FAIL_NOT_ENOUGH_RESOURCES,
    +                "Cannot find scheduling that satisfies all constraints (" 
+ statesSearched
    +                    + " states traversed in " + timeTakenMillis + "ms, 
backtracked " + backtracked + " times)");
    +        }
    +    }
    +
    +    protected static class SearcherState {
    +        // Metrics
    +        // How many states searched so far.
    +        private int statesSearched = 0;
    +        // Number of times we had to backtrack.
    +        private int numBacktrack = 0;
    +        final long startTimeMillis;
    +        private final long maxEndTimeMs;
    +
    +        // Current state
    +        // The current executor we are trying to schedule
    +        private int execIndex = 0;
    +        // A map of the worker to the components in the worker to be able 
to enforce constraints.
    +        private final Map<WorkerSlot, Set<String>> workerCompAssignment;
    +        private final boolean[] okToRemoveFromWorker;
    +        // for the currently tested assignment a Map of the node to the 
components on it to be able to enforce constraints
    +        private final Map<RAS_Node, Set<String>> nodeCompAssignment;
    +        private final boolean[] okToRemoveFromNode;
    +
    +        // Static State
    +        // The list of all executors (preferably sorted to make 
assignments simpler).
    +        private final List<ExecutorDetails> execs;
    +        //The maximum number of state to search before stopping.
    +        private final int maxStatesSearched;
    +        //The topology we are scheduling
    +        private final TopologyDetails td;
    +
    +        private SearcherState(Map<WorkerSlot, Set<String>> 
workerCompAssignment, Map<RAS_Node, Set<String>> nodeCompAssignment,
    +                              int maxStatesSearched, long maxTimeMs, 
List<ExecutorDetails> execs, TopologyDetails td) {
    +            assert !execs.isEmpty();
    +            assert execs != null;
    +
    +            this.workerCompAssignment = workerCompAssignment;
    +            this.nodeCompAssignment = nodeCompAssignment;
    +            this.maxStatesSearched = maxStatesSearched;
    +            this.execs = execs;
    +            okToRemoveFromWorker = new boolean[execs.size()];
    +            okToRemoveFromNode = new boolean[execs.size()];
    +            this.td = td;
    +            startTimeMillis = Time.currentTimeMillis();
    +            if (maxTimeMs <= 0) {
    +                maxEndTimeMs = Long.MAX_VALUE;
    +            } else {
    +                maxEndTimeMs = startTimeMillis + maxTimeMs;
    +            }
    +        }
    +
    +        public void incStatesSearched() {
    +            statesSearched++;
    +            if (LOG.isDebugEnabled() && statesSearched % 1_000 == 0) {
    +                LOG.debug("States Searched: {}", statesSearched);
    +                LOG.debug("backtrack: {}", numBacktrack);
    +            }
    +        }
    +
    +        public int getStatesSearched() {
    +            return statesSearched;
    +        }
    +
    +        public boolean areSearchLimitsExceeded() {
    +            return statesSearched > maxStatesSearched || 
Time.currentTimeMillis() > maxEndTimeMs;
    +        }
    +
    +        public SearcherState nextExecutor() {
    +            execIndex++;
    +            if (execIndex >= execs.size()) {
    +                throw new IllegalStateException("Exceeded the exec limit " 
+ execIndex + " >= " + execs.size());
    +            }
    +            return this;
    +        }
    +
    +        public boolean areAllExecsScheduled() {
    +            return execIndex == execs.size() - 1;
    +        }
    +
    +        public ExecutorDetails currentExec() {
    +            return execs.get(execIndex);
    +        }
    +
    +        public void tryToSchedule(String comp, RAS_Node node, WorkerSlot 
workerSlot) {
    --- End diff --
    
    Instead of passing in the comp, we may want to pass in the mapping and 
look the comp up ourselves, because I would want to be sure that the state 
matches correctly.


---

Reply via email to