This is an automated email from the ASF dual-hosted git repository.
luchunliang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/inlong.git
The following commit(s) were added to refs/heads/master by this push:
new d78abb95b4 [INLONG-9584][Agent] Delete useless code (#9585)
d78abb95b4 is described below
commit d78abb95b46f30307c1d1f46359902c185d553c1
Author: justinwwhuang <[email protected]>
AuthorDate: Thu Jan 18 14:35:10 2024 +0800
[INLONG-9584][Agent] Delete useless code (#9585)
---
.../org/apache/inlong/agent/core/task/Task.java | 90 -------
.../apache/inlong/agent/core/task/TaskManager.java | 285 ---------------------
.../apache/inlong/agent/core/task/TaskWrapper.java | 216 ----------------
3 files changed, 591 deletions(-)
diff --git
a/inlong-agent/agent-core/src/main/java/org/apache/inlong/agent/core/task/Task.java
b/inlong-agent/agent-core/src/main/java/org/apache/inlong/agent/core/task/Task.java
deleted file mode 100755
index ebf67e34b3..0000000000
---
a/inlong-agent/agent-core/src/main/java/org/apache/inlong/agent/core/task/Task.java
+++ /dev/null
@@ -1,90 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.inlong.agent.core.task;
-
-import org.apache.inlong.agent.conf.JobProfile;
-import org.apache.inlong.agent.plugin.Channel;
-import org.apache.inlong.agent.plugin.Reader;
-import org.apache.inlong.agent.plugin.Sink;
-
-/**
- * task meta definition which contains reader -> channel -> sink and job
config information
- */
-public class Task {
-
- private final String taskId;
- private final Reader reader;
- private final Sink sink;
- private final Channel channel;
- private final JobProfile jobConf;
- private volatile boolean isInited = false;
-
- public Task(String taskId, Reader reader, Sink sink, Channel channel,
- JobProfile jobConf) {
- this.reader = reader;
- this.sink = sink;
- this.taskId = taskId;
- this.channel = channel;
- this.jobConf = jobConf;
- }
-
- public boolean isReadFinished() {
- return reader.isFinished();
- }
-
- public String getTaskId() {
- return taskId;
- }
-
- public Reader getReader() {
- return reader;
- }
-
- public Sink getSink() {
- return sink;
- }
-
- public Channel getChannel() {
- return channel;
- }
-
- public JobProfile getJobConf() {
- return jobConf;
- }
-
- public String getName() {
- return reader.getReadSource() + "->" + sink.toString();
- }
-
- public void init() {
- this.channel.init(jobConf);
- this.sink.init(jobConf);
- this.reader.init(jobConf);
- isInited = true;
- }
-
- public boolean isTaskFinishInit() {
- return isInited;
- }
-
- public void destroy() {
- this.reader.destroy();
- this.sink.destroy();
- this.channel.destroy();
- }
-}
diff --git
a/inlong-agent/agent-core/src/main/java/org/apache/inlong/agent/core/task/TaskManager.java
b/inlong-agent/agent-core/src/main/java/org/apache/inlong/agent/core/task/TaskManager.java
deleted file mode 100755
index 344cafba2e..0000000000
---
a/inlong-agent/agent-core/src/main/java/org/apache/inlong/agent/core/task/TaskManager.java
+++ /dev/null
@@ -1,285 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.inlong.agent.core.task;
-
-import org.apache.inlong.agent.common.AbstractDaemon;
-import org.apache.inlong.agent.common.AgentThreadFactory;
-import org.apache.inlong.agent.conf.AgentConfiguration;
-import org.apache.inlong.agent.constant.AgentConstants;
-import org.apache.inlong.agent.core.AgentManager;
-import org.apache.inlong.agent.metrics.AgentMetricItem;
-import org.apache.inlong.agent.metrics.AgentMetricItemSet;
-import org.apache.inlong.agent.utils.AgentUtils;
-import org.apache.inlong.agent.utils.ThreadUtils;
-import org.apache.inlong.common.metric.MetricRegister;
-
-import com.google.common.annotations.VisibleForTesting;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.util.HashMap;
-import java.util.Map;
-import java.util.concurrent.BlockingQueue;
-import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.LinkedBlockingQueue;
-import java.util.concurrent.SynchronousQueue;
-import java.util.concurrent.ThreadPoolExecutor;
-import java.util.concurrent.TimeUnit;
-
-import static
org.apache.inlong.agent.metrics.AgentMetricItem.KEY_COMPONENT_NAME;
-
-/**
- * Task manager maintains lots of tasks and communicate with job level
components.
- * It also provide functions to execute commands from job level like
killing/submit tasks.
- */
-public class TaskManager extends AbstractDaemon {
-
- private static final Logger LOGGER =
LoggerFactory.getLogger(TaskManager.class);
-
- // task thread pool;
- private final ThreadPoolExecutor runningPool;
- private final AgentManager agentManager;
- private final ConcurrentHashMap<String, TaskWrapper> tasks;
- private final BlockingQueue<TaskWrapper> retryTasks;
- private final int monitorInterval;
- private final int taskMaxCapacity;
- private final int taskRetryMaxTime;
- private final long waitTime;
-
- // metrics
- private final AgentMetricItemSet taskMetrics;
- private final Map<String, String> dimensions;
-
- /**
- * Init task manager.
- *
- * @param agentManager agent manager
- */
- public TaskManager(AgentManager agentManager) {
- this.agentManager = agentManager;
- this.runningPool = new ThreadPoolExecutor(
- 0, Integer.MAX_VALUE,
- 60L, TimeUnit.SECONDS,
- new SynchronousQueue<>(),
- new AgentThreadFactory("task"));
- // metric for task level
- this.taskMetrics = new
AgentMetricItemSet(this.getClass().getSimpleName());
- this.dimensions = new HashMap<>();
- this.dimensions.put(KEY_COMPONENT_NAME,
this.getClass().getSimpleName());
- MetricRegister.register(taskMetrics);
-
- tasks = new ConcurrentHashMap<>();
- AgentConfiguration conf = AgentConfiguration.getAgentConf();
- retryTasks = new LinkedBlockingQueue<>(
- conf.getInt(
- AgentConstants.TASK_RETRY_MAX_CAPACITY,
AgentConstants.DEFAULT_TASK_RETRY_MAX_CAPACITY));
- monitorInterval = conf.getInt(
- AgentConstants.TASK_MONITOR_INTERVAL,
AgentConstants.DEFAULT_TASK_MONITOR_INTERVAL);
- taskRetryMaxTime = conf
- .getInt(AgentConstants.TASK_RETRY_SUBMIT_WAIT_SECONDS,
- AgentConstants.DEFAULT_TASK_RETRY_SUBMIT_WAIT_SECONDS);
- taskMaxCapacity = conf.getInt(
- AgentConstants.TASK_RETRY_MAX_CAPACITY,
AgentConstants.DEFAULT_TASK_RETRY_MAX_CAPACITY);
- waitTime = conf.getLong(
- AgentConstants.THREAD_POOL_AWAIT_TIME,
AgentConstants.DEFAULT_THREAD_POOL_AWAIT_TIME);
- }
-
- /**
- * Get task metrics
- *
- * @return task metrics
- */
- public AgentMetricItem getTaskMetrics() {
- return this.taskMetrics.findMetricItem(dimensions);
- }
-
- public TaskWrapper getTaskWrapper(String taskId) {
- return tasks.get(taskId);
- }
-
- /**
- * submit task, wait if task queue is full.
- *
- * @param task task
- */
- public void submitTask(Task task) {
- TaskWrapper taskWrapper = new TaskWrapper(this, task);
- submitTask(taskWrapper);
- }
-
- public void submitTask(TaskWrapper wrapper) {
- TaskWrapper retTaskWrapper =
tasks.putIfAbsent(wrapper.getTask().getTaskId(), wrapper);
- if (retTaskWrapper == null) {
- // pool may be full
- boolean notSubmitted = true;
- while (notSubmitted) {
- try {
- if (this.runningPool.isShutdown()) {
- LOGGER.error("submit task error because thread pool is
closed");
- break;
- }
- this.runningPool.submit(wrapper);
- notSubmitted = false;
- } catch (Exception ex) {
- AgentUtils.silenceSleepInMs(waitTime);
- LOGGER.warn("reject task {}",
wrapper.getTask().getTaskId(), ex);
- }
- }
- getTaskMetrics().taskRunningCount.incrementAndGet();
- } else {
- LOGGER.warn("task cannot be repeated added taskId {}",
wrapper.getTask().getTaskId());
- }
- }
-
- /**
- * retry task.
- *
- * @param wrapper task wrapper
- */
- private boolean addRetryTask(TaskWrapper wrapper) {
- LOGGER.info("retry submit task {}", wrapper.getTask().getTaskId());
- try {
- boolean success = retryTasks.offer(wrapper, taskRetryMaxTime,
TimeUnit.SECONDS);
- if (!success) {
- LOGGER.error("cannot submit to retry queue, max {}, current
{}", taskMaxCapacity,
- retryTasks.size());
- } else {
- getTaskMetrics().taskRetryingCount.incrementAndGet();
- }
- return success;
- } catch (Exception ex) {
- LOGGER.error("error while offer task", ex);
- }
- return false;
- }
-
- /**
- * Check whether task is finished
- *
- * @param taskId task id
- * @return true if task is finished otherwise false
- */
- public boolean isTaskFinished(String taskId) {
- TaskWrapper wrapper = tasks.get(taskId);
- if (wrapper != null) {
- return wrapper.isFinished();
- }
- return false;
- }
-
- /**
- * Check if task is success
- *
- * @param taskId task id
- * @return true if task is success otherwise false
- */
- public boolean isTaskSuccess(String taskId) {
- TaskWrapper wrapper = tasks.get(taskId);
- if (wrapper != null) {
- return wrapper.isSuccess();
- }
- return false;
- }
-
- /**
- * Remove task and wait task to finish by task id
- *
- * @param taskId task id
- */
- public void removeTask(String taskId) {
- if (taskId == null) {
- return;
- }
- getTaskMetrics().taskRunningCount.decrementAndGet();
- TaskWrapper taskWrapper = tasks.remove(taskId);
- if (taskWrapper != null) {
- taskWrapper.destroyTask();
- }
- }
-
- /**
- * kill task
- *
- * @param task task
- */
- public boolean killTask(Task task) {
- // kill running tasks.
- TaskWrapper taskWrapper = tasks.get(task.getTaskId());
- if (taskWrapper != null) {
- taskWrapper.kill();
- return true;
- }
- return false;
- }
-
- @VisibleForTesting
- public int getTaskSize() {
- return tasks.size();
- }
-
- /**
- * Thread for checking whether task should retry.
- *
- * @return runnable thread
- */
- public Runnable createTaskMonitorThread() {
- return () -> {
- while (isRunnable()) {
- try {
- for (String taskId : tasks.keySet()) {
- TaskWrapper wrapper = tasks.get(taskId);
- if (wrapper != null && wrapper.isFailed() &&
wrapper.shouldRetry()) {
- boolean success = addRetryTask(wrapper);
- if (success) {
- removeTask(taskId);
- }
- }
- }
- while (!retryTasks.isEmpty()) {
- TaskWrapper taskWrapper = retryTasks.poll();
- if (taskWrapper != null) {
-
getTaskMetrics().taskRetryingCount.decrementAndGet();
- submitTask(taskWrapper);
- }
- }
- TimeUnit.SECONDS.sleep(monitorInterval);
- } catch (Throwable ex) {
- LOGGER.error("Exception caught", ex);
- ThreadUtils.threadThrowableHandler(Thread.currentThread(),
ex);
- }
- }
- };
- }
-
- /**
- * start service.
- */
- @Override
- public void start() {
- submitWorker(createTaskMonitorThread());
- }
-
- /**
- * stop service.
- */
- @Override
- public void stop() throws Exception {
- waitForTerminate();
- this.runningPool.shutdown();
- }
-}
diff --git
a/inlong-agent/agent-core/src/main/java/org/apache/inlong/agent/core/task/TaskWrapper.java
b/inlong-agent/agent-core/src/main/java/org/apache/inlong/agent/core/task/TaskWrapper.java
deleted file mode 100755
index 0f59060944..0000000000
---
a/inlong-agent/agent-core/src/main/java/org/apache/inlong/agent/core/task/TaskWrapper.java
+++ /dev/null
@@ -1,216 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.inlong.agent.core.task;
-
-import org.apache.inlong.agent.common.AgentThreadFactory;
-import org.apache.inlong.agent.conf.AgentConfiguration;
-import org.apache.inlong.agent.constant.AgentConstants;
-import org.apache.inlong.agent.constant.JobConstants;
-import org.apache.inlong.agent.message.EndMessage;
-import org.apache.inlong.agent.plugin.Message;
-import org.apache.inlong.agent.state.AbstractStateWrapper;
-import org.apache.inlong.agent.state.State;
-import org.apache.inlong.agent.utils.AgentUtils;
-
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.util.concurrent.CompletableFuture;
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.SynchronousQueue;
-import java.util.concurrent.ThreadPoolExecutor;
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.atomic.AtomicInteger;
-
-/**
- * TaskWrapper is used in taskManager, it maintains the life cycle of
- * running task.
- */
-public class TaskWrapper extends AbstractStateWrapper {
-
- public static final int WAIT_FINISH_TIME_OUT = 1;
- public static final int WAIT_BEGIN_TIME_SECONDS = 60;
- private static final Logger LOGGER =
LoggerFactory.getLogger(TaskWrapper.class);
- private final TaskManager taskManager;
- private final Task task;
-
- private final AtomicInteger retryTime = new AtomicInteger(0);
- private final int maxRetryTime;
- private final int pushMaxWaitTime;
- private final int pullMaxWaitTime;
- private ExecutorService executorService;
-
- public TaskWrapper(TaskManager manager, Task task) {
- super();
- this.taskManager = manager;
- this.task = task;
- AgentConfiguration conf = AgentConfiguration.getAgentConf();
- maxRetryTime = conf.getInt(
- AgentConstants.TASK_MAX_RETRY_TIME,
AgentConstants.DEFAULT_TASK_MAX_RETRY_TIME);
- pushMaxWaitTime = conf.getInt(
- AgentConstants.TASK_PUSH_MAX_SECOND,
AgentConstants.DEFAULT_TASK_PUSH_MAX_SECOND);
- pullMaxWaitTime = conf.getInt(
- AgentConstants.TASK_PULL_MAX_SECOND,
AgentConstants.DEFAULT_TASK_PULL_MAX_SECOND);
- if (executorService == null) {
- executorService = new ThreadPoolExecutor(0, Integer.MAX_VALUE,
- 60L, TimeUnit.SECONDS,
- new SynchronousQueue<Runnable>(),
- new AgentThreadFactory("task-reader-writer-task_" +
task.getTaskId()));
- }
- doChangeState(State.ACCEPTED);
- }
-
- /**
- * submit read thread
- *
- * @return CompletableFuture
- */
- private CompletableFuture<?> submitReadThread() {
- return CompletableFuture.runAsync(() -> {
- Message message = null;
- while (!isException() && !task.isReadFinished()) {
- // if source deleted,then failed
- if (!task.getReader().isSourceExist()) {
- doChangeState(State.FAILED);
- } else {
- if (message == null
- || task.getChannel().push(message,
pushMaxWaitTime, TimeUnit.SECONDS)) {
- message = task.getReader().read();
- }
- }
- }
- LOGGER.info("read end, task exception status is {}, read finish
status is {}", isException(),
- task.isReadFinished());
- // write end message
- task.getChannel().push(new EndMessage());
- task.getReader().destroy();
- }, executorService);
- }
-
- /**
- * submit write thread
- *
- * @return CompletableFuture
- */
- private CompletableFuture<?> submitWriteThread() {
- return CompletableFuture.runAsync(() -> {
- while (!isException()) {
- Message message = task.getChannel().pull(pullMaxWaitTime,
TimeUnit.SECONDS);
- if (message instanceof EndMessage) {
- break;
- }
- task.getSink().write(message);
- }
- }, executorService);
- }
-
- /**
- * submit reader/writer
- */
- private void submitThreadsAndWait() {
- CompletableFuture<?> reader = submitReadThread();
- CompletableFuture<?> writer = submitWriteThread();
- CompletableFuture.allOf(reader, writer)
- .exceptionally(ex -> {
- doChangeState(State.FAILED);
- LOGGER.error("exception caught", ex);
- return null;
- }).join();
- }
-
- /**
- * kill task
- */
- void kill() {
- LOGGER.info("task id {} is killed", task.getTaskId());
- doChangeState(State.KILLED);
- }
-
- /**
- * In standalone mode, the job to be removed should wait until the read is
finished, set
- * timeout to WAIT_FINISH_TIME_OUT minute to wait for finishing
- */
- void waitForFinish() {
- LOGGER.info("set readTime out to 1 minute task id is {}",
task.getTaskId());
-
task.getReader().setReadTimeout(TimeUnit.MINUTES.toMillis(WAIT_FINISH_TIME_OUT));
- }
-
- /**
- * destroy task
- */
- void destroyTask() {
- LOGGER.info("destroy task id is {}", task.getTaskId());
- task.getReader().finishRead();
- }
-
- /**
- * whether task retry times exceed max retry time.
- *
- * @return whether should retry
- */
- boolean shouldRetry() {
- return retryTime.get() < maxRetryTime;
- }
-
- Task getTask() {
- return task;
- }
-
- @Override
- public void addCallbacks() {
- this.addCallback(State.ACCEPTED, State.RUNNING, (before, after) -> {
-
- }).addCallback(State.RUNNING, State.FAILED, (before, after) -> {
- LOGGER.info("task {} is failed, please check it",
task.getTaskId());
- retryTime.incrementAndGet();
- if (!shouldRetry()) {
- doChangeState(State.FATAL);
- taskManager.getTaskMetrics().taskFatalCount.incrementAndGet();
- }
- }).addCallback(State.FAILED, State.FATAL, (before, after) -> {
-
- }).addCallback(State.FAILED, State.ACCEPTED, (before, after) -> {
-
- }).addCallback(State.FAILED, State.RUNNING, ((before, after) -> {
-
- })).addCallback(State.RUNNING, State.SUCCEEDED, (before, after) -> {
-
- });
- }
-
- @Override
- public void run() {
- try {
- AgentThreadFactory.nameThread(task.getTaskId());
- LOGGER.info("start to run {}, retry time is {}", task.getTaskId(),
retryTime.get());
- AgentUtils.silenceSleepInSeconds(task.getJobConf()
- .getLong(JobConstants.JOB_TASK_BEGIN_WAIT_SECONDS,
WAIT_BEGIN_TIME_SECONDS));
- doChangeState(State.RUNNING);
- task.init();
- submitThreadsAndWait();
- if (!isException()) {
- doChangeState(State.SUCCEEDED);
- }
- LOGGER.info("task state is {}, start to destroy task {}",
getCurrentState(), task.getTaskId());
- task.destroy();
- } catch (Exception ex) {
- LOGGER.error("error while running wrapper", ex);
- doChangeState(State.FAILED);
- }
- }
-}