Github user revans2 commented on a diff in the pull request:

    https://github.com/apache/storm/pull/2319#discussion_r138662036
  
    --- Diff: 
storm-core/src/jvm/org/apache/storm/assignments/AssignmentDistributionService.java
 ---
    @@ -0,0 +1,232 @@
    +/**
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + * <p/>
    + * http://www.apache.org/licenses/LICENSE-2.0
    + * <p/>
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +package org.apache.storm.assignments;
    +
    +import org.apache.storm.Config;
    +import org.apache.storm.generated.SupervisorAssignments;
    +import org.apache.storm.utils.SupervisorClient;
    +import org.apache.storm.utils.Utils;
    +import org.slf4j.Logger;
    +import org.slf4j.LoggerFactory;
    +
    +import java.io.Closeable;
    +import java.io.IOException;
    +import java.util.HashMap;
    +import java.util.Map;
    +import java.util.Random;
    +import java.util.concurrent.ExecutorService;
    +import java.util.concurrent.Executors;
    +import java.util.concurrent.LinkedBlockingQueue;
    +import java.util.concurrent.TimeUnit;
    +
    +/**
    + * <p>A service for distributing master assignments to supervisors, this 
service makes the assignments notification asynchronous.
    + * <p>We support multiple working threads to distribute assignment, every 
thread has a queue buffer.
    + * <p>Master will shuffle its node request to the queues, if the target 
queue is full, we just discard the request, let the supervisors sync instead.
    + * <p/>
    + * <pre>{@code
    + * Working mode
    + *                      +--------+         +-----------------+
    + *                      | queue1 |   ==>   | Working thread1 |
    + * +--------+ shuffle   +--------+         +-----------------+
    + * | Master |   ==>
    + * +--------+           +--------+         +-----------------+
    + *                      | queue2 |   ==>   | Working thread2 |
    + *                      +--------+         +-----------------+
    + * }
    + * </pre>
    + */
    +public class AssignmentDistributionService implements Closeable {
    +    private static final Logger LOG = 
LoggerFactory.getLogger(AssignmentDistributionService.class);
    +    private ExecutorService service;
    +
    +    /**
    +     * Flag to indicate if the service is active
    +     */
    +    private volatile boolean active = false;
    +
    +    private Random random;
    +    /**
    +     * Working threads num.
    +     */
    +    private int threadsNum = 0;
    +    /**
    +     * Working thread queue size.
    +     */
    +    private int queueSize = 0;
    +
    +    /**
    +     * Assignments request queue.
    +     */
    +    private volatile Map<Integer, LinkedBlockingQueue<NodeAssignments>> 
assignmentsQueue;
    +
    +    private Map conf;
    +
    +    /**
    +     * Function for initialization.
    +     *
    +     * @param conf
    +     */
    +    public void prepare(Map conf) {
    +        this.conf = conf;
    +        this.random = new Random(47);
    +
    +        this.threadsNum = 
Utils.getInt(conf.get(Config.NIMBUS_ASSIGNMENTS_SERVICE_THREADS), 10);
    +        this.queueSize = 
Utils.getInt(conf.get(Config.NIMBUS_ASSIGNMENTS_SERVICE_THREAD_QUEUE_SIZE), 
100);
    +
    +        this.assignmentsQueue = new HashMap<>();
    +        for (int i = 0; i < threadsNum; i++) {
    +            this.assignmentsQueue.put(i, new 
LinkedBlockingQueue<NodeAssignments>(queueSize));
    +        }
    +        //start the thread pool
    +        this.service = Executors.newFixedThreadPool(threadsNum);
    +        this.active = true;
    +        //start the threads
    +        for (int i = 0; i < threadsNum; i++) {
    +            this.service.submit(new DistributeTask(this, i));
    +        }
    +    }
    +
    +    @Override
    +    public void close() throws IOException {
    +        this.active = false;
    +        this.service.shutdownNow();
    +        try {
    +            this.service.awaitTermination(10l, TimeUnit.SECONDS);
    +        } catch (InterruptedException e) {
    +            LOG.error("Failed to close assignments distribute service");
    +        }
    +        this.assignmentsQueue = null;
    +    }
    +
    +    public void addAssignmentsForNode(String node, SupervisorAssignments 
assignments) {
    +        try {
    +            boolean success = 
nextQueue().offer(NodeAssignments.getInstance(node, assignments), 5l, 
TimeUnit.SECONDS);
    +            if (!success) {
    +                LOG.warn("Discard an assignment distribution for node {} 
because the target sub queue is full", node);
    +            }
    +
    +        } catch (InterruptedException e) {
    +            LOG.error("Add node assignments interrupted: {}", 
e.getMessage());
    +            throw new RuntimeException(e);
    +        }
    +    }
    +
    +    static class NodeAssignments {
    +        private String node;
    +        private SupervisorAssignments assignments;
    +
    +        private NodeAssignments(String node, SupervisorAssignments 
assignments) {
    +            this.node = node;
    +            this.assignments = assignments;
    +        }
    +
    +        public static NodeAssignments getInstance(String node, 
SupervisorAssignments assignments) {
    +            return new NodeAssignments(node, assignments);
    +        }
    +
    +        public String getNode() {
    +            return this.node;
    +        }
    +
    +        public SupervisorAssignments getAssignments() {
    +            return this.assignments;
    +        }
    +
    +    }
    +
    +    /**
    +     * Task to distribute assignments.
    +     */
    +    static class DistributeTask implements Runnable {
    +        private AssignmentDistributionService service;
    +        private Integer queueIndex;
    +
    +        DistributeTask(AssignmentDistributionService service, Integer 
index) {
    +            this.service = service;
    +            this.queueIndex = index;
    +        }
    +
    +        @Override
    +        public void run() {
    +            while (true) {
    +                try {
    +                    NodeAssignments nodeAssignments = 
this.service.nextAssignments(queueIndex);
    +                    sendAssignmentsToNode(nodeAssignments);
    +                } catch (InterruptedException e) {
    +                    if (service.isActive()) {
    +                        LOG.error("Get an unexpected interrupt when 
distributing assignments to node, {}", e.getCause());
    +                    } else {
    +                        // service is off now just interrupt it.
    +                        Thread.currentThread().interrupt();
    +                    }
    +                }
    +            }
    +        }
    +
    +        private void sendAssignmentsToNode(NodeAssignments assignments) {
    +            SupervisorClient client = 
SupervisorClient.getConfiguredClient(service.getConf(), assignments.getNode());
    --- End diff --
    
    SupervisorClient should implement AutoCloseable, so we can use Java's 
try-with-resources statement to ensure it is closed.


---

Reply via email to