Github user chemikadze commented on a diff in the pull request:

    https://github.com/apache/griffin/pull/468#discussion_r239993349
  
    --- Diff: 
service/src/main/java/org/apache/griffin/core/job/LivyTaskSubmitHelper.java ---
    @@ -0,0 +1,238 @@
    +package org.apache.griffin.core.job;
    +
    +import com.alibaba.fastjson.JSON;
    +import com.google.gson.Gson;
    +import org.apache.commons.lang.StringUtils;
    +import org.apache.griffin.core.job.entity.JobInstanceBean;
    +import org.slf4j.Logger;
    +import org.slf4j.LoggerFactory;
    +import org.springframework.beans.factory.annotation.Autowired;
    +import org.springframework.core.env.Environment;
    +import org.springframework.http.HttpEntity;
    +import org.springframework.http.HttpHeaders;
    +import org.springframework.http.MediaType;
    +import org.springframework.stereotype.Component;
    +import org.springframework.web.client.RestTemplate;
    +
    +import javax.annotation.PostConstruct;
    +import java.util.Map;
    +import java.util.concurrent.BlockingQueue;
    +import java.util.concurrent.ConcurrentHashMap;
    +import java.util.concurrent.ConcurrentMap;
    +import java.util.concurrent.LinkedBlockingQueue;
    +import java.util.concurrent.atomic.AtomicInteger;
    +
    +import static 
org.apache.griffin.core.job.entity.LivySessionStates.State.FOUND;
    +import static org.apache.griffin.core.util.JsonUtil.toJsonWithFormat;
    +
    +@Component
    +public class LivyTaskSubmitHelper {
    +
    +    private static final Logger logger = LoggerFactory
    +            .getLogger(LivyTaskSubmitHelper.class);
    +
    +    private static final String REQUEST_BY_HEADER = "X-Requested-By";
    +    private JobInstanceBean jobInstance;
    +    private SparkSubmitJob sparkSubmitJob;
    +    private ConcurrentMap<Long, Integer> taskAppIdMap = new 
ConcurrentHashMap<>();
    +    // Current number of tasks
    +    private AtomicInteger curConcurrentTaskNum = new AtomicInteger(0);
    +    private String workerNamePre;
    +    private RestTemplate restTemplate = new RestTemplate();
    +    // queue for pub or sub
    +    private BlockingQueue<Map<String, Object>> queue;
    +    public static final int DEFAULT_QUEUE_SIZE = 20000;
    +    private String uri;
    +
    +    @Autowired
    +    private Environment env;
    +
    +    @PostConstruct
    +    public void init() {
    +        startWorker();
    +        uri = env.getProperty("livy.uri");
    +        logger.info("Livy uri : {}",uri);
    +    }
    +
    +    public LivyTaskSubmitHelper() {
    +        this.workerNamePre = "livy-task-submit-worker";
    +    }
    +
    +    public void startWorker() {
    +        queue = new LinkedBlockingQueue<>(DEFAULT_QUEUE_SIZE);
    +        BatchTaskWorker worker = new BatchTaskWorker();
    +        worker.setDaemon(true);
    +        worker.setName(workerNamePre + "-" + worker.getName());
    +        worker.start();
    +    }
    +
    +    public void addTaskToWaitingQueue(Map<String, Object> t) {
    +        if (t == null) {
    +            logger.warn("task is blank, workerNamePre: {}", workerNamePre);
    +            return;
    +        }
    +
    +        if (queue.remainingCapacity() <= 0) {
    +            logger.warn("task is discard, workerNamePre: {}, task: {}", 
workerNamePre, JSON.toJSON(t));
    --- End diff --
    
    What will happen to job instance status after that?


---

Reply via email to