Github user chemikadze commented on a diff in the pull request:
https://github.com/apache/griffin/pull/468#discussion_r239993349
--- Diff:
service/src/main/java/org/apache/griffin/core/job/LivyTaskSubmitHelper.java ---
@@ -0,0 +1,238 @@
+package org.apache.griffin.core.job;
+
+import com.alibaba.fastjson.JSON;
+import com.google.gson.Gson;
+import org.apache.commons.lang.StringUtils;
+import org.apache.griffin.core.job.entity.JobInstanceBean;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.core.env.Environment;
+import org.springframework.http.HttpEntity;
+import org.springframework.http.HttpHeaders;
+import org.springframework.http.MediaType;
+import org.springframework.stereotype.Component;
+import org.springframework.web.client.RestTemplate;
+
+import javax.annotation.PostConstruct;
+import java.util.Map;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import static
org.apache.griffin.core.job.entity.LivySessionStates.State.FOUND;
+import static org.apache.griffin.core.util.JsonUtil.toJsonWithFormat;
+
+@Component
+public class LivyTaskSubmitHelper {
+
+ private static final Logger logger = LoggerFactory
+ .getLogger(LivyTaskSubmitHelper.class);
+
+ private static final String REQUEST_BY_HEADER = "X-Requested-By";
+ private JobInstanceBean jobInstance;
+ private SparkSubmitJob sparkSubmitJob;
+ private ConcurrentMap<Long, Integer> taskAppIdMap = new
ConcurrentHashMap<>();
+ // Current number of tasks
+ private AtomicInteger curConcurrentTaskNum = new AtomicInteger(0);
+ private String workerNamePre;
+ private RestTemplate restTemplate = new RestTemplate();
+ // queue for pub or sub
+ private BlockingQueue<Map<String, Object>> queue;
+ public static final int DEFAULT_QUEUE_SIZE = 20000;
+ private String uri;
+
+ @Autowired
+ private Environment env;
+
+ @PostConstruct
+ public void init() {
+ startWorker();
+ uri = env.getProperty("livy.uri");
+ logger.info("Livy uri : {}",uri);
+ }
+
+ public LivyTaskSubmitHelper() {
+ this.workerNamePre = "livy-task-submit-worker";
+ }
+
+ public void startWorker() {
+ queue = new LinkedBlockingQueue<>(DEFAULT_QUEUE_SIZE);
+ BatchTaskWorker worker = new BatchTaskWorker();
+ worker.setDaemon(true);
+ worker.setName(workerNamePre + "-" + worker.getName());
+ worker.start();
+ }
+
+ public void addTaskToWaitingQueue(Map<String, Object> t) {
+ if (t == null) {
+ logger.warn("task is blank, workerNamePre: {}", workerNamePre);
+ return;
+ }
+
+ if (queue.remainingCapacity() <= 0) {
+ logger.warn("task is discard, workerNamePre: {}, task: {}",
workerNamePre, JSON.toJSON(t));
--- End diff --
What will happen to job instance status after that?
---