This is an automated email from the ASF dual-hosted git repository.

potiuk pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/airflow.git


The following commit(s) were added to refs/heads/master by this push:
     new a2c5389  Add kylin operator (#9149)
a2c5389 is described below

commit a2c5389a60f68482a60eb40c67b1542d827c187e
Author: yongheng.liu <[email protected]>
AuthorDate: Wed Jul 15 00:25:05 2020 +0800

    Add kylin operator (#9149)
    
    Co-authored-by: yongheng.liu <[email protected]>
---
 CONTRIBUTING.rst                                   |  16 +-
 INSTALL                                            |  16 +-
 airflow/providers/apache/kylin/__init__.py         |  16 ++
 .../apache/kylin/example_dags/__init__.py          |  16 ++
 .../apache/kylin/example_dags/example_kylin_dag.py | 136 +++++++++++++++++
 airflow/providers/apache/kylin/hooks/__init__.py   |  16 ++
 airflow/providers/apache/kylin/hooks/kylin.py      |  77 ++++++++++
 .../providers/apache/kylin/operators/__init__.py   |  16 ++
 .../providers/apache/kylin/operators/kylin_cube.py | 170 +++++++++++++++++++++
 airflow/utils/db.py                                |  11 ++
 docs/autoapi_templates/index.rst                   |   4 +
 docs/operators-and-hooks-ref.rst                   |   6 +
 requirements/requirements-python3.6.txt            |   3 +-
 requirements/requirements-python3.7.txt            |   3 +-
 requirements/requirements-python3.8.txt            |   4 +-
 requirements/setup-3.6.md5                         |   2 +-
 requirements/setup-3.7.md5                         |   2 +-
 requirements/setup-3.8.md5                         |   2 +-
 setup.py                                           |   7 +-
 tests/providers/apache/kylin/__init__.py           |  17 +++
 tests/providers/apache/kylin/hooks/__init__.py     |  17 +++
 tests/providers/apache/kylin/hooks/test_kylin.py   |  60 ++++++++
 tests/providers/apache/kylin/operators/__init__.py |  17 +++
 .../apache/kylin/operators/test_kylin_cube.py      | 170 +++++++++++++++++++++
 24 files changed, 780 insertions(+), 24 deletions(-)

diff --git a/CONTRIBUTING.rst b/CONTRIBUTING.rst
index 22a470c..e310efb 100644
--- a/CONTRIBUTING.rst
+++ b/CONTRIBUTING.rst
@@ -314,14 +314,14 @@ This is the full list of those extras:
   .. START EXTRAS HERE
 
 all_dbs, amazon, apache.atlas, apache_beam, apache.cassandra, apache.druid, 
apache.hdfs,
-apache.hive, apache.pinot, apache.webhdfs, async, atlas, aws, azure, 
cassandra, celery, cgroups,
-cloudant, cncf.kubernetes, dask, databricks, datadog, devel, devel_hadoop, 
doc, docker, druid,
-elasticsearch, exasol, facebook, gcp, gcp_api, github_enterprise, google, 
google_auth, grpc,
-hashicorp, hdfs, hive, jdbc, jira, kerberos, kubernetes, ldap, 
microsoft.azure, microsoft.mssql,
-microsoft.winrm, mongo, mssql, mysql, odbc, oracle, pagerduty, papermill, 
password, pinot, postgres,
-presto, qds, rabbitmq, redis, salesforce, samba, segment, sendgrid, sentry, 
singularity, slack,
-snowflake, spark, ssh, statsd, tableau, vertica, virtualenv, webhdfs, winrm, 
yandexcloud, all,
-devel_ci
+apache.hive, apache.kylin, apache.pinot, apache.webhdfs, async, atlas, aws, 
azure, cassandra,
+celery, cgroups, cloudant, cncf.kubernetes, dask, databricks, datadog, devel, 
devel_hadoop, doc,
+docker, druid, elasticsearch, exasol, facebook, gcp, gcp_api, 
github_enterprise, google,
+google_auth, grpc, hashicorp, hdfs, hive, jdbc, jira, kerberos, kubernetes, 
ldap, microsoft.azure,
+microsoft.mssql, microsoft.winrm, mongo, mssql, mysql, odbc, oracle, 
pagerduty, papermill, password,
+pinot, postgres, presto, qds, rabbitmq, redis, salesforce, samba, segment, 
sendgrid, sentry,
+singularity, slack, snowflake, spark, ssh, statsd, tableau, vertica, 
virtualenv, webhdfs, winrm,
+yandexcloud, all, devel_ci
 
   .. END EXTRAS HERE
 
diff --git a/INSTALL b/INSTALL
index fa4f672..64242c1 100644
--- a/INSTALL
+++ b/INSTALL
@@ -45,14 +45,14 @@ pip install . --constraint 
requirements/requirements-python3.7.txt
 # START EXTRAS HERE
 
 all_dbs, amazon, apache.atlas, apache_beam, apache.cassandra, apache.druid, 
apache.hdfs,
-apache.hive, apache.pinot, apache.webhdfs, async, atlas, aws, azure, 
cassandra, celery, cgroups,
-cloudant, cncf.kubernetes, dask, databricks, datadog, devel, devel_hadoop, 
doc, docker, druid,
-elasticsearch, exasol, facebook, gcp, gcp_api, github_enterprise, google, 
google_auth, grpc,
-hashicorp, hdfs, hive, jdbc, jira, kerberos, kubernetes, ldap, 
microsoft.azure, microsoft.mssql,
-microsoft.winrm, mongo, mssql, mysql, odbc, oracle, pagerduty, papermill, 
password, pinot, postgres,
-presto, qds, rabbitmq, redis, salesforce, samba, segment, sendgrid, sentry, 
singularity, slack,
-snowflake, spark, ssh, statsd, tableau, vertica, virtualenv, webhdfs, winrm, 
yandexcloud, all,
-devel_ci
+apache.hive, apache.kylin, apache.pinot, apache.webhdfs, async, atlas, aws, 
azure, cassandra,
+celery, cgroups, cloudant, cncf.kubernetes, dask, databricks, datadog, devel, 
devel_hadoop, doc,
+docker, druid, elasticsearch, exasol, facebook, gcp, gcp_api, 
github_enterprise, google,
+google_auth, grpc, hashicorp, hdfs, hive, jdbc, jira, kerberos, kubernetes, 
ldap, microsoft.azure,
+microsoft.mssql, microsoft.winrm, mongo, mssql, mysql, odbc, oracle, 
pagerduty, papermill, password,
+pinot, postgres, presto, qds, rabbitmq, redis, salesforce, samba, segment, 
sendgrid, sentry,
+singularity, slack, snowflake, spark, ssh, statsd, tableau, vertica, 
virtualenv, webhdfs, winrm,
+yandexcloud, all, devel_ci
 
 # END EXTRAS HERE
 
diff --git a/airflow/providers/apache/kylin/__init__.py 
b/airflow/providers/apache/kylin/__init__.py
new file mode 100644
index 0000000..13a8339
--- /dev/null
+++ b/airflow/providers/apache/kylin/__init__.py
@@ -0,0 +1,16 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
diff --git a/airflow/providers/apache/kylin/example_dags/__init__.py 
b/airflow/providers/apache/kylin/example_dags/__init__.py
new file mode 100644
index 0000000..13a8339
--- /dev/null
+++ b/airflow/providers/apache/kylin/example_dags/__init__.py
@@ -0,0 +1,16 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
diff --git a/airflow/providers/apache/kylin/example_dags/example_kylin_dag.py 
b/airflow/providers/apache/kylin/example_dags/example_kylin_dag.py
new file mode 100644
index 0000000..eb5aa92
--- /dev/null
+++ b/airflow/providers/apache/kylin/example_dags/example_kylin_dag.py
@@ -0,0 +1,136 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+"""
+This is an example DAG which uses the KylinCubeOperator.
+The tasks below include kylin build, refresh, merge operation.
+"""
+from airflow import DAG
+from airflow.operators.python import PythonOperator
+from airflow.providers.apache.kylin.operators.kylin_cube import 
KylinCubeOperator
+from airflow.utils.dates import days_ago
+
+args = {
+    'owner': 'airflow',
+    'start_date': days_ago(1),
+}
+
+dag = DAG(
+    dag_id='example_kylin_operator',
+    default_args=args,
+    schedule_interval=None,
+    tags=['example']
+)
+
+
+def gen_build_time(**kwargs):
+    """
+    gen build time and push to xcom
+    :param kwargs:
+    :return:
+    """
+    ti = kwargs['ti']
+    ti.xcom_push(key='date_start', value='1325347200000')
+    ti.xcom_push(key='date_end', value='1325433600000')
+
+
+gen_build_time_task = PythonOperator(
+    python_callable=gen_build_time,
+    task_id='gen_build_time',
+    dag=dag
+)
+
+build_task1 = KylinCubeOperator(
+    task_id="kylin_build_1",
+    kylin_conn_id='kylin_default',
+    project='learn_kylin',
+    cube='kylin_sales_cube',
+    command='build',
+    start_time="{{ 
task_instance.xcom_pull(task_ids='gen_build_time',key='date_start') }}",
+    end_time="{{ 
task_instance.xcom_pull(task_ids='gen_build_time',key='date_end') }}",
+    is_track_job=True,
+    dag=dag,
+)
+
+build_task2 = KylinCubeOperator(
+    task_id="kylin_build_2",
+    kylin_conn_id='kylin_default',
+    project='learn_kylin',
+    cube='kylin_sales_cube',
+    command='build',
+    start_time='1325433600000',
+    end_time='1325520000000',
+    is_track_job=True,
+    dag=dag,
+)
+
+refresh_task1 = KylinCubeOperator(
+    task_id="kylin_refresh_1",
+    kylin_conn_id='kylin_default',
+    project='learn_kylin',
+    cube='kylin_sales_cube',
+    command='refresh',
+    start_time='1325347200000',
+    end_time='1325433600000',
+    is_track_job=True,
+    dag=dag,
+)
+
+merge_task = KylinCubeOperator(
+    task_id="kylin_merge",
+    kylin_conn_id='kylin_default',
+    project='learn_kylin',
+    cube='kylin_sales_cube',
+    command='merge',
+    start_time='1325347200000',
+    end_time='1325520000000',
+    is_track_job=True,
+    dag=dag,
+)
+
+disable_task = KylinCubeOperator(
+    task_id="kylin_disable",
+    kylin_conn_id='kylin_default',
+    project='learn_kylin',
+    cube='kylin_sales_cube',
+    command='disable',
+    dag=dag,
+)
+
+purge_task = KylinCubeOperator(
+    task_id="kylin_purge",
+    kylin_conn_id='kylin_default',
+    project='learn_kylin',
+    cube='kylin_sales_cube',
+    command='purge',
+    dag=dag,
+)
+
+build_task3 = KylinCubeOperator(
+    task_id="kylin_build_3",
+    kylin_conn_id='kylin_default',
+    project='learn_kylin',
+    cube='kylin_sales_cube',
+    command='build',
+    start_time='1325433600000',
+    end_time='1325520000000',
+    dag=dag,
+)
+
+gen_build_time_task >> build_task1 >> build_task2 >> refresh_task1 >> 
merge_task
+merge_task >> disable_task >> purge_task >> build_task3
diff --git a/airflow/providers/apache/kylin/hooks/__init__.py 
b/airflow/providers/apache/kylin/hooks/__init__.py
new file mode 100644
index 0000000..13a8339
--- /dev/null
+++ b/airflow/providers/apache/kylin/hooks/__init__.py
@@ -0,0 +1,16 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
diff --git a/airflow/providers/apache/kylin/hooks/kylin.py 
b/airflow/providers/apache/kylin/hooks/kylin.py
new file mode 100644
index 0000000..59f6ce9
--- /dev/null
+++ b/airflow/providers/apache/kylin/hooks/kylin.py
@@ -0,0 +1,77 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+from typing import Optional
+
+from kylinpy import exceptions, kylinpy
+
+from airflow.exceptions import AirflowException
+from airflow.hooks.base_hook import BaseHook
+
+
+class KylinHook(BaseHook):
+    """
+    :param kylin_conn_id: The connection id as configured in Airflow 
administration.
+    :type kylin_conn_id: str
+    :param project: project name
+    :type project: Optional[str]
+    :param dsn: dsn
+    :type dsn: Optional[str]
+    """
+    def __init__(self,
+                 kylin_conn_id: Optional[str] = 'kylin_default',
+                 project: Optional[str] = None,
+                 dsn: Optional[str] = None
+                 ):
+        super().__init__()
+        self.kylin_conn_id = kylin_conn_id
+        self.project = project
+        self.dsn = dsn
+
+    def get_conn(self):
+        conn = self.get_connection(self.kylin_conn_id)
+        if self.dsn:
+            return kylinpy.create_kylin(self.dsn)
+        else:
+            self.project = self.project if self.project else conn.schema
+            return kylinpy.Kylin(conn.host, username=conn.login,
+                                 password=conn.password, port=conn.port,
+                                 project=self.project, **conn.extra_dejson)
+
+    def cube_run(self, datasource_name, op, **op_args):
+        """
+        run CubeSource command which is in CubeSource.support_invoke_command
+        :param datasource_name:
+        :param op: command
+        :param op_args: command args
+        :return: response
+        """
+        cube_source = self.get_conn().get_datasource(datasource_name)
+        try:
+            response = cube_source.invoke_command(op, **op_args)
+            return response
+        except exceptions.KylinError as err:
+            raise AirflowException("Cube operation {} error , Message: 
{}".format(op, err))
+
+    def get_job_status(self, job_id):
+        """
+        get job status
+        :param job_id: kylin job id
+        :return: job status
+        """
+        return self.get_conn().get_job(job_id).status
diff --git a/airflow/providers/apache/kylin/operators/__init__.py 
b/airflow/providers/apache/kylin/operators/__init__.py
new file mode 100644
index 0000000..13a8339
--- /dev/null
+++ b/airflow/providers/apache/kylin/operators/__init__.py
@@ -0,0 +1,16 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
diff --git a/airflow/providers/apache/kylin/operators/kylin_cube.py 
b/airflow/providers/apache/kylin/operators/kylin_cube.py
new file mode 100644
index 0000000..cec20af
--- /dev/null
+++ b/airflow/providers/apache/kylin/operators/kylin_cube.py
@@ -0,0 +1,170 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+import time
+from datetime import datetime
+from typing import Optional
+
+from kylinpy import kylinpy
+
+from airflow.exceptions import AirflowException
+from airflow.models import BaseOperator
+from airflow.providers.apache.kylin.hooks.kylin import KylinHook
+from airflow.utils import timezone
+from airflow.utils.decorators import apply_defaults
+
+
+class KylinCubeOperator(BaseOperator):
+    """
+    This operator is used to submit requests about kylin build/refresh/merge,
+    and can track job status, so users can more easily build kylin jobs
+
+    For more detail information in
+    `Apache Kylin <http://kylin.apache.org/>`_
+
+    :param kylin_conn_id: The connection id as configured in Airflow 
administration.
+    :type kylin_conn_id: str
+    :param project: kylin project name, this param will overwrite the project 
in kylin_conn_id:
+    :type project: str
+    :param cube: kylin cube name
+    :type cube: str
+    :param dsn: (dsn , dsn url of kylin connection ,which will overwrite 
kylin_conn_id.
+        for example: 
kylin://ADMIN:KYLIN@sandbox/learn_kylin?timeout=60&is_debug=1)
+    :type dsn: str
+    :param command: (kylin command include 'build', 'merge', 'refresh', 
'delete',
+        'build_streaming', 'merge_streaming', 'refresh_streaming', 'disable', 
'enable',
+        'purge', 'clone', 'drop'.
+        build - use /kylin/api/cubes/{cubeName}/build rest api,and buildType 
is ‘BUILD’,
+        and you should give start_time and end_time
+        refresh - use build rest api,and buildType is ‘REFRESH’
+        merge - use build rest api,and buildType is ‘MERGE’
+        build_streaming - use /kylin/api/cubes/{cubeName}/build2 rest api,and 
buildType is ‘BUILD’
+        and you should give offset_start and offset_end
+        refresh_streaming - use build2 rest api,and buildType is ‘REFRESH’
+        merge_streaming - use build2 rest api,and buildType is ‘MERGE’
+        delete - delete segment, and you should give segment_name value
+        disable - disable cube
+        enable - enable cube
+        purge - purge cube
+        clone - clone cube,new cube name is {cube_name}_clone
+        drop - drop cube)
+    :type command: str
+    :param start_time: build segment start time
+    :type start_time: Optional[str]
+    :param end_time: build segment end time
+    :type end_time: Optional[str]
+    :param offset_start: streaming build segment start time
+    :type offset_start: Optional[str]
+    :param offset_end: streaming build segment end time
+    :type offset_end: Optional[str]
+    :param segment_name: segment name
+    :type segment_name: str
+    :param is_track_job: (whether to track job status. if value is True,will 
track job until
+        job status is in("FINISHED", "ERROR", "DISCARDED", "KILLED", 
"SUICIDAL",
+        "STOPPED") or timeout)
+    :type is_track_job: bool
+    :param interval: track job status,default value is 60s
+    :type interval: int
+    :param timeout: timeout value,default value is 1 day,60 * 60 * 24 s
+    :type timeout: int
+    :param eager_error_status: (jobs error status,if job status in this list 
,this task will be error.
+        default value is tuple(["ERROR", "DISCARDED", "KILLED", "SUICIDAL", 
"STOPPED"]))
+    :type eager_error_status: tuple
+    """
+
+    template_fields = ('project', 'cube', 'dsn', 'command', 'start_time', 
'end_time',
+                       'segment_name', 'offset_start', 'offset_end')
+    ui_color = '#E79C46'
+    build_command = {'fullbuild', 'build', 'merge', 'refresh', 
'build_streaming',
+                     'merge_streaming', 'refresh_streaming'}
+    jobs_end_status = {"FINISHED", "ERROR", "DISCARDED", "KILLED", "SUICIDAL", 
"STOPPED"}
+
+    # pylint: disable=too-many-arguments,inconsistent-return-statements
+    @apply_defaults
+    def __init__(self,
+                 kylin_conn_id: Optional[str] = 'kylin_default',
+                 project: Optional[str] = None,
+                 cube: Optional[str] = None,
+                 dsn: Optional[str] = None,
+                 command: Optional[str] = None,
+                 start_time: Optional[str] = None,
+                 end_time: Optional[str] = None,
+                 offset_start: Optional[str] = None,
+                 offset_end: Optional[str] = None,
+                 segment_name: Optional[str] = None,
+                 is_track_job: Optional[bool] = False,
+                 interval: int = 60,
+                 timeout: int = 60 * 60 * 24,
+                 eager_error_status=tuple(["ERROR", "DISCARDED", "KILLED", 
"SUICIDAL", "STOPPED"]),
+                 *args,
+                 **kwargs):
+        super().__init__(*args, **kwargs)
+        self.kylin_conn_id = kylin_conn_id
+        self.project = project
+        self.cube = cube
+        self.dsn = dsn
+        self.command = command
+        self.start_time = start_time
+        self.end_time = end_time
+        self.segment_name = segment_name
+        self.offset_start = offset_start
+        self.offset_end = offset_end
+        self.is_track_job = is_track_job
+        self.interval = interval
+        self.timeout = timeout
+        self.eager_error_status = eager_error_status
+        self.jobs_error_status = [stat.upper() for stat in eager_error_status]
+
+    def execute(self, context):
+
+        _hook = KylinHook(kylin_conn_id=self.kylin_conn_id, 
project=self.project, dsn=self.dsn)
+
+        _support_invoke_command = kylinpy.CubeSource.support_invoke_command
+        if self.command.lower() not in _support_invoke_command:
+            raise AirflowException('Kylin:Command {} can not match kylin 
command list {}'.format(
+                                   self.command, _support_invoke_command))
+
+        kylinpy_params = {
+            'start': datetime.fromtimestamp(int(self.start_time) / 1000) if 
self.start_time else None,
+            'end': datetime.fromtimestamp(int(self.end_time) / 1000) if 
self.end_time else None,
+            'name': self.segment_name,
+            'offset_start': int(self.offset_start) if self.offset_start else 
None,
+            'offset_end': int(self.offset_end) if self.offset_end else None
+        }
+        rsp_data = _hook.cube_run(self.cube, self.command.lower(), 
**kylinpy_params)
+        if self.is_track_job and self.command.lower() in self.build_command:
+            started_at = timezone.utcnow()
+            job_id = rsp_data.get("uuid")
+            if job_id is None:
+                raise AirflowException("kylin job id is None")
+            self.log.info("kylin job id: %s", job_id)
+
+            job_status = None
+            while job_status not in self.jobs_end_status:
+                if (timezone.utcnow() - started_at).total_seconds() > 
self.timeout:
+                    raise AirflowException('kylin job {} 
timeout'.format(job_id))
+                time.sleep(self.interval)
+
+                job_status = _hook.get_job_status(job_id)
+                self.log.info('Kylin job status is %s ', job_status)
+                if job_status in self.jobs_error_status:
+                    raise AirflowException(
+                        'Kylin job {} status {} is error '.format(job_id, 
job_status))
+
+        if self.do_xcom_push:
+            return rsp_data
diff --git a/airflow/utils/db.py b/airflow/utils/db.py
index fb91b11..0a652cd 100644
--- a/airflow/utils/db.py
+++ b/airflow/utils/db.py
@@ -309,6 +309,17 @@ def create_default_connections(session=None):
     )
     merge_conn(
         Connection(
+            conn_id='kylin_default',
+            conn_type='kylin',
+            host='localhost',
+            port=7070,
+            login="ADMIN",
+            password="KYLIN"
+        ),
+        session
+    )
+    merge_conn(
+        Connection(
             conn_id="livy_default",
             conn_type="livy",
             host="livy",
diff --git a/docs/autoapi_templates/index.rst b/docs/autoapi_templates/index.rst
index 0b9438f..a04c53e 100644
--- a/docs/autoapi_templates/index.rst
+++ b/docs/autoapi_templates/index.rst
@@ -86,6 +86,8 @@ All operators are in the following packages:
 
   airflow/providers/apache/hive/transfers/index
 
+  airflow/providers/apache/kylin/operators/index
+
   airflow/providers/apache/livy/operators/index
 
   airflow/providers/apache/livy/sensors/index
@@ -245,6 +247,8 @@ All hooks are in the following packages:
 
   airflow/providers/apache/hive/hooks/index
 
+  airflow/providers/apache/kylin/hooks/index
+
   airflow/providers/apache/livy/hooks/index
 
   airflow/providers/apache/pig/hooks/index
diff --git a/docs/operators-and-hooks-ref.rst b/docs/operators-and-hooks-ref.rst
index 869ba57..43b0239 100644
--- a/docs/operators-and-hooks-ref.rst
+++ b/docs/operators-and-hooks-ref.rst
@@ -144,6 +144,12 @@ Foundation.
        :mod:`airflow.providers.apache.hive.sensors.hive_partition`,
        :mod:`airflow.providers.apache.hive.sensors.metastore_partition`
 
+   * - `Apache Kylin <https://kylin.apache.org/>`__
+     -
+     - :mod:`airflow.providers.apache.kylin.hooks.kylin`
+     - :mod:`airflow.providers.apache.kylin.operators.kylin_cube`
+     -
+
    * - `Apache Livy <https://livy.apache.org/>`__
      -
      - :mod:`airflow.providers.apache.livy.hooks.livy`
diff --git a/requirements/requirements-python3.6.txt 
b/requirements/requirements-python3.6.txt
index 4f318a2..792e908 100644
--- a/requirements/requirements-python3.6.txt
+++ b/requirements/requirements-python3.6.txt
@@ -205,10 +205,11 @@ jsonpickle==1.4.1
 jsonpointer==2.0
 jsonschema==3.2.0
 junit-xml==1.9
-jupyter-client==6.1.5
+jupyter-client==6.1.6
 jupyter-core==4.6.3
 kombu==4.6.11
 kubernetes==11.0.0
+kylinpy==2.8.1
 lazy-object-proxy==1.5.0
 ldap3==2.7
 lockfile==0.12.2
diff --git a/requirements/requirements-python3.7.txt 
b/requirements/requirements-python3.7.txt
index 5268016..a055ace 100644
--- a/requirements/requirements-python3.7.txt
+++ b/requirements/requirements-python3.7.txt
@@ -201,10 +201,11 @@ jsonpickle==1.4.1
 jsonpointer==2.0
 jsonschema==3.2.0
 junit-xml==1.9
-jupyter-client==6.1.5
+jupyter-client==6.1.6
 jupyter-core==4.6.3
 kombu==4.6.11
 kubernetes==11.0.0
+kylinpy==2.8.1
 lazy-object-proxy==1.5.0
 ldap3==2.7
 lockfile==0.12.2
diff --git a/requirements/requirements-python3.8.txt 
b/requirements/requirements-python3.8.txt
index a4b2860..d199958 100644
--- a/requirements/requirements-python3.8.txt
+++ b/requirements/requirements-python3.8.txt
@@ -181,7 +181,6 @@ humanize==2.5.0
 hvac==0.10.4
 identify==1.4.23
 idna==2.10
-ijson==2.6.1
 imagesize==1.2.0
 importlib-metadata==1.7.0
 inflection==0.5.0
@@ -202,10 +201,11 @@ jsonpickle==1.4.1
 jsonpointer==2.0
 jsonschema==3.2.0
 junit-xml==1.9
-jupyter-client==6.1.5
+jupyter-client==6.1.6
 jupyter-core==4.6.3
 kombu==4.6.11
 kubernetes==11.0.0
+kylinpy==2.8.1
 lazy-object-proxy==1.5.0
 ldap3==2.7
 lockfile==0.12.2
diff --git a/requirements/setup-3.6.md5 b/requirements/setup-3.6.md5
index 83d65ec..3956d1f 100644
--- a/requirements/setup-3.6.md5
+++ b/requirements/setup-3.6.md5
@@ -1 +1 @@
-5e35d346a28f77758fc2a78f121d7257  /opt/airflow/setup.py
+4b8e4aec9535614784887c046bed986b  /opt/airflow/setup.py
diff --git a/requirements/setup-3.7.md5 b/requirements/setup-3.7.md5
index 83d65ec..3956d1f 100644
--- a/requirements/setup-3.7.md5
+++ b/requirements/setup-3.7.md5
@@ -1 +1 @@
-5e35d346a28f77758fc2a78f121d7257  /opt/airflow/setup.py
+4b8e4aec9535614784887c046bed986b  /opt/airflow/setup.py
diff --git a/requirements/setup-3.8.md5 b/requirements/setup-3.8.md5
index 83d65ec..3956d1f 100644
--- a/requirements/setup-3.8.md5
+++ b/requirements/setup-3.8.md5
@@ -1 +1 @@
-5e35d346a28f77758fc2a78f121d7257  /opt/airflow/setup.py
+4b8e4aec9535614784887c046bed986b  /opt/airflow/setup.py
diff --git a/setup.py b/setup.py
index b319bd3..c70519c 100644
--- a/setup.py
+++ b/setup.py
@@ -320,6 +320,9 @@ kubernetes = [
     'cryptography>=2.0.0',
     'kubernetes>=3.0.0',
 ]
+kylin = [
+    'kylinpy>=2.6'
+]
 ldap = [
     'ldap3>=2.5.1',
 ]
@@ -491,6 +494,7 @@ PROVIDERS_REQUIREMENTS: Dict[str, Iterable[str]] = {
     "apache.druid": druid,
     "apache.hdfs": hdfs,
     "apache.hive": hive,
+    "apache.kylin": kylin,
     "apache.livy": [],
     "apache.pig": [],
     "apache.pinot": pinot,
@@ -555,6 +559,7 @@ EXTRAS_REQUIREMENTS: Dict[str, Iterable[str]] = {
     "apache.druid": druid,
     "apache.hdfs": hdfs,
     "apache.hive": hive,
+    "apache.kylin": kylin,
     "apache.pinot": pinot,
     "apache.webhdfs": webhdfs,
     'async': async_packages,
@@ -589,7 +594,7 @@ EXTRAS_REQUIREMENTS: Dict[str, Iterable[str]] = {
     'jdbc': jdbc,
     'jira': jira,
     'kerberos': kerberos,
-    'kubernetes': kubernetes,  # TODO: remove this in Airflow 2.1
+    'kubernetes': kubernetes,   # TODO: remove this in Airflow 2.1
     'ldap': ldap,
     "microsoft.azure": azure,
     "microsoft.mssql": mssql,
diff --git a/tests/providers/apache/kylin/__init__.py 
b/tests/providers/apache/kylin/__init__.py
new file mode 100644
index 0000000..217e5db
--- /dev/null
+++ b/tests/providers/apache/kylin/__init__.py
@@ -0,0 +1,17 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
diff --git a/tests/providers/apache/kylin/hooks/__init__.py 
b/tests/providers/apache/kylin/hooks/__init__.py
new file mode 100644
index 0000000..217e5db
--- /dev/null
+++ b/tests/providers/apache/kylin/hooks/__init__.py
@@ -0,0 +1,17 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
diff --git a/tests/providers/apache/kylin/hooks/test_kylin.py 
b/tests/providers/apache/kylin/hooks/test_kylin.py
new file mode 100644
index 0000000..5d7b1c3
--- /dev/null
+++ b/tests/providers/apache/kylin/hooks/test_kylin.py
@@ -0,0 +1,60 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+#
+
+import unittest
+from unittest.mock import MagicMock, patch
+
+from kylinpy.exceptions import KylinCubeError
+
+from airflow.exceptions import AirflowException
+from airflow.providers.apache.kylin.hooks.kylin import KylinHook
+
+
class TestKylinHook(unittest.TestCase):
    """Unit tests for :class:`KylinHook` against a mocked kylinpy client."""

    def setUp(self) -> None:
        self.hook = KylinHook(kylin_conn_id='kylin_default', project='learn_kylin')

    @patch("kylinpy.Kylin.get_job")
    def test_get_job_status(self, mock_get_job):
        # The hook should surface the job status reported by kylinpy verbatim.
        mock_get_job.return_value = MagicMock(status="ERROR")
        self.assertEqual(self.hook.get_job_status('123'), "ERROR")

    @patch("kylinpy.Kylin.get_datasource")
    def test_cube_run(self, mock_get_datasource):
        # Commands the fake datasource accepts; anything else raises KylinCubeError.
        supported_commands = ('fullbuild', 'build', 'merge', 'refresh',
                              'delete', 'build_streaming', 'merge_streaming', 'refresh_streaming',
                              'disable', 'enable', 'purge', 'clone', 'drop')

        class _FakeCubeSource:
            def invoke_command(self, command, **kwargs):
                if command not in supported_commands:
                    raise KylinCubeError('Unsupported invoke command for datasource: {}'.format(command))
                return {"code": "000", "data": {}}

        mock_get_datasource.return_value = _FakeCubeSource()
        expected_response = {"code": "000", "data": {}}

        # Supported commands pass the datasource response straight through.
        for command in ('build', 'refresh', 'merge', 'build_streaming'):
            self.assertDictEqual(self.hook.cube_run('kylin_sales_cube', command), expected_response)

        # An unknown command is translated by the hook into an AirflowException.
        self.assertRaises(AirflowException, self.hook.cube_run, 'kylin_sales_cube', 'build123')
diff --git a/tests/providers/apache/kylin/operators/__init__.py 
b/tests/providers/apache/kylin/operators/__init__.py
new file mode 100644
index 0000000..217e5db
--- /dev/null
+++ b/tests/providers/apache/kylin/operators/__init__.py
@@ -0,0 +1,17 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
diff --git a/tests/providers/apache/kylin/operators/test_kylin_cube.py 
b/tests/providers/apache/kylin/operators/test_kylin_cube.py
new file mode 100644
index 0000000..f71ec3d
--- /dev/null
+++ b/tests/providers/apache/kylin/operators/test_kylin_cube.py
@@ -0,0 +1,170 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+import unittest
+from datetime import datetime
+from unittest.mock import MagicMock, patch
+
+from airflow.exceptions import AirflowException
+from airflow.models import TaskInstance
+from airflow.models.dag import DAG
+from airflow.providers.apache.kylin.operators.kylin_cube import 
KylinCubeOperator
+from airflow.utils import timezone
+
+DEFAULT_DATE = timezone.datetime(2020, 1, 1)
+
+
class TestKylinCubeOperator(unittest.TestCase):
    """Unit tests for :class:`KylinCubeOperator` with the KylinHook mocked out."""

    # NOTE: strftime("%s") (epoch seconds) is a glibc extension and is not
    # portable to all platforms — TODO confirm this matters for CI targets.
    _config = {
        'kylin_conn_id': 'kylin_default',
        'project': 'learn_kylin',
        'cube': 'kylin_sales_cube',
        'command': 'build',
        'start_time': datetime(2012, 1, 2, 0, 0).strftime("%s") + '000',
        'end_time': datetime(2012, 1, 3, 0, 0).strftime("%s") + '000',
    }
    # Full set of commands the operator recognises; reused by every test below.
    cube_command = ['fullbuild', 'build', 'merge', 'refresh',
                    'delete', 'build_streaming', 'merge_streaming', 'refresh_streaming',
                    'disable', 'enable', 'purge', 'clone', 'drop']

    build_response = {"uuid": "c143e0e4-ac5f-434d-acf3-46b0d15e3dc6"}

    def setUp(self):
        args = {
            'owner': 'airflow',
            'start_date': DEFAULT_DATE,
        }
        self.dag = DAG('test_dag_id', default_args=args)

    @patch('airflow.providers.apache.kylin.operators.kylin_cube.KylinHook')
    def test_execute(self, mock_hook):
        operator = KylinCubeOperator(
            task_id='kylin_task',
            dag=self.dag,
            **self._config
        )
        hook = MagicMock()
        # Reuse the class-level command list instead of re-typing it inline.
        hook.invoke_command = self.cube_command
        # Fix: configure the hook *instance* mock (mock_hook.return_value),
        # which is what operator.execute() actually calls; the previous
        # `mock_hook.cube_run.return_value = {}` set the attribute on the
        # mocked class object and had no effect.
        hook.cube_run.return_value = {}
        mock_hook.return_value = hook

        self.assertIsNotNone(operator)
        self.assertEqual(self._config['kylin_conn_id'], operator.kylin_conn_id)
        self.assertEqual(self._config['project'], operator.project)
        self.assertEqual(self._config['cube'], operator.cube)
        self.assertEqual(self._config['command'], operator.command)
        self.assertEqual(self._config['start_time'], operator.start_time)
        self.assertEqual(self._config['end_time'], operator.end_time)

        operator.execute(None)

        # The operator must build the hook from its own config (dsn unset).
        mock_hook.assert_called_once_with(
            kylin_conn_id=self._config['kylin_conn_id'],
            project=self._config['project'],
            dsn=None
        )
        # start/end millisecond strings are converted to datetimes before the
        # hook call; the streaming offsets stay None for a plain 'build'.
        hook.cube_run.assert_called_once_with('kylin_sales_cube',
                                              'build',
                                              end=datetime(2012, 1, 3, 0, 0),
                                              name=None,
                                              offset_end=None,
                                              offset_start=None,
                                              start=datetime(2012, 1, 2, 0, 0))

    @patch('airflow.providers.apache.kylin.operators.kylin_cube.KylinHook')
    def test_execute_build(self, mock_hook):
        """Tracked build succeeds once the job status reaches FINISHED."""
        operator = KylinCubeOperator(
            is_track_job=True,
            timeout=5,
            interval=1,
            task_id='kylin_task',
            dag=self.dag,
            **self._config
        )
        hook = MagicMock()
        hook.invoke_command = self.cube_command
        hook.cube_run.return_value = self.build_response
        # Two polls in flight, then the job completes.
        hook.get_job_status.side_effect = ["RUNNING", "RUNNING", "FINISHED"]
        mock_hook.return_value = hook
        self.assertEqual(operator.execute(None)['uuid'], "c143e0e4-ac5f-434d-acf3-46b0d15e3dc6")

    @patch('airflow.providers.apache.kylin.operators.kylin_cube.KylinHook')
    def test_execute_build_status_error(self, mock_hook):
        """A job that reports ERROR must raise AirflowException."""
        operator = KylinCubeOperator(
            is_track_job=True,
            timeout=5,
            interval=1,
            task_id='kylin_task',
            dag=self.dag,
            **self._config
        )
        hook = MagicMock()
        hook.invoke_command = self.cube_command
        hook.cube_run.return_value = self.build_response
        hook.get_job_status.return_value = "ERROR"
        mock_hook.return_value = hook
        self.assertRaises(AirflowException, operator.execute, None)

    @patch('airflow.providers.apache.kylin.operators.kylin_cube.KylinHook')
    def test_execute_build_time_out_error(self, mock_hook):
        """A job stuck in RUNNING past the timeout must raise AirflowException."""
        operator = KylinCubeOperator(
            is_track_job=True,
            timeout=5,
            interval=1,
            task_id='kylin_task',
            dag=self.dag,
            **self._config
        )
        hook = MagicMock()
        hook.invoke_command = self.cube_command
        hook.cube_run.return_value = self.build_response
        hook.get_job_status.return_value = "RUNNING"
        mock_hook.return_value = hook
        self.assertRaises(AirflowException, operator.execute, None)

    def test_render_template(self):
        """Templated fields must be rendered from DAG params."""
        operator = KylinCubeOperator(
            task_id="kylin_build_1",
            kylin_conn_id='kylin_default',
            project="{{ params.project }}",
            cube="{{ params.cube }}",
            command="{{ params.command }}",
            start_time="{{ params.start_time }}",
            end_time="{{ params.end_time }}",
            is_track_job=True,
            dag=self.dag,
            params={
                'project': 'learn_kylin',
                'cube': 'kylin_sales_cube',
                'command': 'build',
                'start_time': '1483200000000',
                'end_time': '1483286400000',
            },
        )
        ti = TaskInstance(operator, DEFAULT_DATE)
        ti.render_templates()
        self.assertEqual('learn_kylin', getattr(operator, 'project'))
        self.assertEqual('kylin_sales_cube', getattr(operator, 'cube'))
        self.assertEqual('build', getattr(operator, 'command'))
        self.assertEqual('1483200000000', getattr(operator, 'start_time'))
        self.assertEqual('1483286400000', getattr(operator, 'end_time'))

Reply via email to