This is an automated email from the ASF dual-hosted git repository.

luchunliang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/inlong.git


The following commit(s) were added to refs/heads/master by this push:
     new d78abb95b4 [INLONG-9584][Agent] Delete useless code (#9585)
d78abb95b4 is described below

commit d78abb95b46f30307c1d1f46359902c185d553c1
Author: justinwwhuang <[email protected]>
AuthorDate: Thu Jan 18 14:35:10 2024 +0800

    [INLONG-9584][Agent] Delete useless code (#9585)
---
 .../org/apache/inlong/agent/core/task/Task.java    |  90 -------
 .../apache/inlong/agent/core/task/TaskManager.java | 285 ---------------------
 .../apache/inlong/agent/core/task/TaskWrapper.java | 216 ----------------
 3 files changed, 591 deletions(-)

diff --git a/inlong-agent/agent-core/src/main/java/org/apache/inlong/agent/core/task/Task.java b/inlong-agent/agent-core/src/main/java/org/apache/inlong/agent/core/task/Task.java
deleted file mode 100755
index ebf67e34b3..0000000000
--- a/inlong-agent/agent-core/src/main/java/org/apache/inlong/agent/core/task/Task.java
+++ /dev/null
@@ -1,90 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.inlong.agent.core.task;
-
-import org.apache.inlong.agent.conf.JobProfile;
-import org.apache.inlong.agent.plugin.Channel;
-import org.apache.inlong.agent.plugin.Reader;
-import org.apache.inlong.agent.plugin.Sink;
-
-/**
- * task meta definition which contains reader -> channel -> sink and job config information
- */
-public class Task {
-
-    private final String taskId;
-    private final Reader reader;
-    private final Sink sink;
-    private final Channel channel;
-    private final JobProfile jobConf;
-    private volatile boolean isInited = false;
-
-    public Task(String taskId, Reader reader, Sink sink, Channel channel,
-            JobProfile jobConf) {
-        this.reader = reader;
-        this.sink = sink;
-        this.taskId = taskId;
-        this.channel = channel;
-        this.jobConf = jobConf;
-    }
-
-    public boolean isReadFinished() {
-        return reader.isFinished();
-    }
-
-    public String getTaskId() {
-        return taskId;
-    }
-
-    public Reader getReader() {
-        return reader;
-    }
-
-    public Sink getSink() {
-        return sink;
-    }
-
-    public Channel getChannel() {
-        return channel;
-    }
-
-    public JobProfile getJobConf() {
-        return jobConf;
-    }
-
-    public String getName() {
-        return reader.getReadSource() + "->" + sink.toString();
-    }
-
-    public void init() {
-        this.channel.init(jobConf);
-        this.sink.init(jobConf);
-        this.reader.init(jobConf);
-        isInited = true;
-    }
-
-    public boolean isTaskFinishInit() {
-        return isInited;
-    }
-
-    public void destroy() {
-        this.reader.destroy();
-        this.sink.destroy();
-        this.channel.destroy();
-    }
-}
diff --git a/inlong-agent/agent-core/src/main/java/org/apache/inlong/agent/core/task/TaskManager.java b/inlong-agent/agent-core/src/main/java/org/apache/inlong/agent/core/task/TaskManager.java
deleted file mode 100755
index 344cafba2e..0000000000
--- a/inlong-agent/agent-core/src/main/java/org/apache/inlong/agent/core/task/TaskManager.java
+++ /dev/null
@@ -1,285 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.inlong.agent.core.task;
-
-import org.apache.inlong.agent.common.AbstractDaemon;
-import org.apache.inlong.agent.common.AgentThreadFactory;
-import org.apache.inlong.agent.conf.AgentConfiguration;
-import org.apache.inlong.agent.constant.AgentConstants;
-import org.apache.inlong.agent.core.AgentManager;
-import org.apache.inlong.agent.metrics.AgentMetricItem;
-import org.apache.inlong.agent.metrics.AgentMetricItemSet;
-import org.apache.inlong.agent.utils.AgentUtils;
-import org.apache.inlong.agent.utils.ThreadUtils;
-import org.apache.inlong.common.metric.MetricRegister;
-
-import com.google.common.annotations.VisibleForTesting;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.util.HashMap;
-import java.util.Map;
-import java.util.concurrent.BlockingQueue;
-import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.LinkedBlockingQueue;
-import java.util.concurrent.SynchronousQueue;
-import java.util.concurrent.ThreadPoolExecutor;
-import java.util.concurrent.TimeUnit;
-
-import static org.apache.inlong.agent.metrics.AgentMetricItem.KEY_COMPONENT_NAME;
-
-/**
- * Task manager maintains lots of tasks and communicate with job level components.
- * It also provide functions to execute commands from job level like killing/submit tasks.
- */
-public class TaskManager extends AbstractDaemon {
-
-    private static final Logger LOGGER = LoggerFactory.getLogger(TaskManager.class);
-
-    // task thread pool;
-    private final ThreadPoolExecutor runningPool;
-    private final AgentManager agentManager;
-    private final ConcurrentHashMap<String, TaskWrapper> tasks;
-    private final BlockingQueue<TaskWrapper> retryTasks;
-    private final int monitorInterval;
-    private final int taskMaxCapacity;
-    private final int taskRetryMaxTime;
-    private final long waitTime;
-
-    // metrics
-    private final AgentMetricItemSet taskMetrics;
-    private final Map<String, String> dimensions;
-
-    /**
-     * Init task manager.
-     *
-     * @param agentManager agent manager
-     */
-    public TaskManager(AgentManager agentManager) {
-        this.agentManager = agentManager;
-        this.runningPool = new ThreadPoolExecutor(
-                0, Integer.MAX_VALUE,
-                60L, TimeUnit.SECONDS,
-                new SynchronousQueue<>(),
-                new AgentThreadFactory("task"));
-        // metric for task level
-        this.taskMetrics = new AgentMetricItemSet(this.getClass().getSimpleName());
-        this.dimensions = new HashMap<>();
-        this.dimensions.put(KEY_COMPONENT_NAME, this.getClass().getSimpleName());
-        MetricRegister.register(taskMetrics);
-
-        tasks = new ConcurrentHashMap<>();
-        AgentConfiguration conf = AgentConfiguration.getAgentConf();
-        retryTasks = new LinkedBlockingQueue<>(
-                conf.getInt(
-                        AgentConstants.TASK_RETRY_MAX_CAPACITY, AgentConstants.DEFAULT_TASK_RETRY_MAX_CAPACITY));
-        monitorInterval = conf.getInt(
-                AgentConstants.TASK_MONITOR_INTERVAL, AgentConstants.DEFAULT_TASK_MONITOR_INTERVAL);
-        taskRetryMaxTime = conf
-                .getInt(AgentConstants.TASK_RETRY_SUBMIT_WAIT_SECONDS,
-                        AgentConstants.DEFAULT_TASK_RETRY_SUBMIT_WAIT_SECONDS);
-        taskMaxCapacity = conf.getInt(
-                AgentConstants.TASK_RETRY_MAX_CAPACITY, AgentConstants.DEFAULT_TASK_RETRY_MAX_CAPACITY);
-        waitTime = conf.getLong(
-                AgentConstants.THREAD_POOL_AWAIT_TIME, AgentConstants.DEFAULT_THREAD_POOL_AWAIT_TIME);
-    }
-
-    /**
-     * Get task metrics
-     *
-     * @return task metrics
-     */
-    public AgentMetricItem getTaskMetrics() {
-        return this.taskMetrics.findMetricItem(dimensions);
-    }
-
-    public TaskWrapper getTaskWrapper(String taskId) {
-        return tasks.get(taskId);
-    }
-
-    /**
-     * submit task, wait if task queue is full.
-     *
-     * @param task task
-     */
-    public void submitTask(Task task) {
-        TaskWrapper taskWrapper = new TaskWrapper(this, task);
-        submitTask(taskWrapper);
-    }
-
-    public void submitTask(TaskWrapper wrapper) {
-        TaskWrapper retTaskWrapper = tasks.putIfAbsent(wrapper.getTask().getTaskId(), wrapper);
-        if (retTaskWrapper == null) {
-            // pool may be full
-            boolean notSubmitted = true;
-            while (notSubmitted) {
-                try {
-                    if (this.runningPool.isShutdown()) {
-                        LOGGER.error("submit task error because thread pool is closed");
-                        break;
-                    }
-                    this.runningPool.submit(wrapper);
-                    notSubmitted = false;
-                } catch (Exception ex) {
-                    AgentUtils.silenceSleepInMs(waitTime);
-                    LOGGER.warn("reject task {}", wrapper.getTask().getTaskId(), ex);
-                }
-            }
-            getTaskMetrics().taskRunningCount.incrementAndGet();
-        } else {
-            LOGGER.warn("task cannot be repeated added taskId {}", wrapper.getTask().getTaskId());
-        }
-    }
-
-    /**
-     * retry task.
-     *
-     * @param wrapper task wrapper
-     */
-    private boolean addRetryTask(TaskWrapper wrapper) {
-        LOGGER.info("retry submit task {}", wrapper.getTask().getTaskId());
-        try {
-            boolean success = retryTasks.offer(wrapper, taskRetryMaxTime, TimeUnit.SECONDS);
-            if (!success) {
-                LOGGER.error("cannot submit to retry queue, max {}, current {}", taskMaxCapacity,
-                        retryTasks.size());
-            } else {
-                getTaskMetrics().taskRetryingCount.incrementAndGet();
-            }
-            return success;
-        } catch (Exception ex) {
-            LOGGER.error("error while offer task", ex);
-        }
-        return false;
-    }
-
-    /**
-     * Check whether task is finished
-     *
-     * @param taskId task id
-     * @return true if task is finished otherwise false
-     */
-    public boolean isTaskFinished(String taskId) {
-        TaskWrapper wrapper = tasks.get(taskId);
-        if (wrapper != null) {
-            return wrapper.isFinished();
-        }
-        return false;
-    }
-
-    /**
-     * Check if task is success
-     *
-     * @param taskId task id
-     * @return true if task is success otherwise false
-     */
-    public boolean isTaskSuccess(String taskId) {
-        TaskWrapper wrapper = tasks.get(taskId);
-        if (wrapper != null) {
-            return wrapper.isSuccess();
-        }
-        return false;
-    }
-
-    /**
-     * Remove task and wait task to finish by task id
-     *
-     * @param taskId task id
-     */
-    public void removeTask(String taskId) {
-        if (taskId == null) {
-            return;
-        }
-        getTaskMetrics().taskRunningCount.decrementAndGet();
-        TaskWrapper taskWrapper = tasks.remove(taskId);
-        if (taskWrapper != null) {
-            taskWrapper.destroyTask();
-        }
-    }
-
-    /**
-     * kill task
-     *
-     * @param task task
-     */
-    public boolean killTask(Task task) {
-        // kill running tasks.
-        TaskWrapper taskWrapper = tasks.get(task.getTaskId());
-        if (taskWrapper != null) {
-            taskWrapper.kill();
-            return true;
-        }
-        return false;
-    }
-
-    @VisibleForTesting
-    public int getTaskSize() {
-        return tasks.size();
-    }
-
-    /**
-     * Thread for checking whether task should retry.
-     *
-     * @return runnable thread
-     */
-    public Runnable createTaskMonitorThread() {
-        return () -> {
-            while (isRunnable()) {
-                try {
-                    for (String taskId : tasks.keySet()) {
-                        TaskWrapper wrapper = tasks.get(taskId);
-                        if (wrapper != null && wrapper.isFailed() && wrapper.shouldRetry()) {
-                            boolean success = addRetryTask(wrapper);
-                            if (success) {
-                                removeTask(taskId);
-                            }
-                        }
-                    }
-                    while (!retryTasks.isEmpty()) {
-                        TaskWrapper taskWrapper = retryTasks.poll();
-                        if (taskWrapper != null) {
-                            getTaskMetrics().taskRetryingCount.decrementAndGet();
-                            submitTask(taskWrapper);
-                        }
-                    }
-                    TimeUnit.SECONDS.sleep(monitorInterval);
-                } catch (Throwable ex) {
-                    LOGGER.error("Exception caught", ex);
-                    ThreadUtils.threadThrowableHandler(Thread.currentThread(), ex);
-                }
-            }
-        };
-    }
-
-    /**
-     * start service.
-     */
-    @Override
-    public void start() {
-        submitWorker(createTaskMonitorThread());
-    }
-
-    /**
-     * stop service.
-     */
-    @Override
-    public void stop() throws Exception {
-        waitForTerminate();
-        this.runningPool.shutdown();
-    }
-}
diff --git a/inlong-agent/agent-core/src/main/java/org/apache/inlong/agent/core/task/TaskWrapper.java b/inlong-agent/agent-core/src/main/java/org/apache/inlong/agent/core/task/TaskWrapper.java
deleted file mode 100755
index 0f59060944..0000000000
--- a/inlong-agent/agent-core/src/main/java/org/apache/inlong/agent/core/task/TaskWrapper.java
+++ /dev/null
@@ -1,216 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.inlong.agent.core.task;
-
-import org.apache.inlong.agent.common.AgentThreadFactory;
-import org.apache.inlong.agent.conf.AgentConfiguration;
-import org.apache.inlong.agent.constant.AgentConstants;
-import org.apache.inlong.agent.constant.JobConstants;
-import org.apache.inlong.agent.message.EndMessage;
-import org.apache.inlong.agent.plugin.Message;
-import org.apache.inlong.agent.state.AbstractStateWrapper;
-import org.apache.inlong.agent.state.State;
-import org.apache.inlong.agent.utils.AgentUtils;
-
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.util.concurrent.CompletableFuture;
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.SynchronousQueue;
-import java.util.concurrent.ThreadPoolExecutor;
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.atomic.AtomicInteger;
-
-/**
- * TaskWrapper is used in taskManager, it maintains the life cycle of
- * running task.
- */
-public class TaskWrapper extends AbstractStateWrapper {
-
-    public static final int WAIT_FINISH_TIME_OUT = 1;
-    public static final int WAIT_BEGIN_TIME_SECONDS = 60;
-    private static final Logger LOGGER = LoggerFactory.getLogger(TaskWrapper.class);
-    private final TaskManager taskManager;
-    private final Task task;
-
-    private final AtomicInteger retryTime = new AtomicInteger(0);
-    private final int maxRetryTime;
-    private final int pushMaxWaitTime;
-    private final int pullMaxWaitTime;
-    private ExecutorService executorService;
-
-    public TaskWrapper(TaskManager manager, Task task) {
-        super();
-        this.taskManager = manager;
-        this.task = task;
-        AgentConfiguration conf = AgentConfiguration.getAgentConf();
-        maxRetryTime = conf.getInt(
-                AgentConstants.TASK_MAX_RETRY_TIME, AgentConstants.DEFAULT_TASK_MAX_RETRY_TIME);
-        pushMaxWaitTime = conf.getInt(
-                AgentConstants.TASK_PUSH_MAX_SECOND, AgentConstants.DEFAULT_TASK_PUSH_MAX_SECOND);
-        pullMaxWaitTime = conf.getInt(
-                AgentConstants.TASK_PULL_MAX_SECOND, AgentConstants.DEFAULT_TASK_PULL_MAX_SECOND);
-        if (executorService == null) {
-            executorService = new ThreadPoolExecutor(0, Integer.MAX_VALUE,
-                    60L, TimeUnit.SECONDS,
-                    new SynchronousQueue<Runnable>(),
-                    new AgentThreadFactory("task-reader-writer-task_" + task.getTaskId()));
-        }
-        doChangeState(State.ACCEPTED);
-    }
-
-    /**
-     * submit read thread
-     *
-     * @return CompletableFuture
-     */
-    private CompletableFuture<?> submitReadThread() {
-        return CompletableFuture.runAsync(() -> {
-            Message message = null;
-            while (!isException() && !task.isReadFinished()) {
-                // if source deleted,then failed
-                if (!task.getReader().isSourceExist()) {
-                    doChangeState(State.FAILED);
-                } else {
-                    if (message == null
-                            || task.getChannel().push(message, pushMaxWaitTime, TimeUnit.SECONDS)) {
-                        message = task.getReader().read();
-                    }
-                }
-            }
-            LOGGER.info("read end, task exception status is {}, read finish status is {}", isException(),
-                    task.isReadFinished());
-            // write end message
-            task.getChannel().push(new EndMessage());
-            task.getReader().destroy();
-        }, executorService);
-    }
-
-    /**
-     * submit write thread
-     *
-     * @return CompletableFuture
-     */
-    private CompletableFuture<?> submitWriteThread() {
-        return CompletableFuture.runAsync(() -> {
-            while (!isException()) {
-                Message message = task.getChannel().pull(pullMaxWaitTime, TimeUnit.SECONDS);
-                if (message instanceof EndMessage) {
-                    break;
-                }
-                task.getSink().write(message);
-            }
-        }, executorService);
-    }
-
-    /**
-     * submit reader/writer
-     */
-    private void submitThreadsAndWait() {
-        CompletableFuture<?> reader = submitReadThread();
-        CompletableFuture<?> writer = submitWriteThread();
-        CompletableFuture.allOf(reader, writer)
-                .exceptionally(ex -> {
-                    doChangeState(State.FAILED);
-                    LOGGER.error("exception caught", ex);
-                    return null;
-                }).join();
-    }
-
-    /**
-     * kill task
-     */
-    void kill() {
-        LOGGER.info("task id {} is killed", task.getTaskId());
-        doChangeState(State.KILLED);
-    }
-
-    /**
-     * In standalone mode, the job to be removed should wait until the read is finished, set
-     * timeout to WAIT_FINISH_TIME_OUT minute to wait for finishing
-     */
-    void waitForFinish() {
-        LOGGER.info("set readTime out to 1 minute task id is {}", task.getTaskId());
-        task.getReader().setReadTimeout(TimeUnit.MINUTES.toMillis(WAIT_FINISH_TIME_OUT));
-    }
-
-    /**
-     * destroy task
-     */
-    void destroyTask() {
-        LOGGER.info("destroy task id is {}", task.getTaskId());
-        task.getReader().finishRead();
-    }
-
-    /**
-     * whether task retry times exceed max retry time.
-     *
-     * @return whether should retry
-     */
-    boolean shouldRetry() {
-        return retryTime.get() < maxRetryTime;
-    }
-
-    Task getTask() {
-        return task;
-    }
-
-    @Override
-    public void addCallbacks() {
-        this.addCallback(State.ACCEPTED, State.RUNNING, (before, after) -> {
-
-        }).addCallback(State.RUNNING, State.FAILED, (before, after) -> {
-            LOGGER.info("task {} is failed, please check it", task.getTaskId());
-            retryTime.incrementAndGet();
-            if (!shouldRetry()) {
-                doChangeState(State.FATAL);
-                taskManager.getTaskMetrics().taskFatalCount.incrementAndGet();
-            }
-        }).addCallback(State.FAILED, State.FATAL, (before, after) -> {
-
-        }).addCallback(State.FAILED, State.ACCEPTED, (before, after) -> {
-
-        }).addCallback(State.FAILED, State.RUNNING, ((before, after) -> {
-
-        })).addCallback(State.RUNNING, State.SUCCEEDED, (before, after) -> {
-
-        });
-    }
-
-    @Override
-    public void run() {
-        try {
-            AgentThreadFactory.nameThread(task.getTaskId());
-            LOGGER.info("start to run {}, retry time is {}", task.getTaskId(), retryTime.get());
-            AgentUtils.silenceSleepInSeconds(task.getJobConf()
-                    .getLong(JobConstants.JOB_TASK_BEGIN_WAIT_SECONDS, WAIT_BEGIN_TIME_SECONDS));
-            doChangeState(State.RUNNING);
-            task.init();
-            submitThreadsAndWait();
-            if (!isException()) {
-                doChangeState(State.SUCCEEDED);
-            }
-            LOGGER.info("task state is {}, start to destroy task {}", getCurrentState(), task.getTaskId());
-            task.destroy();
-        } catch (Exception ex) {
-            LOGGER.error("error while running wrapper", ex);
-            doChangeState(State.FAILED);
-        }
-    }
-}

Reply via email to