Github user chemikadze commented on a diff in the pull request:

    https://github.com/apache/griffin/pull/468#discussion_r239993879
  
    --- Diff: 
service/src/main/java/org/apache/griffin/core/job/LivyTaskSubmitHelper.java ---
    @@ -0,0 +1,238 @@
    +package org.apache.griffin.core.job;
    +
    +import com.alibaba.fastjson.JSON;
    +import com.google.gson.Gson;
    +import org.apache.commons.lang.StringUtils;
    +import org.apache.griffin.core.job.entity.JobInstanceBean;
    +import org.slf4j.Logger;
    +import org.slf4j.LoggerFactory;
    +import org.springframework.beans.factory.annotation.Autowired;
    +import org.springframework.core.env.Environment;
    +import org.springframework.http.HttpEntity;
    +import org.springframework.http.HttpHeaders;
    +import org.springframework.http.MediaType;
    +import org.springframework.stereotype.Component;
    +import org.springframework.web.client.RestTemplate;
    +
    +import javax.annotation.PostConstruct;
    +import java.util.Map;
    +import java.util.concurrent.BlockingQueue;
    +import java.util.concurrent.ConcurrentHashMap;
    +import java.util.concurrent.ConcurrentMap;
    +import java.util.concurrent.LinkedBlockingQueue;
    +import java.util.concurrent.atomic.AtomicInteger;
    +
    +import static 
org.apache.griffin.core.job.entity.LivySessionStates.State.FOUND;
    +import static org.apache.griffin.core.util.JsonUtil.toJsonWithFormat;
    +
    +@Component
    +public class LivyTaskSubmitHelper {
    +
    +    private static final Logger logger = LoggerFactory
    +            .getLogger(LivyTaskSubmitHelper.class);
    +
    +    private static final String REQUEST_BY_HEADER = "X-Requested-By";
    +    private JobInstanceBean jobInstance;
    +    private SparkSubmitJob sparkSubmitJob;
    +    private ConcurrentMap<Long, Integer> taskAppIdMap = new 
ConcurrentHashMap<>();
    +    // Current number of tasks
    +    private AtomicInteger curConcurrentTaskNum = new AtomicInteger(0);
    +    private String workerNamePre;
    +    private RestTemplate restTemplate = new RestTemplate();
    +    // queue for pub or sub
    +    private BlockingQueue<Map<String, Object>> queue;
    +    public static final int DEFAULT_QUEUE_SIZE = 20000;
    +    private String uri;
    +
    +    @Autowired
    +    private Environment env;
    +
    +    @PostConstruct
    +    public void init() {
    +        startWorker();
    +        uri = env.getProperty("livy.uri");
    +        logger.info("Livy uri : {}",uri);
    +    }
    +
    +    public LivyTaskSubmitHelper() {
    +        this.workerNamePre = "livy-task-submit-worker";
    +    }
    +
    +    public void startWorker() {
    +        queue = new LinkedBlockingQueue<>(DEFAULT_QUEUE_SIZE);
    +        BatchTaskWorker worker = new BatchTaskWorker();
    +        worker.setDaemon(true);
    +        worker.setName(workerNamePre + "-" + worker.getName());
    +        worker.start();
    +    }
    +
    +    public void addTaskToWaitingQueue(Map<String, Object> t) {
    +        if (t == null) {
    +            logger.warn("task is blank, workerNamePre: {}", workerNamePre);
    +            return;
    +        }
    +
    +        if (queue.remainingCapacity() <= 0) {
    +            logger.warn("task is discard, workerNamePre: {}, task: {}", 
workerNamePre, JSON.toJSON(t));
    +            return;
    +        }
    +
    +        queue.add(t);
    +
    +        logger.info("add_task_to_waiting_queue_success, workerNamePre: {}, 
task: {}", workerNamePre, JSON.toJSON(t));
    +    }
    +
    +    /**
    +     * Consumer thread
    +     */
    +    class BatchTaskWorker extends Thread {
    +        public void run() {
    +            long insertTime = System.currentTimeMillis();
    +
    +            // Keep sequential execution within a limited number of tasks
    +            while (true) {
    +                try {
    +                    if (curConcurrentTaskNum.get() < 
getMaxConcurrentTaskCount()
    +                            && (System.currentTimeMillis() - insertTime) 
>= getBatchIntervalSecond() * 1000) {
    +                        Map<String, Object> task = queue.take();
    +                        doTask(task);
    +                        insertTime = System.currentTimeMillis();
    +                    }
    +                } catch (Throwable e) {
    +                    logger.error("Async_worker_doTask_failed, {}", 
workerNamePre + e.getMessage(), e);
    +                }
    +            }
    +        }
    +    }
    +
    +    public void increaseCurTaskNum(Long scheduleId) {
    +        curConcurrentTaskNum.incrementAndGet();
    +        if (scheduleId != null) taskAppIdMap.put(scheduleId, 1);
    +    }
    +
    +    //Remove tasks after job status updates
    +    public void decreaseCurTaskNum(Long scheduleId) {
    +        if (scheduleId != null && taskAppIdMap.containsKey(scheduleId)) {
    +            curConcurrentTaskNum.decrementAndGet();
    +            taskAppIdMap.remove(scheduleId);
    +        }
    +    }
    +
    +    /**
    +     * Submit a task to Livy and concurrent TaskNum++
    +     */
    +    protected void doTask(Map<String, Object> livyConfMap) {
    --- End diff --
    
    Looks like this duplicates "post2Livy"; it's probably better to merge the 
two together.


---

Reply via email to