Github user danny0405 commented on a diff in the pull request:

    https://github.com/apache/storm/pull/2433#discussion_r173046065
  
    --- Diff: 
storm-server/src/main/java/org/apache/storm/nimbus/AssignmentDistributionService.java
 ---
    @@ -0,0 +1,312 @@
    +/**
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + * <p/>
    + * http://www.apache.org/licenses/LICENSE-2.0
    + * <p/>
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.storm.nimbus;
    +
    +import java.io.Closeable;
    +import java.io.IOException;
    +import java.util.HashMap;
    +import java.util.Map;
    +import java.util.Random;
    +import java.util.concurrent.ExecutorService;
    +import java.util.concurrent.Executors;
    +import java.util.concurrent.LinkedBlockingQueue;
    +import java.util.concurrent.TimeUnit;
    +
    +import org.apache.storm.DaemonConfig;
    +import org.apache.storm.daemon.supervisor.Supervisor;
    +import org.apache.storm.generated.SupervisorAssignments;
    +import org.apache.storm.scheduler.SupervisorDetails;
    +import org.apache.storm.utils.ConfigUtils;
    +import org.apache.storm.utils.ObjectReader;
    +import org.apache.storm.utils.SupervisorClient;
    +import org.apache.storm.utils.Time;
    +import org.slf4j.Logger;
    +import org.slf4j.LoggerFactory;
    +
    +/**
    + * A service for distributing master assignments to supervisors, this 
service makes the assignments notification
    + * asynchronous.
    + *
    + * <p>We support multiple working threads to distribute assignment, every 
thread has a queue buffer.
    + *
    + * <p>Master will shuffle its node request to the queues, if the target 
queue is full, we just discard the request,
    + * let the supervisors sync instead.
    + *
    + * <p>Caution: this class is not thread safe.
    + *
    + * <pre>{@code
    + * Working mode
    + *                      +--------+         +-----------------+
    + *                      | queue1 |   ==>   | Working thread1 |
    + * +--------+ shuffle   +--------+         +-----------------+
    + * | Master |   ==>
    + * +--------+           +--------+         +-----------------+
    + *                      | queue2 |   ==>   | Working thread2 |
    + *                      +--------+         +-----------------+
    + * }
    + * </pre>
    + */
    +public class AssignmentDistributionService implements Closeable {
    +    private static final Logger LOG = 
LoggerFactory.getLogger(AssignmentDistributionService.class);
    +    private ExecutorService service;
    +    /**
    +     * Flag to indicate if the service is active.
    +     */
    +    private volatile boolean active = false;
    +
    +    private Random random;
    +    /**
    +     * Working threads num.
    +     */
    +    private int threadsNum = 0;
    +    /**
    +     * Working thread queue size.
    +     */
    +    private int queueSize = 0;
    +
    +    /**
    +     * Assignments request queue.
    +     */
    +    private volatile Map<Integer, LinkedBlockingQueue<NodeAssignments>> 
assignmentsQueue;
    +
    +    /**
    +     * local supervisors for local cluster assignments distribution.
    +     */
    +    private Map<String, Supervisor> localSupervisors;
    +
    +    private Map conf;
    +
    +    private boolean isLocalMode = false; // boolean cache for local mode 
decision
    +
    +    /**
    +     * Function for initialization.
    +     *
    +     * @param conf config
    +     */
    +    public void prepare(Map conf) {
    +        this.conf = conf;
    +        this.random = new Random(47);
    +
    +        this.threadsNum = 
ObjectReader.getInt(conf.get(DaemonConfig.NIMBUS_ASSIGNMENTS_SERVICE_THREADS), 
10);
    +        this.queueSize = 
ObjectReader.getInt(conf.get(DaemonConfig.NIMBUS_ASSIGNMENTS_SERVICE_THREAD_QUEUE_SIZE),
 100);
    +
    +        this.assignmentsQueue = new HashMap<>();
    +        for (int i = 0; i < threadsNum; i++) {
    +            this.assignmentsQueue.put(i, new 
LinkedBlockingQueue<NodeAssignments>(queueSize));
    +        }
    +        //start the thread pool
    +        this.service = Executors.newFixedThreadPool(threadsNum);
    +        this.active = true;
    +        //start the threads
    +        for (int i = 0; i < threadsNum; i++) {
    +            this.service.submit(new DistributeTask(this, i));
    +        }
    +        // for local cluster
    +        localSupervisors = new HashMap<>();
    +        if (ConfigUtils.isLocalMode(conf)) {
    +            isLocalMode = true;
    +        }
    +    }
    +
    +    @Override
    +    public void close() throws IOException {
    +        this.active = false;
    +        this.service.shutdownNow();
    +        try {
    +            this.service.awaitTermination(10L, TimeUnit.SECONDS);
    --- End diff --
    
    Okay, I will add an interruption check to DistributeTask so that it shuts 
down immediately.


---

Reply via email to