justinwwhuang commented on code in PR #9142:
URL: https://github.com/apache/inlong/pull/9142#discussion_r1374369541


##########
inlong-agent/agent-core/src/main/java/org/apache/inlong/agent/core/instance/InstanceManager.java:
##########
@@ -0,0 +1,342 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.agent.core.instance;
+
+import org.apache.inlong.agent.common.AbstractDaemon;
+import org.apache.inlong.agent.common.AgentThreadFactory;
+import org.apache.inlong.agent.conf.AgentConfiguration;
+import org.apache.inlong.agent.conf.InstanceProfile;
+import org.apache.inlong.agent.constant.AgentConstants;
+import org.apache.inlong.agent.db.Db;
+import org.apache.inlong.agent.db.InstanceDb;
+import org.apache.inlong.agent.plugin.Instance;
+import org.apache.inlong.agent.utils.AgentUtils;
+import org.apache.inlong.agent.utils.ThreadUtils;
+import org.apache.inlong.common.enums.InstanceStateEnum;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.List;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.SynchronousQueue;
+import java.util.concurrent.ThreadPoolExecutor;
+import java.util.concurrent.TimeUnit;
+
+/**
+ * Handles the instances created by a task, including add, delete, update, etc.
+ * The instance info is stored in both db and memory.
+ */
+public class InstanceManager extends AbstractDaemon {
+
+    private static final Logger LOGGER = 
LoggerFactory.getLogger(InstanceManager.class);
+    private static final int ACTION_QUEUE_CAPACITY = 100000;
+    public static final int CORE_THREAD_SLEEP_TIME = 100;
+    // instances in db
+    private final InstanceDb instanceDb;
+    // instances in memory
+    private final ConcurrentHashMap<String, Instance> instanceMap;
+    // instance action queue.
+    private final BlockingQueue<InstanceAction> actionQueue;
+    // task thread pool;
+    private static final ThreadPoolExecutor EXECUTOR_SERVICE = new 
ThreadPoolExecutor(
+            0, Integer.MAX_VALUE,
+            1L, TimeUnit.SECONDS,
+            new SynchronousQueue<>(),
+            new AgentThreadFactory("instance-manager"));
+
+    private final int taskMaxLimit;
+    private final AgentConfiguration agentConf;
+    private final String taskId;
+    private volatile boolean runAtLeastOneTime = false;
+    private volatile boolean running = false;
+
+    /**
+     * Init instance manager.
+     */
+    public InstanceManager(String taskId, Db basicDb) {
+        this.taskId = taskId;
+        instanceDb = new InstanceDb(basicDb);
+        this.agentConf = AgentConfiguration.getAgentConf();
+        instanceMap = new ConcurrentHashMap<>();
+        taskMaxLimit = agentConf.getInt(AgentConstants.JOB_NUMBER_LIMIT, 
AgentConstants.DEFAULT_JOB_NUMBER_LIMIT);
+        actionQueue = new LinkedBlockingQueue<>(ACTION_QUEUE_CAPACITY);
+    }
+
+    public String getTaskId() {
+        return taskId;
+    }
+
+    public Instance getInstance(String instanceId) {
+        return instanceMap.get(instanceId);
+    }
+
+    public InstanceProfile getInstanceProfile(String instanceId) {
+        return instanceDb.getInstance(taskId, instanceId);
+    }
+
+    public boolean submitAction(InstanceAction action) {
+        if (action == null) {
+            return false;
+        }
+        return actionQueue.offer(action);
+    }
+
+    /**
+     * Builds the runnable executed by the core thread.
+     *
+     * @return runnable that handles queued actions and keeps pace with the db.
+     */
+    private Runnable coreThread() {
+        return () -> {
+            Thread.currentThread().setName("instance-manager-core-" + taskId);
+            running = true;
+            while (isRunnable()) {
+                try {
+                    AgentUtils.silenceSleepInMs(CORE_THREAD_SLEEP_TIME);
+                    dealWithActionQueue(actionQueue);
+                    keepPaceWithDb();
+                } catch (Throwable ex) {
+                    LOGGER.error("coreThread {}", ex.getMessage());
+                    ThreadUtils.threadThrowableHandler(Thread.currentThread(), 
ex);
+                }
+                runAtLeastOneTime = true;
+            }
+            running = false;
+        };
+    }
+
+    private void keepPaceWithDb() {
+        traverseDbTasksToMemory();
+        traverseMemoryTasksToDb();
+    }
+
+    private void traverseDbTasksToMemory() {
+        instanceDb.getInstances(taskId).forEach((profileFromDb) -> {
+            InstanceStateEnum dbState = profileFromDb.getState();
+            Instance task = instanceMap.get(profileFromDb.getInstanceId());
+            if (dbState == InstanceStateEnum.DEFAULT) {

Review Comment:
   get



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to