This is an automated email from the ASF dual-hosted git repository.
potiuk pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/airflow.git
The following commit(s) were added to refs/heads/master by this push:
new a2c5389 Add kylin operator (#9149)
a2c5389 is described below
commit a2c5389a60f68482a60eb40c67b1542d827c187e
Author: yongheng.liu <[email protected]>
AuthorDate: Wed Jul 15 00:25:05 2020 +0800
Add kylin operator (#9149)
Co-authored-by: yongheng.liu <[email protected]>
---
CONTRIBUTING.rst | 16 +-
INSTALL | 16 +-
airflow/providers/apache/kylin/__init__.py | 16 ++
.../apache/kylin/example_dags/__init__.py | 16 ++
.../apache/kylin/example_dags/example_kylin_dag.py | 136 +++++++++++++++++
airflow/providers/apache/kylin/hooks/__init__.py | 16 ++
airflow/providers/apache/kylin/hooks/kylin.py | 77 ++++++++++
.../providers/apache/kylin/operators/__init__.py | 16 ++
.../providers/apache/kylin/operators/kylin_cube.py | 170 +++++++++++++++++++++
airflow/utils/db.py | 11 ++
docs/autoapi_templates/index.rst | 4 +
docs/operators-and-hooks-ref.rst | 6 +
requirements/requirements-python3.6.txt | 3 +-
requirements/requirements-python3.7.txt | 3 +-
requirements/requirements-python3.8.txt | 4 +-
requirements/setup-3.6.md5 | 2 +-
requirements/setup-3.7.md5 | 2 +-
requirements/setup-3.8.md5 | 2 +-
setup.py | 7 +-
tests/providers/apache/kylin/__init__.py | 17 +++
tests/providers/apache/kylin/hooks/__init__.py | 17 +++
tests/providers/apache/kylin/hooks/test_kylin.py | 60 ++++++++
tests/providers/apache/kylin/operators/__init__.py | 17 +++
.../apache/kylin/operators/test_kylin_cube.py | 170 +++++++++++++++++++++
24 files changed, 780 insertions(+), 24 deletions(-)
diff --git a/CONTRIBUTING.rst b/CONTRIBUTING.rst
index 22a470c..e310efb 100644
--- a/CONTRIBUTING.rst
+++ b/CONTRIBUTING.rst
@@ -314,14 +314,14 @@ This is the full list of those extras:
.. START EXTRAS HERE
all_dbs, amazon, apache.atlas, apache_beam, apache.cassandra, apache.druid,
apache.hdfs,
-apache.hive, apache.pinot, apache.webhdfs, async, atlas, aws, azure,
cassandra, celery, cgroups,
-cloudant, cncf.kubernetes, dask, databricks, datadog, devel, devel_hadoop,
doc, docker, druid,
-elasticsearch, exasol, facebook, gcp, gcp_api, github_enterprise, google,
google_auth, grpc,
-hashicorp, hdfs, hive, jdbc, jira, kerberos, kubernetes, ldap,
microsoft.azure, microsoft.mssql,
-microsoft.winrm, mongo, mssql, mysql, odbc, oracle, pagerduty, papermill,
password, pinot, postgres,
-presto, qds, rabbitmq, redis, salesforce, samba, segment, sendgrid, sentry,
singularity, slack,
-snowflake, spark, ssh, statsd, tableau, vertica, virtualenv, webhdfs, winrm,
yandexcloud, all,
-devel_ci
+apache.hive, apache.kylin, apache.pinot, apache.webhdfs, async, atlas, aws,
azure, cassandra,
+celery, cgroups, cloudant, cncf.kubernetes, dask, databricks, datadog, devel,
devel_hadoop, doc,
+docker, druid, elasticsearch, exasol, facebook, gcp, gcp_api,
github_enterprise, google,
+google_auth, grpc, hashicorp, hdfs, hive, jdbc, jira, kerberos, kubernetes,
ldap, microsoft.azure,
+microsoft.mssql, microsoft.winrm, mongo, mssql, mysql, odbc, oracle,
pagerduty, papermill, password,
+pinot, postgres, presto, qds, rabbitmq, redis, salesforce, samba, segment,
sendgrid, sentry,
+singularity, slack, snowflake, spark, ssh, statsd, tableau, vertica,
virtualenv, webhdfs, winrm,
+yandexcloud, all, devel_ci
.. END EXTRAS HERE
diff --git a/INSTALL b/INSTALL
index fa4f672..64242c1 100644
--- a/INSTALL
+++ b/INSTALL
@@ -45,14 +45,14 @@ pip install . --constraint
requirements/requirements-python3.7.txt
# START EXTRAS HERE
all_dbs, amazon, apache.atlas, apache_beam, apache.cassandra, apache.druid,
apache.hdfs,
-apache.hive, apache.pinot, apache.webhdfs, async, atlas, aws, azure,
cassandra, celery, cgroups,
-cloudant, cncf.kubernetes, dask, databricks, datadog, devel, devel_hadoop,
doc, docker, druid,
-elasticsearch, exasol, facebook, gcp, gcp_api, github_enterprise, google,
google_auth, grpc,
-hashicorp, hdfs, hive, jdbc, jira, kerberos, kubernetes, ldap,
microsoft.azure, microsoft.mssql,
-microsoft.winrm, mongo, mssql, mysql, odbc, oracle, pagerduty, papermill,
password, pinot, postgres,
-presto, qds, rabbitmq, redis, salesforce, samba, segment, sendgrid, sentry,
singularity, slack,
-snowflake, spark, ssh, statsd, tableau, vertica, virtualenv, webhdfs, winrm,
yandexcloud, all,
-devel_ci
+apache.hive, apache.kylin, apache.pinot, apache.webhdfs, async, atlas, aws,
azure, cassandra,
+celery, cgroups, cloudant, cncf.kubernetes, dask, databricks, datadog, devel,
devel_hadoop, doc,
+docker, druid, elasticsearch, exasol, facebook, gcp, gcp_api,
github_enterprise, google,
+google_auth, grpc, hashicorp, hdfs, hive, jdbc, jira, kerberos, kubernetes,
ldap, microsoft.azure,
+microsoft.mssql, microsoft.winrm, mongo, mssql, mysql, odbc, oracle,
pagerduty, papermill, password,
+pinot, postgres, presto, qds, rabbitmq, redis, salesforce, samba, segment,
sendgrid, sentry,
+singularity, slack, snowflake, spark, ssh, statsd, tableau, vertica,
virtualenv, webhdfs, winrm,
+yandexcloud, all, devel_ci
# END EXTRAS HERE
diff --git a/airflow/providers/apache/kylin/__init__.py
b/airflow/providers/apache/kylin/__init__.py
new file mode 100644
index 0000000..13a8339
--- /dev/null
+++ b/airflow/providers/apache/kylin/__init__.py
@@ -0,0 +1,16 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
diff --git a/airflow/providers/apache/kylin/example_dags/__init__.py
b/airflow/providers/apache/kylin/example_dags/__init__.py
new file mode 100644
index 0000000..13a8339
--- /dev/null
+++ b/airflow/providers/apache/kylin/example_dags/__init__.py
@@ -0,0 +1,16 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
diff --git a/airflow/providers/apache/kylin/example_dags/example_kylin_dag.py
b/airflow/providers/apache/kylin/example_dags/example_kylin_dag.py
new file mode 100644
index 0000000..eb5aa92
--- /dev/null
+++ b/airflow/providers/apache/kylin/example_dags/example_kylin_dag.py
@@ -0,0 +1,136 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+"""
+This is an example DAG which uses the KylinCubeOperator.
+The tasks below include kylin build, refresh, merge operation.
+"""
+from airflow import DAG
+from airflow.operators.python import PythonOperator
+from airflow.providers.apache.kylin.operators.kylin_cube import
KylinCubeOperator
+from airflow.utils.dates import days_ago
+
+args = {
+ 'owner': 'airflow',
+ 'start_date': days_ago(1),
+}
+
+dag = DAG(
+ dag_id='example_kylin_operator',
+ default_args=args,
+ schedule_interval=None,
+ tags=['example']
+)
+
+
+def gen_build_time(**kwargs):
+ """
+ Generate build time and push it to XCom.
+ :param kwargs:
+ :return:
+ """
+ ti = kwargs['ti']
+ ti.xcom_push(key='date_start', value='1325347200000')
+ ti.xcom_push(key='date_end', value='1325433600000')
+
+
+gen_build_time_task = PythonOperator(
+ python_callable=gen_build_time,
+ task_id='gen_build_time',
+ dag=dag
+)
+
+build_task1 = KylinCubeOperator(
+ task_id="kylin_build_1",
+ kylin_conn_id='kylin_default',
+ project='learn_kylin',
+ cube='kylin_sales_cube',
+ command='build',
+ start_time="{{
task_instance.xcom_pull(task_ids='gen_build_time',key='date_start') }}",
+ end_time="{{
task_instance.xcom_pull(task_ids='gen_build_time',key='date_end') }}",
+ is_track_job=True,
+ dag=dag,
+)
+
+build_task2 = KylinCubeOperator(
+ task_id="kylin_build_2",
+ kylin_conn_id='kylin_default',
+ project='learn_kylin',
+ cube='kylin_sales_cube',
+ command='build',
+ start_time='1325433600000',
+ end_time='1325520000000',
+ is_track_job=True,
+ dag=dag,
+)
+
+refresh_task1 = KylinCubeOperator(
+ task_id="kylin_refresh_1",
+ kylin_conn_id='kylin_default',
+ project='learn_kylin',
+ cube='kylin_sales_cube',
+ command='refresh',
+ start_time='1325347200000',
+ end_time='1325433600000',
+ is_track_job=True,
+ dag=dag,
+)
+
+merge_task = KylinCubeOperator(
+ task_id="kylin_merge",
+ kylin_conn_id='kylin_default',
+ project='learn_kylin',
+ cube='kylin_sales_cube',
+ command='merge',
+ start_time='1325347200000',
+ end_time='1325520000000',
+ is_track_job=True,
+ dag=dag,
+)
+
+disable_task = KylinCubeOperator(
+ task_id="kylin_disable",
+ kylin_conn_id='kylin_default',
+ project='learn_kylin',
+ cube='kylin_sales_cube',
+ command='disable',
+ dag=dag,
+)
+
+purge_task = KylinCubeOperator(
+ task_id="kylin_purge",
+ kylin_conn_id='kylin_default',
+ project='learn_kylin',
+ cube='kylin_sales_cube',
+ command='purge',
+ dag=dag,
+)
+
+build_task3 = KylinCubeOperator(
+ task_id="kylin_build_3",
+ kylin_conn_id='kylin_default',
+ project='learn_kylin',
+ cube='kylin_sales_cube',
+ command='build',
+ start_time='1325433600000',
+ end_time='1325520000000',
+ dag=dag,
+)
+
+gen_build_time_task >> build_task1 >> build_task2 >> refresh_task1 >>
merge_task
+merge_task >> disable_task >> purge_task >> build_task3
diff --git a/airflow/providers/apache/kylin/hooks/__init__.py
b/airflow/providers/apache/kylin/hooks/__init__.py
new file mode 100644
index 0000000..13a8339
--- /dev/null
+++ b/airflow/providers/apache/kylin/hooks/__init__.py
@@ -0,0 +1,16 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
diff --git a/airflow/providers/apache/kylin/hooks/kylin.py
b/airflow/providers/apache/kylin/hooks/kylin.py
new file mode 100644
index 0000000..59f6ce9
--- /dev/null
+++ b/airflow/providers/apache/kylin/hooks/kylin.py
@@ -0,0 +1,77 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+from typing import Optional
+
+from kylinpy import exceptions, kylinpy
+
+from airflow.exceptions import AirflowException
+from airflow.hooks.base_hook import BaseHook
+
+
+class KylinHook(BaseHook):
+ """
+ :param kylin_conn_id: The connection id as configured in Airflow
administration.
+ :type kylin_conn_id: str
+ :param project: project name
+ :type project: Optional[str]
+ :param dsn: dsn
+ :type dsn: Optional[str]
+ """
+ def __init__(self,
+ kylin_conn_id: Optional[str] = 'kylin_default',
+ project: Optional[str] = None,
+ dsn: Optional[str] = None
+ ):
+ super().__init__()
+ self.kylin_conn_id = kylin_conn_id
+ self.project = project
+ self.dsn = dsn
+
+ def get_conn(self):
+ conn = self.get_connection(self.kylin_conn_id)
+ if self.dsn:
+ return kylinpy.create_kylin(self.dsn)
+ else:
+ self.project = self.project if self.project else conn.schema
+ return kylinpy.Kylin(conn.host, username=conn.login,
+ password=conn.password, port=conn.port,
+ project=self.project, **conn.extra_dejson)
+
+ def cube_run(self, datasource_name, op, **op_args):
+ """
+ run the CubeSource command which is in CubeSource.support_invoke_command
+ :param datasource_name:
+ :param op: command
+ :param op_args: command args
+ :return: response
+ """
+ cube_source = self.get_conn().get_datasource(datasource_name)
+ try:
+ response = cube_source.invoke_command(op, **op_args)
+ return response
+ except exceptions.KylinError as err:
+ raise AirflowException("Cube operation {} error , Message:
{}".format(op, err))
+
+ def get_job_status(self, job_id):
+ """
+ get job status
+ :param job_id: kylin job id
+ :return: job status
+ """
+ return self.get_conn().get_job(job_id).status
diff --git a/airflow/providers/apache/kylin/operators/__init__.py
b/airflow/providers/apache/kylin/operators/__init__.py
new file mode 100644
index 0000000..13a8339
--- /dev/null
+++ b/airflow/providers/apache/kylin/operators/__init__.py
@@ -0,0 +1,16 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
diff --git a/airflow/providers/apache/kylin/operators/kylin_cube.py
b/airflow/providers/apache/kylin/operators/kylin_cube.py
new file mode 100644
index 0000000..cec20af
--- /dev/null
+++ b/airflow/providers/apache/kylin/operators/kylin_cube.py
@@ -0,0 +1,170 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+import time
+from datetime import datetime
+from typing import Optional
+
+from kylinpy import kylinpy
+
+from airflow.exceptions import AirflowException
+from airflow.models import BaseOperator
+from airflow.providers.apache.kylin.hooks.kylin import KylinHook
+from airflow.utils import timezone
+from airflow.utils.decorators import apply_defaults
+
+
+class KylinCubeOperator(BaseOperator):
+ """
+ This operator is used to submit a request for a kylin build/refresh/merge,
+ and can track the job status, so users can build kylin jobs more easily.
+
+ For more detail information in
+ `Apache Kylin <http://kylin.apache.org/>`_
+
+ :param kylin_conn_id: The connection id as configured in Airflow
administration.
+ :type kylin_conn_id: str
+ :param project: kylin project name; this param will overwrite the project
in kylin_conn_id
+ :type project: str
+ :param cube: kylin cube name
+ :type cube: str
+ :param dsn: (dsn , dsn url of kylin connection ,which will overwrite
kylin_conn_id.
+ for example:
kylin://ADMIN:KYLIN@sandbox/learn_kylin?timeout=60&is_debug=1)
+ :type dsn: str
+ :param command: (kylin command include 'build', 'merge', 'refresh',
'delete',
+ 'build_streaming', 'merge_streaming', 'refresh_streaming', 'disable',
'enable',
+ 'purge', 'clone', 'drop'.
+ build - use /kylin/api/cubes/{cubeName}/build rest api,and buildType
is ‘BUILD’,
+ and you should give start_time and end_time
+ refresh - use build rest api,and buildType is ‘REFRESH’
+ merge - use build rest api,and buildType is ‘MERGE’
+ build_streaming - use /kylin/api/cubes/{cubeName}/build2 rest api,and
buildType is ‘BUILD’
+ and you should give offset_start and offset_end
+ refresh_streaming - use build2 rest api,and buildType is ‘REFRESH’
+ merge_streaming - use build2 rest api,and buildType is ‘MERGE’
delete - delete segment, and you should give segment_name value
+ disable - disable cube
+ enable - enable cube
+ purge - purge cube
+ clone - clone cube,new cube name is {cube_name}_clone
+ drop - drop cube)
+ :type command: str
+ :param start_time: build segment start time
+ :type start_time: Optional[str]
+ :param end_time: build segment end time
+ :type end_time: Optional[str]
+ :param offset_start: streaming build segment start time
+ :type offset_start: Optional[str]
+ :param offset_end: streaming build segment end time
+ :type offset_end: Optional[str]
+ :param segment_name: segment name
+ :type segment_name: str
+ :param is_track_job: (whether to track job status. if value is True,will
track job until
+ job status is in("FINISHED", "ERROR", "DISCARDED", "KILLED",
"SUICIDAL",
+ "STOPPED") or timeout)
+ :type is_track_job: bool
+ :param interval: polling interval (in seconds) used to track job status, default value is 60s
+ :type interval: int
+ :param timeout: timeout value,default value is 1 day,60 * 60 * 24 s
+ :type timeout: int
+ :param eager_error_status: (job error statuses; if the job status is in this
list, this task will fail.
+ default value is tuple(["ERROR", "DISCARDED", "KILLED", "SUICIDAL",
"STOPPED"]))
+ :type eager_error_status: tuple
+ """
+
+ template_fields = ('project', 'cube', 'dsn', 'command', 'start_time',
'end_time',
+ 'segment_name', 'offset_start', 'offset_end')
+ ui_color = '#E79C46'
+ build_command = {'fullbuild', 'build', 'merge', 'refresh',
'build_streaming',
+ 'merge_streaming', 'refresh_streaming'}
+ jobs_end_status = {"FINISHED", "ERROR", "DISCARDED", "KILLED", "SUICIDAL",
"STOPPED"}
+
+ # pylint: disable=too-many-arguments,inconsistent-return-statements
+ @apply_defaults
+ def __init__(self,
+ kylin_conn_id: Optional[str] = 'kylin_default',
+ project: Optional[str] = None,
+ cube: Optional[str] = None,
+ dsn: Optional[str] = None,
+ command: Optional[str] = None,
+ start_time: Optional[str] = None,
+ end_time: Optional[str] = None,
+ offset_start: Optional[str] = None,
+ offset_end: Optional[str] = None,
+ segment_name: Optional[str] = None,
+ is_track_job: Optional[bool] = False,
+ interval: int = 60,
+ timeout: int = 60 * 60 * 24,
+ eager_error_status=tuple(["ERROR", "DISCARDED", "KILLED",
"SUICIDAL", "STOPPED"]),
+ *args,
+ **kwargs):
+ super().__init__(*args, **kwargs)
+ self.kylin_conn_id = kylin_conn_id
+ self.project = project
+ self.cube = cube
+ self.dsn = dsn
+ self.command = command
+ self.start_time = start_time
+ self.end_time = end_time
+ self.segment_name = segment_name
+ self.offset_start = offset_start
+ self.offset_end = offset_end
+ self.is_track_job = is_track_job
+ self.interval = interval
+ self.timeout = timeout
+ self.eager_error_status = eager_error_status
+ self.jobs_error_status = [stat.upper() for stat in eager_error_status]
+
+ def execute(self, context):
+
+ _hook = KylinHook(kylin_conn_id=self.kylin_conn_id,
project=self.project, dsn=self.dsn)
+
+ _support_invoke_command = kylinpy.CubeSource.support_invoke_command
+ if self.command.lower() not in _support_invoke_command:
+ raise AirflowException('Kylin:Command {} can not match kylin
command list {}'.format(
+ self.command, _support_invoke_command))
+
+ kylinpy_params = {
+ 'start': datetime.fromtimestamp(int(self.start_time) / 1000) if
self.start_time else None,
+ 'end': datetime.fromtimestamp(int(self.end_time) / 1000) if
self.end_time else None,
+ 'name': self.segment_name,
+ 'offset_start': int(self.offset_start) if self.offset_start else
None,
+ 'offset_end': int(self.offset_end) if self.offset_end else None
+ }
+ rsp_data = _hook.cube_run(self.cube, self.command.lower(),
**kylinpy_params)
+ if self.is_track_job and self.command.lower() in self.build_command:
+ started_at = timezone.utcnow()
+ job_id = rsp_data.get("uuid")
+ if job_id is None:
+ raise AirflowException("kylin job id is None")
+ self.log.info("kylin job id: %s", job_id)
+
+ job_status = None
+ while job_status not in self.jobs_end_status:
+ if (timezone.utcnow() - started_at).total_seconds() >
self.timeout:
+ raise AirflowException('kylin job {}
timeout'.format(job_id))
+ time.sleep(self.interval)
+
+ job_status = _hook.get_job_status(job_id)
+ self.log.info('Kylin job status is %s ', job_status)
+ if job_status in self.jobs_error_status:
+ raise AirflowException(
+ 'Kylin job {} status {} is error '.format(job_id,
job_status))
+
+ if self.do_xcom_push:
+ return rsp_data
diff --git a/airflow/utils/db.py b/airflow/utils/db.py
index fb91b11..0a652cd 100644
--- a/airflow/utils/db.py
+++ b/airflow/utils/db.py
@@ -309,6 +309,17 @@ def create_default_connections(session=None):
)
merge_conn(
Connection(
+ conn_id='kylin_default',
+ conn_type='kylin',
+ host='localhost',
+ port=7070,
+ login="ADMIN",
+ password="KYLIN"
+ ),
+ session
+ )
+ merge_conn(
+ Connection(
conn_id="livy_default",
conn_type="livy",
host="livy",
diff --git a/docs/autoapi_templates/index.rst b/docs/autoapi_templates/index.rst
index 0b9438f..a04c53e 100644
--- a/docs/autoapi_templates/index.rst
+++ b/docs/autoapi_templates/index.rst
@@ -86,6 +86,8 @@ All operators are in the following packages:
airflow/providers/apache/hive/transfers/index
+ airflow/providers/apache/kylin/operators/index
+
airflow/providers/apache/livy/operators/index
airflow/providers/apache/livy/sensors/index
@@ -245,6 +247,8 @@ All hooks are in the following packages:
airflow/providers/apache/hive/hooks/index
+ airflow/providers/apache/kylin/hooks/index
+
airflow/providers/apache/livy/hooks/index
airflow/providers/apache/pig/hooks/index
diff --git a/docs/operators-and-hooks-ref.rst b/docs/operators-and-hooks-ref.rst
index 869ba57..43b0239 100644
--- a/docs/operators-and-hooks-ref.rst
+++ b/docs/operators-and-hooks-ref.rst
@@ -144,6 +144,12 @@ Foundation.
:mod:`airflow.providers.apache.hive.sensors.hive_partition`,
:mod:`airflow.providers.apache.hive.sensors.metastore_partition`
+ * - `Apache Kylin <https://kylin.apache.org/>`__
+ -
+ - :mod:`airflow.providers.apache.kylin.hooks.kylin`
+ - :mod:`airflow.providers.apache.kylin.operators.kylin_cube`
+ -
+
* - `Apache Livy <https://livy.apache.org/>`__
-
- :mod:`airflow.providers.apache.livy.hooks.livy`
diff --git a/requirements/requirements-python3.6.txt
b/requirements/requirements-python3.6.txt
index 4f318a2..792e908 100644
--- a/requirements/requirements-python3.6.txt
+++ b/requirements/requirements-python3.6.txt
@@ -205,10 +205,11 @@ jsonpickle==1.4.1
jsonpointer==2.0
jsonschema==3.2.0
junit-xml==1.9
-jupyter-client==6.1.5
+jupyter-client==6.1.6
jupyter-core==4.6.3
kombu==4.6.11
kubernetes==11.0.0
+kylinpy==2.8.1
lazy-object-proxy==1.5.0
ldap3==2.7
lockfile==0.12.2
diff --git a/requirements/requirements-python3.7.txt
b/requirements/requirements-python3.7.txt
index 5268016..a055ace 100644
--- a/requirements/requirements-python3.7.txt
+++ b/requirements/requirements-python3.7.txt
@@ -201,10 +201,11 @@ jsonpickle==1.4.1
jsonpointer==2.0
jsonschema==3.2.0
junit-xml==1.9
-jupyter-client==6.1.5
+jupyter-client==6.1.6
jupyter-core==4.6.3
kombu==4.6.11
kubernetes==11.0.0
+kylinpy==2.8.1
lazy-object-proxy==1.5.0
ldap3==2.7
lockfile==0.12.2
diff --git a/requirements/requirements-python3.8.txt
b/requirements/requirements-python3.8.txt
index a4b2860..d199958 100644
--- a/requirements/requirements-python3.8.txt
+++ b/requirements/requirements-python3.8.txt
@@ -181,7 +181,6 @@ humanize==2.5.0
hvac==0.10.4
identify==1.4.23
idna==2.10
-ijson==2.6.1
imagesize==1.2.0
importlib-metadata==1.7.0
inflection==0.5.0
@@ -202,10 +201,11 @@ jsonpickle==1.4.1
jsonpointer==2.0
jsonschema==3.2.0
junit-xml==1.9
-jupyter-client==6.1.5
+jupyter-client==6.1.6
jupyter-core==4.6.3
kombu==4.6.11
kubernetes==11.0.0
+kylinpy==2.8.1
lazy-object-proxy==1.5.0
ldap3==2.7
lockfile==0.12.2
diff --git a/requirements/setup-3.6.md5 b/requirements/setup-3.6.md5
index 83d65ec..3956d1f 100644
--- a/requirements/setup-3.6.md5
+++ b/requirements/setup-3.6.md5
@@ -1 +1 @@
-5e35d346a28f77758fc2a78f121d7257 /opt/airflow/setup.py
+4b8e4aec9535614784887c046bed986b /opt/airflow/setup.py
diff --git a/requirements/setup-3.7.md5 b/requirements/setup-3.7.md5
index 83d65ec..3956d1f 100644
--- a/requirements/setup-3.7.md5
+++ b/requirements/setup-3.7.md5
@@ -1 +1 @@
-5e35d346a28f77758fc2a78f121d7257 /opt/airflow/setup.py
+4b8e4aec9535614784887c046bed986b /opt/airflow/setup.py
diff --git a/requirements/setup-3.8.md5 b/requirements/setup-3.8.md5
index 83d65ec..3956d1f 100644
--- a/requirements/setup-3.8.md5
+++ b/requirements/setup-3.8.md5
@@ -1 +1 @@
-5e35d346a28f77758fc2a78f121d7257 /opt/airflow/setup.py
+4b8e4aec9535614784887c046bed986b /opt/airflow/setup.py
diff --git a/setup.py b/setup.py
index b319bd3..c70519c 100644
--- a/setup.py
+++ b/setup.py
@@ -320,6 +320,9 @@ kubernetes = [
'cryptography>=2.0.0',
'kubernetes>=3.0.0',
]
+kylin = [
+ 'kylinpy>=2.6'
+]
ldap = [
'ldap3>=2.5.1',
]
@@ -491,6 +494,7 @@ PROVIDERS_REQUIREMENTS: Dict[str, Iterable[str]] = {
"apache.druid": druid,
"apache.hdfs": hdfs,
"apache.hive": hive,
+ "apache.kylin": kylin,
"apache.livy": [],
"apache.pig": [],
"apache.pinot": pinot,
@@ -555,6 +559,7 @@ EXTRAS_REQUIREMENTS: Dict[str, Iterable[str]] = {
"apache.druid": druid,
"apache.hdfs": hdfs,
"apache.hive": hive,
+ "apache.kylin": kylin,
"apache.pinot": pinot,
"apache.webhdfs": webhdfs,
'async': async_packages,
@@ -589,7 +594,7 @@ EXTRAS_REQUIREMENTS: Dict[str, Iterable[str]] = {
'jdbc': jdbc,
'jira': jira,
'kerberos': kerberos,
- 'kubernetes': kubernetes, # TODO: remove this in Airflow 2.1
+ 'kubernetes': kubernetes, # TODO: remove this in Airflow 2.1
'ldap': ldap,
"microsoft.azure": azure,
"microsoft.mssql": mssql,
diff --git a/tests/providers/apache/kylin/__init__.py
b/tests/providers/apache/kylin/__init__.py
new file mode 100644
index 0000000..217e5db
--- /dev/null
+++ b/tests/providers/apache/kylin/__init__.py
@@ -0,0 +1,17 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
diff --git a/tests/providers/apache/kylin/hooks/__init__.py
b/tests/providers/apache/kylin/hooks/__init__.py
new file mode 100644
index 0000000..217e5db
--- /dev/null
+++ b/tests/providers/apache/kylin/hooks/__init__.py
@@ -0,0 +1,17 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
diff --git a/tests/providers/apache/kylin/hooks/test_kylin.py
b/tests/providers/apache/kylin/hooks/test_kylin.py
new file mode 100644
index 0000000..5d7b1c3
--- /dev/null
+++ b/tests/providers/apache/kylin/hooks/test_kylin.py
@@ -0,0 +1,60 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+#
+
+import unittest
+from unittest.mock import MagicMock, patch
+
+from kylinpy.exceptions import KylinCubeError
+
+from airflow.exceptions import AirflowException
+from airflow.providers.apache.kylin.hooks.kylin import KylinHook
+
+
+class TestKylinHook(unittest.TestCase):
+
+ def setUp(self) -> None:
+ self.hook = KylinHook(kylin_conn_id='kylin_default',
project='learn_kylin')
+
+ @patch("kylinpy.Kylin.get_job")
+ def test_get_job_status(self, mock_job):
+ job = MagicMock()
+ job.status = "ERROR"
+ mock_job.return_value = job
+ self.assertEqual(self.hook.get_job_status('123'), "ERROR")
+
+ @patch("kylinpy.Kylin.get_datasource")
+ def test_cube_run(self, cube_source):
+
+ class MockCubeSource:
+ def invoke_command(self, command, **kwargs):
+ invoke_command_list = ['fullbuild', 'build', 'merge',
'refresh',
+ 'delete', 'build_streaming',
'merge_streaming', 'refresh_streaming',
+ 'disable', 'enable', 'purge', 'clone',
'drop']
+ if command in invoke_command_list:
+ return {"code": "000", "data": {}}
+ else:
+ raise KylinCubeError('Unsupported invoke command for
datasource: {}'.format(command))
+
+ cube_source.return_value = MockCubeSource()
+ response_data = {"code": "000", "data": {}}
+ self.assertDictEqual(self.hook.cube_run('kylin_sales_cube', 'build'),
response_data)
+ self.assertDictEqual(self.hook.cube_run('kylin_sales_cube',
'refresh'), response_data)
+ self.assertDictEqual(self.hook.cube_run('kylin_sales_cube', 'merge'),
response_data)
+ self.assertDictEqual(self.hook.cube_run('kylin_sales_cube',
'build_streaming'), response_data)
+ self.assertRaises(AirflowException, self.hook.cube_run,
'kylin_sales_cube', 'build123',)
diff --git a/tests/providers/apache/kylin/operators/__init__.py
b/tests/providers/apache/kylin/operators/__init__.py
new file mode 100644
index 0000000..217e5db
--- /dev/null
+++ b/tests/providers/apache/kylin/operators/__init__.py
@@ -0,0 +1,17 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
diff --git a/tests/providers/apache/kylin/operators/test_kylin_cube.py
b/tests/providers/apache/kylin/operators/test_kylin_cube.py
new file mode 100644
index 0000000..f71ec3d
--- /dev/null
+++ b/tests/providers/apache/kylin/operators/test_kylin_cube.py
@@ -0,0 +1,170 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+import unittest
+from datetime import datetime
+from unittest.mock import MagicMock, patch
+
+from airflow.exceptions import AirflowException
+from airflow.models import TaskInstance
+from airflow.models.dag import DAG
+from airflow.providers.apache.kylin.operators.kylin_cube import
KylinCubeOperator
+from airflow.utils import timezone
+
+DEFAULT_DATE = timezone.datetime(2020, 1, 1)
+
+
class TestKylinCubeOperator(unittest.TestCase):
    """Unit tests for KylinCubeOperator with KylinHook mocked out."""

    # NOTE: strftime("%s") is a non-portable glibc extension (absent on
    # Windows, undefined by the C standard). datetime.timestamp() computes the
    # same local-time epoch value for naive datetimes, portably.
    _config = {
        'kylin_conn_id': 'kylin_default',
        'project': 'learn_kylin',
        'cube': 'kylin_sales_cube',
        'command': 'build',
        'start_time': str(int(datetime(2012, 1, 2, 0, 0).timestamp())) + '000',
        'end_time': str(int(datetime(2012, 1, 3, 0, 0).timestamp())) + '000',
    }
    # Full set of commands the mocked hook advertises as supported.
    cube_command = ['fullbuild', 'build', 'merge', 'refresh',
                    'delete', 'build_streaming', 'merge_streaming', 'refresh_streaming',
                    'disable', 'enable', 'purge', 'clone', 'drop']

    # Canned hook response carrying the job uuid the operator tracks.
    build_response = {"uuid": "c143e0e4-ac5f-434d-acf3-46b0d15e3dc6"}

    def setUp(self):
        args = {
            'owner': 'airflow',
            'start_date': DEFAULT_DATE,
        }
        self.dag = DAG('test_dag_id', default_args=args)

    @patch('airflow.providers.apache.kylin.operators.kylin_cube.KylinHook')
    def test_execute(self, mock_hook):
        """Operator stores its config and forwards it to the hook on execute."""
        operator = KylinCubeOperator(
            task_id='kylin_task',
            dag=self.dag,
            **self._config
        )
        hook = MagicMock()
        hook.invoke_command = self.cube_command
        # BUGFIX: the return value must be configured on the *instance* mock
        # (mock_hook.return_value) that the operator actually calls, not on
        # the patched class itself.
        hook.cube_run.return_value = {}
        mock_hook.return_value = hook

        self.assertIsNotNone(operator)
        self.assertEqual(self._config['kylin_conn_id'], operator.kylin_conn_id)
        self.assertEqual(self._config['project'], operator.project)
        self.assertEqual(self._config['cube'], operator.cube)
        self.assertEqual(self._config['command'], operator.command)
        self.assertEqual(self._config['start_time'], operator.start_time)
        self.assertEqual(self._config['end_time'], operator.end_time)
        operator.execute(None)
        mock_hook.assert_called_once_with(
            kylin_conn_id=self._config['kylin_conn_id'],
            project=self._config['project'],
            dsn=None
        )
        # The millisecond-epoch strings must be converted back to datetimes.
        mock_hook.return_value.cube_run.assert_called_once_with(
            'kylin_sales_cube',
            'build',
            end=datetime(2012, 1, 3, 0, 0),
            name=None,
            offset_end=None,
            offset_start=None,
            start=datetime(2012, 1, 2, 0, 0),
        )

    @patch('airflow.providers.apache.kylin.operators.kylin_cube.KylinHook')
    def test_execute_build(self, mock_hook):
        """With job tracking on, execute polls until FINISHED and returns the response."""
        operator = KylinCubeOperator(
            is_track_job=True,
            timeout=5,
            interval=1,
            task_id='kylin_task',
            dag=self.dag,
            **self._config
        )
        hook = MagicMock()
        hook.invoke_command = self.cube_command
        hook.cube_run.return_value = self.build_response
        # Two RUNNING polls before the job completes.
        hook.get_job_status.side_effect = ["RUNNING", "RUNNING", "FINISHED"]
        mock_hook.return_value = hook
        self.assertEqual(operator.execute(None)['uuid'], "c143e0e4-ac5f-434d-acf3-46b0d15e3dc6")

    @patch('airflow.providers.apache.kylin.operators.kylin_cube.KylinHook')
    def test_execute_build_status_error(self, mock_hook):
        """A job in ERROR state must raise AirflowException."""
        operator = KylinCubeOperator(
            is_track_job=True,
            timeout=5,
            interval=1,
            task_id='kylin_task',
            dag=self.dag,
            **self._config
        )
        hook = MagicMock()
        hook.invoke_command = self.cube_command
        hook.cube_run.return_value = self.build_response
        hook.get_job_status.return_value = "ERROR"
        mock_hook.return_value = hook
        self.assertRaises(AirflowException, operator.execute, None)

    @patch('airflow.providers.apache.kylin.operators.kylin_cube.KylinHook')
    def test_execute_build_time_out_error(self, mock_hook):
        """A job that never leaves RUNNING must time out with AirflowException."""
        operator = KylinCubeOperator(
            is_track_job=True,
            timeout=5,
            interval=1,
            task_id='kylin_task',
            dag=self.dag,
            **self._config
        )
        hook = MagicMock()
        hook.invoke_command = self.cube_command
        hook.cube_run.return_value = self.build_response
        hook.get_job_status.return_value = "RUNNING"
        mock_hook.return_value = hook
        self.assertRaises(AirflowException, operator.execute, None)

    def test_render_template(self):
        """All templated fields must render from DAG params."""
        operator = KylinCubeOperator(
            task_id="kylin_build_1",
            kylin_conn_id='kylin_default',
            project="{{ params.project }}",
            cube="{{ params.cube }}",
            command="{{ params.command }}",
            start_time="{{ params.start_time }}",
            end_time="{{ params.end_time }}",
            is_track_job=True,
            dag=self.dag,
            params={
                'project': 'learn_kylin',
                'cube': 'kylin_sales_cube',
                'command': 'build',
                'start_time': '1483200000000',
                'end_time': '1483286400000',
            },
        )
        ti = TaskInstance(operator, DEFAULT_DATE)
        ti.render_templates()
        self.assertEqual('learn_kylin', getattr(operator, 'project'))
        self.assertEqual('kylin_sales_cube', getattr(operator, 'cube'))
        self.assertEqual('build', getattr(operator, 'command'))
        self.assertEqual('1483200000000', getattr(operator, 'start_time'))
        self.assertEqual('1483286400000', getattr(operator, 'end_time'))