Github user danny0405 commented on a diff in the pull request: https://github.com/apache/storm/pull/2433#discussion_r173046065 --- Diff: storm-server/src/main/java/org/apache/storm/nimbus/AssignmentDistributionService.java --- @@ -0,0 +1,312 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * <p/> + * http://www.apache.org/licenses/LICENSE-2.0 + * <p/> + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.storm.nimbus; + +import java.io.Closeable; +import java.io.IOException; +import java.util.HashMap; +import java.util.Map; +import java.util.Random; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.LinkedBlockingQueue; +import java.util.concurrent.TimeUnit; + +import org.apache.storm.DaemonConfig; +import org.apache.storm.daemon.supervisor.Supervisor; +import org.apache.storm.generated.SupervisorAssignments; +import org.apache.storm.scheduler.SupervisorDetails; +import org.apache.storm.utils.ConfigUtils; +import org.apache.storm.utils.ObjectReader; +import org.apache.storm.utils.SupervisorClient; +import org.apache.storm.utils.Time; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * A service for distributing master assignments to supervisors, this service makes the assignments notification + * asynchronous. 
+ * + * <p>We support multiple working threads to distribute assignment, every thread has a queue buffer. + * + * <p>Master will shuffle its node request to the queues, if the target queue is full, we just discard the request, + * let the supervisors sync instead. + * + * <p>Caution: this class is not thread safe. + * + * <pre>{@code + * Working mode + * +--------+ +-----------------+ + * | queue1 | ==> | Working thread1 | + * +--------+ shuffle +--------+ +-----------------+ + * | Master | ==> + * +--------+ +--------+ +-----------------+ + * | queue2 | ==> | Working thread2 | + * +--------+ +-----------------+ + * } + * </pre> + */ +public class AssignmentDistributionService implements Closeable { + private static final Logger LOG = LoggerFactory.getLogger(AssignmentDistributionService.class); + private ExecutorService service; + /** + * Flag to indicate if the service is active. + */ + private volatile boolean active = false; + + private Random random; + /** + * Working threads num. + */ + private int threadsNum = 0; + /** + * Working thread queue size. + */ + private int queueSize = 0; + + /** + * Assignments request queue. + */ + private volatile Map<Integer, LinkedBlockingQueue<NodeAssignments>> assignmentsQueue; + + /** + * local supervisors for local cluster assignments distribution. + */ + private Map<String, Supervisor> localSupervisors; + + private Map conf; + + private boolean isLocalMode = false; // boolean cache for local mode decision + + /** + * Function for initialization. 
+ * + * @param conf config + */ + public void prepare(Map conf) { + this.conf = conf; + this.random = new Random(47); + + this.threadsNum = ObjectReader.getInt(conf.get(DaemonConfig.NIMBUS_ASSIGNMENTS_SERVICE_THREADS), 10); + this.queueSize = ObjectReader.getInt(conf.get(DaemonConfig.NIMBUS_ASSIGNMENTS_SERVICE_THREAD_QUEUE_SIZE), 100); + + this.assignmentsQueue = new HashMap<>(); + for (int i = 0; i < threadsNum; i++) { + this.assignmentsQueue.put(i, new LinkedBlockingQueue<NodeAssignments>(queueSize)); + } + //start the thread pool + this.service = Executors.newFixedThreadPool(threadsNum); + this.active = true; + //start the threads + for (int i = 0; i < threadsNum; i++) { + this.service.submit(new DistributeTask(this, i)); + } + // for local cluster + localSupervisors = new HashMap<>(); + if (ConfigUtils.isLocalMode(conf)) { + isLocalMode = true; + } + } + + @Override + public void close() throws IOException { + this.active = false; + this.service.shutdownNow(); + try { + this.service.awaitTermination(10L, TimeUnit.SECONDS); --- End diff -- Okay, I will add an interruption check to DistributeTask so that it shuts down immediately.
---