Github user danny0405 commented on a diff in the pull request:
https://github.com/apache/storm/pull/2433#discussion_r173046065
--- Diff:
storm-server/src/main/java/org/apache/storm/nimbus/AssignmentDistributionService.java
---
@@ -0,0 +1,312 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ * <p/>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p/>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.storm.nimbus;
+
+import java.io.Closeable;
+import java.io.IOException;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Random;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.storm.DaemonConfig;
+import org.apache.storm.daemon.supervisor.Supervisor;
+import org.apache.storm.generated.SupervisorAssignments;
+import org.apache.storm.scheduler.SupervisorDetails;
+import org.apache.storm.utils.ConfigUtils;
+import org.apache.storm.utils.ObjectReader;
+import org.apache.storm.utils.SupervisorClient;
+import org.apache.storm.utils.Time;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * A service for distributing master assignments to supervisors, this
service makes the assignments notification
+ * asynchronous.
+ *
+ * <p>We support multiple working threads to distribute assignment, every
thread has a queue buffer.
+ *
+ * <p>Master will shuffle its node request to the queues, if the target
queue is full, we just discard the request,
+ * let the supervisors sync instead.
+ *
+ * <p>Caution: this class is not thread safe.
+ *
+ * <pre>{@code
+ * Working mode
+ * +--------+ +-----------------+
+ * | queue1 | ==> | Working thread1 |
+ * +--------+ shuffle +--------+ +-----------------+
+ * | Master | ==>
+ * +--------+ +--------+ +-----------------+
+ * | queue2 | ==> | Working thread2 |
+ * +--------+ +-----------------+
+ * }
+ * </pre>
+ */
+public class AssignmentDistributionService implements Closeable {
+ private static final Logger LOG =
LoggerFactory.getLogger(AssignmentDistributionService.class);
+ private ExecutorService service;
+ /**
+ * Flag to indicate if the service is active.
+ */
+ private volatile boolean active = false;
+
+ private Random random;
+ /**
+ * Working threads num.
+ */
+ private int threadsNum = 0;
+ /**
+ * Working thread queue size.
+ */
+ private int queueSize = 0;
+
+ /**
+ * Assignments request queue.
+ */
+ private volatile Map<Integer, LinkedBlockingQueue<NodeAssignments>>
assignmentsQueue;
+
+ /**
+ * local supervisors for local cluster assignments distribution.
+ */
+ private Map<String, Supervisor> localSupervisors;
+
+ private Map conf;
+
+ private boolean isLocalMode = false; // boolean cache for local mode
decision
+
+ /**
+ * Function for initialization.
+ *
+ * @param conf config
+ */
+ public void prepare(Map conf) {
+ this.conf = conf;
+ this.random = new Random(47);
+
+ this.threadsNum =
ObjectReader.getInt(conf.get(DaemonConfig.NIMBUS_ASSIGNMENTS_SERVICE_THREADS),
10);
+ this.queueSize =
ObjectReader.getInt(conf.get(DaemonConfig.NIMBUS_ASSIGNMENTS_SERVICE_THREAD_QUEUE_SIZE),
100);
+
+ this.assignmentsQueue = new HashMap<>();
+ for (int i = 0; i < threadsNum; i++) {
+ this.assignmentsQueue.put(i, new
LinkedBlockingQueue<NodeAssignments>(queueSize));
+ }
+ //start the thread pool
+ this.service = Executors.newFixedThreadPool(threadsNum);
+ this.active = true;
+ //start the threads
+ for (int i = 0; i < threadsNum; i++) {
+ this.service.submit(new DistributeTask(this, i));
+ }
+ // for local cluster
+ localSupervisors = new HashMap<>();
+ if (ConfigUtils.isLocalMode(conf)) {
+ isLocalMode = true;
+ }
+ }
+
+ @Override
+ public void close() throws IOException {
+ this.active = false;
+ this.service.shutdownNow();
+ try {
+ this.service.awaitTermination(10L, TimeUnit.SECONDS);
--- End diff --
Okay, I will add an interruption check to DistributeTask so that it shuts
down immediately.
---