Github user revans2 commented on a diff in the pull request:

    https://github.com/apache/storm/pull/2433#discussion_r173000061
  
    --- Diff: 
storm-server/src/main/java/org/apache/storm/nimbus/AssignmentDistributionService.java
 ---
    @@ -0,0 +1,312 @@
    +/**
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + * <p/>
    + * http://www.apache.org/licenses/LICENSE-2.0
    + * <p/>
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.storm.nimbus;
    +
    +import java.io.Closeable;
    +import java.io.IOException;
    +import java.util.HashMap;
    +import java.util.Map;
    +import java.util.Random;
    +import java.util.concurrent.ExecutorService;
    +import java.util.concurrent.Executors;
    +import java.util.concurrent.LinkedBlockingQueue;
    +import java.util.concurrent.TimeUnit;
    +
    +import org.apache.storm.DaemonConfig;
    +import org.apache.storm.daemon.supervisor.Supervisor;
    +import org.apache.storm.generated.SupervisorAssignments;
    +import org.apache.storm.scheduler.SupervisorDetails;
    +import org.apache.storm.utils.ConfigUtils;
    +import org.apache.storm.utils.ObjectReader;
    +import org.apache.storm.utils.SupervisorClient;
    +import org.apache.storm.utils.Time;
    +import org.slf4j.Logger;
    +import org.slf4j.LoggerFactory;
    +
    +/**
    + * A service for distributing master assignments to supervisors, this 
service makes the assignments notification
    + * asynchronous.
    + *
    + * <p>We support multiple working threads to distribute assignment, every 
thread has a queue buffer.
    + *
    + * <p>Master will shuffle its node request to the queues, if the target 
queue is full, we just discard the request,
    + * let the supervisors sync instead.
    + *
    + * <p>Caution: this class is not thread safe.
    + *
    + * <pre>{@code
    + * Working mode
    + *                      +--------+         +-----------------+
    + *                      | queue1 |   ==>   | Working thread1 |
    + * +--------+ shuffle   +--------+         +-----------------+
    + * | Master |   ==>
    + * +--------+           +--------+         +-----------------+
    + *                      | queue2 |   ==>   | Working thread2 |
    + *                      +--------+         +-----------------+
    + * }
    + * </pre>
    + */
    +public class AssignmentDistributionService implements Closeable {
    +    private static final Logger LOG = 
LoggerFactory.getLogger(AssignmentDistributionService.class);
    +    private ExecutorService service;
    +    /**
    +     * Flag to indicate if the service is active.
    +     */
    +    private volatile boolean active = false;
    +
    +    private Random random;
    +    /**
    +     * Working threads num.
    +     */
    +    private int threadsNum = 0;
    +    /**
    +     * Working thread queue size.
    +     */
    +    private int queueSize = 0;
    +
    +    /**
    +     * Assignments request queue.
    +     */
    +    private volatile Map<Integer, LinkedBlockingQueue<NodeAssignments>> 
assignmentsQueue;
    +
    +    /**
    +     * local supervisors for local cluster assignments distribution.
    +     */
    +    private Map<String, Supervisor> localSupervisors;
    +
    +    private Map conf;
    +
    +    private boolean isLocalMode = false; // boolean cache for local mode 
decision
    +
    +    /**
    +     * Function for initialization.
    +     *
    +     * @param conf config
    +     */
    +    public void prepare(Map conf) {
    +        this.conf = conf;
    +        this.random = new Random(47);
    +
    +        this.threadsNum = 
ObjectReader.getInt(conf.get(DaemonConfig.NIMBUS_ASSIGNMENTS_SERVICE_THREADS), 
10);
    +        this.queueSize = 
ObjectReader.getInt(conf.get(DaemonConfig.NIMBUS_ASSIGNMENTS_SERVICE_THREAD_QUEUE_SIZE),
 100);
    +
    +        this.assignmentsQueue = new HashMap<>();
    +        for (int i = 0; i < threadsNum; i++) {
    +            this.assignmentsQueue.put(i, new 
LinkedBlockingQueue<NodeAssignments>(queueSize));
    +        }
    +        //start the thread pool
    +        this.service = Executors.newFixedThreadPool(threadsNum);
    +        this.active = true;
    +        //start the threads
    +        for (int i = 0; i < threadsNum; i++) {
    +            this.service.submit(new DistributeTask(this, i));
    +        }
    +        // for local cluster
    +        localSupervisors = new HashMap<>();
    +        if (ConfigUtils.isLocalMode(conf)) {
    +            isLocalMode = true;
    +        }
    +    }
    +
    +    @Override
    +    public void close() throws IOException {
    +        this.active = false;
    +        this.service.shutdownNow();
    +        try {
    +            this.service.awaitTermination(10L, TimeUnit.SECONDS);
    --- End diff --
    
    This will always wait the full 10 seconds because there is no way to signal 
to the DistributeTask that it should shut down.  This is at least part of the 
reason the tests are taking so long to run.


---

Reply via email to