Github user chemikadze commented on a diff in the pull request:

    https://github.com/apache/griffin/pull/468#discussion_r239993856
  
    --- Diff: 
service/src/main/java/org/apache/griffin/core/job/LivyTaskSubmitHelper.java ---
    @@ -0,0 +1,238 @@
    +package org.apache.griffin.core.job;
    +
    +import com.alibaba.fastjson.JSON;
    +import com.google.gson.Gson;
    +import org.apache.commons.lang.StringUtils;
    +import org.apache.griffin.core.job.entity.JobInstanceBean;
    +import org.slf4j.Logger;
    +import org.slf4j.LoggerFactory;
    +import org.springframework.beans.factory.annotation.Autowired;
    +import org.springframework.core.env.Environment;
    +import org.springframework.http.HttpEntity;
    +import org.springframework.http.HttpHeaders;
    +import org.springframework.http.MediaType;
    +import org.springframework.stereotype.Component;
    +import org.springframework.web.client.RestTemplate;
    +
    +import javax.annotation.PostConstruct;
    +import java.util.Map;
    +import java.util.concurrent.BlockingQueue;
    +import java.util.concurrent.ConcurrentHashMap;
    +import java.util.concurrent.ConcurrentMap;
    +import java.util.concurrent.LinkedBlockingQueue;
    +import java.util.concurrent.atomic.AtomicInteger;
    +
    +import static 
org.apache.griffin.core.job.entity.LivySessionStates.State.FOUND;
    +import static org.apache.griffin.core.util.JsonUtil.toJsonWithFormat;
    +
    +@Component
    +public class LivyTaskSubmitHelper {
    +
    +    private static final Logger logger = LoggerFactory
    +            .getLogger(LivyTaskSubmitHelper.class);
    +
    +    private static final String REQUEST_BY_HEADER = "X-Requested-By";
    +    private JobInstanceBean jobInstance;
    +    private SparkSubmitJob sparkSubmitJob;
    +    private ConcurrentMap<Long, Integer> taskAppIdMap = new 
ConcurrentHashMap<>();
    +    // Current number of tasks
    +    private AtomicInteger curConcurrentTaskNum = new AtomicInteger(0);
    +    private String workerNamePre;
    +    private RestTemplate restTemplate = new RestTemplate();
    +    // queue for pub or sub
    +    private BlockingQueue<Map<String, Object>> queue;
    +    public static final int DEFAULT_QUEUE_SIZE = 20000;
    +    private String uri;
    +
    +    @Autowired
    +    private Environment env;
    +
    +    @PostConstruct
    +    public void init() {
    +        startWorker();
    +        uri = env.getProperty("livy.uri");
    +        logger.info("Livy uri : {}",uri);
    +    }
    +
    +    public LivyTaskSubmitHelper() {
    +        this.workerNamePre = "livy-task-submit-worker";
    +    }
    +
    +    public void startWorker() {
    +        queue = new LinkedBlockingQueue<>(DEFAULT_QUEUE_SIZE);
    +        BatchTaskWorker worker = new BatchTaskWorker();
    +        worker.setDaemon(true);
    +        worker.setName(workerNamePre + "-" + worker.getName());
    +        worker.start();
    +    }
    +
    +    public void addTaskToWaitingQueue(Map<String, Object> t) {
    +        if (t == null) {
    +            logger.warn("task is blank, workerNamePre: {}", workerNamePre);
    +            return;
    +        }
    +
    +        if (queue.remainingCapacity() <= 0) {
    +            logger.warn("task is discard, workerNamePre: {}, task: {}", 
workerNamePre, JSON.toJSON(t));
    +            return;
    +        }
    +
    +        queue.add(t);
    +
    +        logger.info("add_task_to_waiting_queue_success, workerNamePre: {}, 
task: {}", workerNamePre, JSON.toJSON(t));
    +    }
    +
    +    /**
    +     * Consumer thread
    +     */
    +    class BatchTaskWorker extends Thread {
    +        public void run() {
    +            long insertTime = System.currentTimeMillis();
    +
    +            // Keep sequential execution within a limited number of tasks
    +            while (true) {
    +                try {
    +                    if (curConcurrentTaskNum.get() < 
getMaxConcurrentTaskCount()
    --- End diff --
    
    Am I understanding correctly that if `curConcurrentTaskNum.get() == 
getMaxConcurrentTaskCount()`, or `(System.currentTimeMillis() - insertTime) < 
getBatchIntervalSecond() * 1000`, this loop will spin without any 
pauses? Given that `curConcurrentTaskNum` decreases only when the 
actual Livy job ends, it will occupy a whole core for long periods of time.


---

Reply via email to