glad to hear that you will implement this feature


Best Regards
---------------
DolphinScheduler(Incubator) PPMC
Lidong Dai 代立冬
dailidon...@gmail.com
---------------


裴龙武 <peilon...@qq.com> 于2020年5月20日周三 下午3:47写道:

> My code is not perfect yet. I will write a detailed design document. Then
> I will realize this feature about our discussion result.
>
>
> ------------------&nbsp;原始邮件&nbsp;------------------
> 发件人:&nbsp;"wenhemin"<whm_...@163.com&gt;;
> 发送时间:&nbsp;2020年5月18日(星期一) 晚上7:50
> 收件人:&nbsp;"裴龙武"<peilon...@qq.com&gt;;"dev"<dev@dolphinscheduler.apache.org
> &gt;;
>
> 主题:&nbsp;Re: [Feature] Support SSH Task and Support dummy task like airflow
>
>
>
> Thanks for writing detailed documentation. I think this is also a missing
> feature of DS.
> About the extension point:
> 1.Can ssh tasks be merged into shell tasks. Essentially, they all execute
> shell commands.
> 2.About dummy task, DS has the function of disable nodes, I do n’t know if
> this requirement is met.
>
> The script from AirFlow to Dolphin is great.
>
> &gt; 在 2020年5月18日,09:28,裴龙武 <peilon...@qq.com&gt; 写道:
> &gt;
> &gt;
> &gt; OK, 3Q!
> &gt;
> &gt; First, I will ensure that open source can use.
> &gt;
> &gt; Second, I think we must discuss deeply. I write a more detailed
> document. You can check the attachment. I also send the document to
> DaiLidong.
> &gt;
> &gt; Third,&nbsp; I'll give you the error of not using SSH connection pool.
> &gt;
> &gt;
> &gt;
> &gt;
> &gt; ------------------ 原始邮件 ------------------
> &gt; 发件人: "wenhemin"<whm_...@163.com&gt;;
> &gt; 发送时间: 2020年5月14日(星期四) 晚上7:26
> &gt; 收件人: "裴龙武"<peilon...@qq.com&gt;;
> &gt; 主题: Re: [Feature] Support SSH Task and Support dummy task like airflow
> &gt;
> &gt; Great!
> &gt; I think, Can ssh tasks be merged into shell tasks,&nbsp; execute
> script locally or remotely, Configure on the front end.
> &gt; About ssh connect pool, I did not find it necessary to use the
> connection pool.
> &gt;
> &gt; BTW, Look at the code to introduce additional jar packages, You also
> need to ensure that open source can use the license of this jar package.
> &gt;
> &gt;&gt; 在 2020年5月14日,16:20,裴龙武 <peilon...@qq.com 
> <mailto:peilon...@qq.com&gt;&gt;
> 写道:
> &gt;&gt;
> &gt;&gt;
> &gt;&gt; 1. The priority between these tasks is also depended on the
> dolphin DAG define. When the front task is not finished, it not execute
> next task.
> &gt;&gt;
> &gt;&gt; 2. I extend ssh task. I also use local params to config ssh host,
> user and password.
> &gt;&gt;
> &gt;&gt; E.g:
> &gt;&gt; public static AbstractTask newTask(TaskExecutionContext
> taskExecutionContext, Logger logger)
> &gt;&gt;&nbsp;&nbsp;&nbsp;&nbsp; throws IllegalArgumentException {
> &gt;&gt;&nbsp;&nbsp; Boolean enable =
> JSONUtils.parseObject(taskExecutionContext.getTaskParams()).getBoolean("enable");
> &gt;&gt;&nbsp;&nbsp; if (enable != null &amp;&amp; enable == false ) {
> &gt;&gt;&nbsp;&nbsp;&nbsp;&nbsp; return new
> DummyTask(taskExecutionContext, logger);
> &gt;&gt;&nbsp;&nbsp; }
> &gt;&gt;&nbsp;&nbsp; switch
> (EnumUtils.getEnum(TaskType.class,taskExecutionContext.getTaskType())) {
> &gt;&gt;&nbsp;&nbsp;&nbsp;&nbsp; case SHELL:
> &gt;&gt;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; return new
> ShellTask(taskExecutionContext, logger);
> &gt;&gt;&nbsp;&nbsp;&nbsp;&nbsp; case PROCEDURE:
> &gt;&gt;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; return new
> ProcedureTask(taskExecutionContext, logger);
> &gt;&gt;&nbsp;&nbsp;&nbsp;&nbsp; case SQL:
> &gt;&gt;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; return new
> SqlTask(taskExecutionContext, logger);
> &gt;&gt;&nbsp;&nbsp;&nbsp;&nbsp; case MR:
> &gt;&gt;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; return new
> MapReduceTask(taskExecutionContext, logger);
> &gt;&gt;&nbsp;&nbsp;&nbsp;&nbsp; case SPARK:
> &gt;&gt;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; return new
> SparkTask(taskExecutionContext, logger);
> &gt;&gt;&nbsp;&nbsp;&nbsp;&nbsp; case FLINK:
> &gt;&gt;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; return new
> FlinkTask(taskExecutionContext, logger);
> &gt;&gt;&nbsp;&nbsp;&nbsp;&nbsp; case PYTHON:
> &gt;&gt;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; return new
> PythonTask(taskExecutionContext, logger);
> &gt;&gt;&nbsp;&nbsp;&nbsp;&nbsp; case HTTP:
> &gt;&gt;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; return new
> HttpTask(taskExecutionContext, logger);
> &gt;&gt;&nbsp;&nbsp;&nbsp;&nbsp; case DATAX:
> &gt;&gt;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; return new
> DataxTask(taskExecutionContext, logger);
> &gt;&gt;&nbsp;&nbsp;&nbsp;&nbsp; case SQOOP:
> &gt;&gt;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; return new
> SqoopTask(taskExecutionContext, logger);
> &gt;&gt;&nbsp;&nbsp;&nbsp;&nbsp; case SSH:
> &gt;&gt;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; return new
> SSHTask(taskExecutionContext, logger);
> &gt;&gt;&nbsp;&nbsp;&nbsp;&nbsp; default:
> &gt;&gt;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; logger.error("unsupport task
> type: {}", taskExecutionContext.getTaskType());
> &gt;&gt;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; throw new
> IllegalArgumentException("not support task type");
> &gt;&gt;&nbsp;&nbsp; }
> &gt;&gt; }
> &gt;&gt; 3. I am not sure that it supports window or not.
> &gt;&gt;
> &gt;&gt;
> &gt;&gt;
> &gt;&gt; ------------------ 原始邮件 ------------------
> &gt;&gt; 发件人: "wenhemin"<whm_...@163.com <mailto:whm_...@163.com&gt;&gt;;
> &gt;&gt; 发送时间: 2020年5月14日(星期四) 下午3:46
> &gt;&gt; 收件人: "裴龙武"<peilon...@qq.com <mailto:peilon...@qq.com&gt;&gt;;
> &gt;&gt; 主题: Re: [Feature] Support SSH Task and Support dummy task like
> airflow
> &gt;&gt;
> &gt;&gt; Sorry, My previous description is not very clear.
> &gt;&gt;
> &gt;&gt; I want to ask some questions:
> &gt;&gt; 1.How to control the priority between ssh tasks? There may be
> some ssh tasks that have been waiting for execution.
> &gt;&gt; 2.I understand what you want to solve is the problem of executing
> remote ssh scripts in batches.
> &gt;&gt;&nbsp;&nbsp; So, not sure how to use this function.
> &gt;&gt; 3.I don't know if this supports windows system.
> &gt;&gt;
> &gt;&gt;&gt; 在 2020年5月13日,20:56,裴龙武 <peilon...@qq.com <mailto:
> peilon...@qq.com&gt;&gt; 写道:
> &gt;&gt;&gt;
> &gt;&gt;&gt;
> &gt;&gt;&gt; I use spin lock. Here is my code. Of course , it's not
> perfect. I just do a test. To my surprise, it is the result of the
> execution is the same as the AirFlow
> &gt;&gt;&gt;
> &gt;&gt;&gt; 我通过模拟自选锁方式实现,附件中是我的代码,当然,这并不完善。我拿这个做了测试。令我惊喜的是,我得到了和 AirFlow
> 相同的结果。
> &gt;&gt;&gt;
> &gt;&gt;&gt;
> &gt;&gt;&gt;
> &gt;&gt;&gt;
> &gt;&gt;&gt; ------------------ 原始邮件 ------------------
> &gt;&gt;&gt; 发件人: "whm_777"<whm_...@163.com <mailto:whm_...@163.com
> &gt;&gt;;
> &gt;&gt;&gt; 发送时间: 2020年5月13日(星期三) 晚上7:21
> &gt;&gt;&gt; 收件人: "裴龙武"<peilon...@qq.com <mailto:peilon...@qq.com&gt;&gt;;
> &gt;&gt;&gt; 主题: Re: [Feature] Support SSH Task and Support dummy task
> like airflow
> &gt;&gt;&gt;
> &gt;&gt;&gt; You can modify the maximum number of linux ssh connections.
> &gt;&gt;&gt; If use ssh connection pool, How to control the priority of
> ssh?
> &gt;&gt;&gt;
> &gt;&gt;&gt;&gt; 在 2020年5月13日,18:01,裴龙武 <peilon...@qq.com <mailto:
> peilon...@qq.com&gt;&gt; 写道:
> &gt;&gt;&gt;&gt;
> &gt;&gt;&gt;&gt;
> &gt;&gt;&gt;&gt; First 3Q,
> &gt;&gt;&gt;&gt;
> &gt;&gt;&gt;&gt; I&nbsp; use more than 100 task node. But SSH connections
> are limited.
> &gt;&gt;&gt;&gt;
> &gt;&gt;&gt;&gt;
> 我是使用了100多个任务节点,但服务器SSH连接是有限制的,超过后,就会报错了。下面是我扩展SSH任务节点后的一张截图,另外这个DAG是我从AirFlow转换过来的。
> &gt;&gt;&gt;&gt; <330be...@f7f80e73.76c5bb5e.jpg&gt;
> &gt;&gt;&gt;&gt;
> &gt;&gt;&gt;&gt;
> &gt;&gt;&gt;&gt;
> &gt;&gt;&gt;&gt; ------------------ 原始邮件 ------------------
> &gt;&gt;&gt;&gt; 发件人: "whm_777"<whm_...@163.com <mailto:whm_...@163.com
> &gt;&gt;;
> &gt;&gt;&gt;&gt; 发送时间: 2020年5月13日(星期三) 下午5:50
> &gt;&gt;&gt;&gt; 收件人: "裴龙武"<peilon...@qq.com <mailto:peilon...@qq.com
> &gt;&gt;;
> &gt;&gt;&gt;&gt; 主题: Re: [Feature] Support SSH Task and Support dummy task
> like airflow
> &gt;&gt;&gt;&gt;
> &gt;&gt;&gt;&gt; E.g.
> &gt;&gt;&gt;&gt; rtn_code=`ssh -o ServerAliveInterval=60 -p xxxx
> r...@xxx.xxx.xxx.xxx <mailto:r...@xxx.xxx.xxx.xxx&gt; ‘shell
> command&nbsp; &gt;/dev/null 2&gt;&amp;1; echo $?'`
> &gt;&gt;&gt;&gt; if [ "$rtn_code" -eq 0 ]; then
> &gt;&gt;&gt;&gt;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; echo "成功"
> &gt;&gt;&gt;&gt;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; exit 0
> &gt;&gt;&gt;&gt; else
> &gt;&gt;&gt;&gt;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; echo "失败"
> &gt;&gt;&gt;&gt;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; exit 1
> &gt;&gt;&gt;&gt; fi
> &gt;&gt;&gt;&gt;
> &gt;&gt;&gt;&gt; Batch shell command is not supported.
> &gt;&gt;&gt;&gt; Multiple servers can be split into multiple task nodes.
> &gt;&gt;&gt;&gt;
> &gt;&gt;&gt;&gt;&gt; 在 2020年5月13日,17:40,裴龙武 <peilon...@qq.com <mailto:
> peilon...@qq.com&gt;&gt; 写道:
> &gt;&gt;&gt;&gt;&gt;
> &gt;&gt;&gt;&gt;&gt;
> &gt;&gt;&gt;&gt;&gt; Could you give me a example,3Q. 能否给我一个例子,谢谢!
> &gt;&gt;&gt;&gt;&gt;
> &gt;&gt;&gt;&gt;&gt; By the way, I have more than 100 tasks in one DAG.
> These tasks connect two other server to execute. So SSH tasks must have
> pool to manager. Now I use JSch and realize a simple pool.
> &gt;&gt;&gt;&gt;&gt;
> &gt;&gt;&gt;&gt;&gt; 顺带说一下,在我的实际场景中,我有100多个 SSH 任务,这些任务连接两台任务服务器进行任务执行。所以
> SSH 任务进行连接时,必须使用连接池进行管理。当前我使用 JSch,并实现了一个简单的连接池。
> &gt;&gt;&gt;&gt;&gt;
> &gt;&gt;&gt;&gt;&gt; ------------------ 原始邮件 ------------------
> &gt;&gt;&gt;&gt;&gt; 发件人: "wenhemin"<whm_...@163.com <mailto:
> whm_...@163.com&gt;&gt;;
> &gt;&gt;&gt;&gt;&gt; 发送时间: 2020年5月13日(星期三) 下午5:24
> &gt;&gt;&gt;&gt;&gt; 收件人: "dev"<dev@dolphinscheduler.apache.org <mailto:
> dev@dolphinscheduler.apache.org&gt;&gt;;
> &gt;&gt;&gt;&gt;&gt; 主题: Re: [Feature] Support SSH Task and Support dummy
> task like airflow
> &gt;&gt;&gt;&gt;&gt;
> &gt;&gt;&gt;&gt;&gt; The shell node is supports remote calling, and get
> the remote command result code.
> &gt;&gt;&gt;&gt;&gt;
> &gt;&gt;&gt;&gt;&gt;
> &gt;&gt;&gt;&gt;&gt; &gt; 在 2020年5月13日,15:16,裴龙武 <peilon...@qq.com
> <mailto:peilon...@qq.com&gt;&gt; 写道:
> &gt;&gt;&gt;&gt;&gt; &gt;
> &gt;&gt;&gt;&gt;&gt; &gt; Dear ALL:
> &gt;&gt;&gt;&gt;&gt; &gt;
> &gt;&gt;&gt;&gt;&gt; &gt;
> &gt;&gt;&gt;&gt;&gt; &gt; Support Linux SSH Task 支持 Linux SSH 任务
> &gt;&gt;&gt;&gt;&gt; &gt;
> &gt;&gt;&gt;&gt;&gt; &gt; 场景描述:当前项目中,工作流的任务的目标是执行不同服务器 Shell 脚本,Shell
> 脚本是保存在业务服务器的固定目录。当 Worker 调度执行时,需要通过固定用户登录这些服务器,然后执行 Shell
> 脚本并获取这些任务执行的状态,其中服务器地址、用户名、密码可配置。
> &gt;&gt;&gt;&gt;&gt; &gt;
> &gt;&gt;&gt;&gt;&gt; &gt; For example, in my project, the workflow's tasks
> want to execute shell scripts where are in different server's different
> directory. When worker execute these shell scripts, it must use the same
> user to login these server. Also, the worker can get the executing state of
> these server. We can config these server 's host,user and password.
> &gt;&gt;&gt;&gt;&gt; &gt;
> &gt;&gt;&gt;&gt;&gt; &gt; SSH Task is very useful for most user SSH
> 任务对大多数用户是非常有用的
> &gt;&gt;&gt;&gt;&gt; &gt;
> &gt;&gt;&gt;&gt;&gt; &gt; 分布式调度任务所执行的 Shell
> 脚本是处于不同的业务服务器,都有其固定的业务,这些业务服务器不是 Worker,只是需要 Worker
> 调度执行,我们只需要传递不同的参数,让服务器执行任务脚本即可。
> &gt;&gt;&gt;&gt;&gt; &gt;
> &gt;&gt;&gt;&gt;&gt; &gt; In dolphinscheduler, the most executing tasks
> are in different servers who are not workers. These servers also have their
> different fixed services. We just have to pass different parameters to
> schedule these shell scripts to execute.
> &gt;&gt;&gt;&gt;&gt; &gt;
> &gt;&gt;&gt;&gt;&gt; &gt; Python has a module to execute ssh script Python
> 有固定的工具包,可执行这些SSH Shell 脚本
> &gt;&gt;&gt;&gt;&gt; &gt;
> &gt;&gt;&gt;&gt;&gt; &gt; Python 有一个可执行远程服务器SSH Shell脚本的模块,其名字为:paramiko。
> &gt;&gt;&gt;&gt;&gt; &gt;
> &gt;&gt;&gt;&gt;&gt; &gt; Python has a module that can execute SSH Shell
> script. It's paramiko.
> &gt;&gt;&gt;&gt;&gt; &gt;
> &gt;&gt;&gt;&gt;&gt; &gt; Others 其他内容
> &gt;&gt;&gt;&gt;&gt; &gt;
> &gt;&gt;&gt;&gt;&gt; &gt; 我发现之前的改进功能中也有关于这个的描述,不过相对简单。功能更新地址
> &gt;&gt;&gt;&gt;&gt; &gt;
> &gt;&gt;&gt;&gt;&gt; &gt; I found this described in previous feature, but
> it was relatively simple.
> &gt;&gt;&gt;&gt;&gt; &gt; Feature URL
> &gt;&gt;&gt;&gt;&gt; &gt;
> &gt;&gt;&gt;&gt;&gt; &gt; 另外,我通过 Shell Task
> 方式去执行远程任务会非常不便,下面是我的脚本,不知道是否有更好的方式。
> &gt;&gt;&gt;&gt;&gt; &gt;
> &gt;&gt;&gt;&gt;&gt; &gt; In addition, it is very inconvenient for me to
> perform remote tasks through Shell Task. Here is my script. I don't know if
> there's a better way.
> &gt;&gt;&gt;&gt;&gt; &gt; sshpass -p 'password' ssh user@host echo 'ssh
> success' echo 'Hello World' -&amp;gt; /home/dolphinscheduler/test/hello.txt
> echo 'end'
> &gt;&gt;&gt;&gt;&gt; &gt;
> &gt;&gt;&gt;&gt;&gt; &gt;
> &gt;&gt;&gt;&gt;&gt; &gt;
> &gt;&gt;&gt;&gt;&gt; &gt; Support dummy task like airflow 支持像 Airflow
> 中的虚拟任务
> &gt;&gt;&gt;&gt;&gt; &gt;
> &gt;&gt;&gt;&gt;&gt; &gt; 场景描述:项目中,有已经产品化的 DAG 文件,DAG
> 文件中包括不同的模块,这些模块之间的有些点是相互依赖的,有些不是,在用户购买不同模块时,需要把未购买模块且其他已购模块未依赖的点设置为 Dummy
> Task,这样实际这些任务就不会执行,这样设置的好处是产品统一性和图的完整性,在AirFlow中,这些是通过DummyOperator完成的。
> &gt;&gt;&gt;&gt;&gt; &gt;
> &gt;&gt;&gt;&gt;&gt; &gt; For example, in my project, it has a productized
> DAG file. The file contains different modules, some of which are
> interdependent and some of which are not. When customers purchase different
> modules, we need to set some tasks as dummy tasks, which some modules are
> not purchased and the purchased module is not dependent. Because of this
> setting, these dummy tasks are actually not executed. The benefits of this
> setup are product unity and diagram integrity. In airflow, these task
> execute by dummy operator.
> &gt;&gt;&gt;&gt;&gt; &gt;
> &gt;&gt;&gt;&gt;&gt; &gt; ** Realize 实现方式**
> &gt;&gt;&gt;&gt;&gt; &gt;
> &gt;&gt;&gt;&gt;&gt; &gt; Dummy Task 本身实现很简单,只是需要与其他任务配合使用,但任务执行方式设置为
> dummy 时,实际的任务不执行,执行 Dummy Task。
> &gt;&gt;&gt;&gt;&gt; &gt;
> &gt;&gt;&gt;&gt;&gt; &gt; Dummy Task is easy to realize, but it need to
> use with other different tasks. When the task's executed type is set to
> dummy type, the task are executed as a dummy task and the real task is not
> executed.
> &gt;&gt;&gt;&gt;&gt; &gt;
> &gt;&gt;&gt;&gt;&gt; &gt;
> &gt;&gt;&gt;&gt;&gt; &gt;
> &gt;&gt;&gt;&gt;&gt; &gt;
> &gt;&gt;&gt;&gt;&gt; &gt;
> 顺带说一下,因为项目着急测试使用,我Fork了开发版本,实现两种任务类型。在后续的版本中是否能够支持。
> &gt;&gt;&gt;&gt;&gt; &gt;
> &gt;&gt;&gt;&gt;&gt; &gt; By the way,I already realize these two&amp;nbsp;
> features in my fork branch.&amp;nbsp;Whether the follow-up release can be
> supported
> &gt;&gt;&gt;&gt;&gt;
> &gt;&gt;&gt;&gt;
> &gt;&gt;&gt;
> &gt;&gt;&gt; <SSHClient.java&gt;<SSHPool.java&gt;<SSHTask.java&gt;
> &gt;&gt;
> &gt;
> &gt; <项目场景中关于Dolphin的一些扩展点.pdf&gt;

Reply via email to