Github user chemikadze commented on a diff in the pull request:
https://github.com/apache/griffin/pull/468#discussion_r239993856
--- Diff:
service/src/main/java/org/apache/griffin/core/job/LivyTaskSubmitHelper.java ---
@@ -0,0 +1,238 @@
+package org.apache.griffin.core.job;
+
+import com.alibaba.fastjson.JSON;
+import com.google.gson.Gson;
+import org.apache.commons.lang.StringUtils;
+import org.apache.griffin.core.job.entity.JobInstanceBean;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.core.env.Environment;
+import org.springframework.http.HttpEntity;
+import org.springframework.http.HttpHeaders;
+import org.springframework.http.MediaType;
+import org.springframework.stereotype.Component;
+import org.springframework.web.client.RestTemplate;
+
+import javax.annotation.PostConstruct;
+import java.util.Map;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import static
org.apache.griffin.core.job.entity.LivySessionStates.State.FOUND;
+import static org.apache.griffin.core.util.JsonUtil.toJsonWithFormat;
+
+@Component
+public class LivyTaskSubmitHelper {
+
+ private static final Logger logger = LoggerFactory
+ .getLogger(LivyTaskSubmitHelper.class);
+
+ private static final String REQUEST_BY_HEADER = "X-Requested-By";
+ private JobInstanceBean jobInstance;
+ private SparkSubmitJob sparkSubmitJob;
+ private ConcurrentMap<Long, Integer> taskAppIdMap = new
ConcurrentHashMap<>();
+ // Current number of tasks
+ private AtomicInteger curConcurrentTaskNum = new AtomicInteger(0);
+ private String workerNamePre;
+ private RestTemplate restTemplate = new RestTemplate();
+ // queue for pub or sub
+ private BlockingQueue<Map<String, Object>> queue;
+ public static final int DEFAULT_QUEUE_SIZE = 20000;
+ private String uri;
+
+ @Autowired
+ private Environment env;
+
+ @PostConstruct
+ public void init() {
+ startWorker();
+ uri = env.getProperty("livy.uri");
+ logger.info("Livy uri : {}",uri);
+ }
+
+ public LivyTaskSubmitHelper() {
+ this.workerNamePre = "livy-task-submit-worker";
+ }
+
+ public void startWorker() {
+ queue = new LinkedBlockingQueue<>(DEFAULT_QUEUE_SIZE);
+ BatchTaskWorker worker = new BatchTaskWorker();
+ worker.setDaemon(true);
+ worker.setName(workerNamePre + "-" + worker.getName());
+ worker.start();
+ }
+
+ public void addTaskToWaitingQueue(Map<String, Object> t) {
+ if (t == null) {
+ logger.warn("task is blank, workerNamePre: {}", workerNamePre);
+ return;
+ }
+
+ if (queue.remainingCapacity() <= 0) {
+ logger.warn("task is discard, workerNamePre: {}, task: {}",
workerNamePre, JSON.toJSON(t));
+ return;
+ }
+
+ queue.add(t);
+
+ logger.info("add_task_to_waiting_queue_success, workerNamePre: {},
task: {}", workerNamePre, JSON.toJSON(t));
+ }
+
+ /**
+ * Consumer thread
+ */
+ class BatchTaskWorker extends Thread {
+ public void run() {
+ long insertTime = System.currentTimeMillis();
+
+ // Keep sequential execution within a limited number of tasks
+ while (true) {
+ try {
+ if (curConcurrentTaskNum.get() <
getMaxConcurrentTaskCount()
--- End diff --
Am I understanding correctly that if `curConcurrentTaskNum.get() ==
getMaxConcurrentTaskCount()`, or `(System.currentTimeMillis() - insertTime) <
getBatchIntervalSecond() * 1000`, this loop will spin without any pauses?
Given that `curConcurrentTaskNum` decreases only when the actual Livy job
ends, the loop will occupy a whole core for long periods of time.
---