[
https://issues.apache.org/jira/browse/AIRFLOW-7014?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17127563#comment-17127563
]
ASF GitHub Bot commented on AIRFLOW-7014:
-----------------------------------------
mik-laj commented on a change in pull request #9149:
URL: https://github.com/apache/airflow/pull/9149#discussion_r436345095
##########
File path: airflow/providers/apache/kylin/operators/kylin.py
##########
@@ -0,0 +1,166 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+from datetime import datetime
+from typing import Optional
+import time
+from airflow.exceptions import AirflowException
+from airflow.models import BaseOperator
+from airflow.utils import timezone
+from airflow.utils.decorators import apply_defaults
+from airflow.providers.apache.kylin.hooks.kylin import KylinHook
+
+
+class KylinOperator(BaseOperator):
+ """
+ :param kylin_conn_id: The connection id as configured in Airflow
administration.
+ :type kylin_conn_id: str
+ :param project: kylin porject name, this param will overwrite the project
in kylin_conn_id:
+ :type project: str
+ :param cube: kylin cube name
+ :type cube: str
+ :param dsn: dsn , dsn url of kylin connection ,which will overwrite
kylin_conn_id.
+ for example:
kylin://ADMIN:KYLIN@sandbox/learn_kylin?timeout=60&is_debug=1
+ :type dsn: str
+ :param command: kylin command include:
+ ['build', 'merge', 'refresh', 'delete',
+ 'build_streaming', 'merge_streaming', 'refresh_streaming',
+ 'disable', 'enable', 'purge', 'clone', 'drop']
+ build: use /kylin/api/cubes/{cubeName}/build rest api,and buildType is
‘BUILD’,
+ and you should give start_time and end_time
+ refresh: use build rest api,and buildType is ‘REFRESH’
+ merge: use build rest api,and buildType is ‘MERGE’
+ build_streaming: use /kylin/api/cubes/{cubeName}/build2 rest api,and
buildType is ‘BUILD’
+ build_streaming: use build2 rest api,and buildType is ‘BUILD’,
+ and you should give offset_start and offset_end
+ build_streaming: use build2 rest api,and buildType is ‘REFRESH’
+ build_streaming: use build2 rest api,and buildType is ‘MERGE’
+ delete: delete segment, and you shoulf give segment_name value
+ disable: disable cube
+ enable: enable cube
+ purge: purge cube
+ clone: clone cube,new cube name is {cube_name}_clone
+ drop: drop cube
+ :type command: str
+ :param start_time: build segment start time
+ :type start_time: Optional[str]
+ :param end_time: build segment end time
+ :type end_time: Optional[str]
+ :param offset_start: streaming build segment start time
+ :type offset_start: Optional[str]
+ :param offset_end: streaming build segment end time
+ :type offset_end: Optional[str]
+ :param segment_name: segment name
+ :type segment_name: str
+ :param is_track_job: whether to track job status. if value is True,will
track job until job's status is in
+ ["FINISHED", "ERROR", "DISCARDED", "KILLED", "SUICIDAL", "STOPPED"]
+ :type is_track_job: bool
+ :param interval: track job status,default value is 60s
+ :type interval: int
+ :param timeout: timeout value,default value is 1 day,60 * 60 * 24 s
+ :type timeout: int
+ :param eager_error_status: jobs error status,if job status in this list
,this task will be error.
+ default value is tuple(["ERROR", "DISCARDED", "KILLED", "SUICIDAL",
"STOPPED"])
+ :type eager_error_status: tuple
+ """
+ template_fields = ('project', 'cube', 'dsn', 'command', 'start_time',
'end_time',
+ 'segment_name', 'offset_start', 'offset_end')
+ ui_color = '#E79C46'
+
+ @apply_defaults
+ def __init__(self,
+ kylin_conn_id: Optional[str] = 'kylin_default',
+ project: Optional[str] = None,
+ cube: Optional[str] = None,
+ dsn: Optional[str] = None,
+ command: Optional[str] = None,
+ start_time: Optional[str] = None,
+ end_time: Optional[str] = None,
+ offset_start: Optional[str] = None,
+ offset_end: Optional[str] = None,
+ segment_name: Optional[str] = None,
+ is_track_job: Optional[bool] = False,
+ interval: int = 60,
+ timeout: int = 60 * 60 * 24,
+ eager_error_status=tuple(["ERROR", "DISCARDED", "KILLED",
"SUICIDAL", "STOPPED"]),
+ *args,
+ **kwargs):
+ super().__init__(*args, **kwargs)
+ self.kylin_conn_id = kylin_conn_id
+ self.project = project
+ self.cube = cube
+ self.dsn = dsn
+ self.command = command
+ self.start_time = start_time
+ self.end_time = end_time
+ self.segment_name = segment_name
+ self.offset_start = offset_start
+ self.offset_end = offset_end
+ self.is_track_job = is_track_job
+ self.interval = interval
+ self.timeout = timeout
+ self.eager_error_status = eager_error_status
+ self.jobs_error_status = [stat.upper() for stat in eager_error_status]
+ self.hook = None
+ self.cube_command = []
+ self.build_command = {'fullbuild', 'build', 'merge', 'refresh',
+ 'build_streaming', 'merge_streaming',
'refresh_streaming'}
+ self.jobs_end_status = ["FINISHED", "ERROR", "DISCARDED", "KILLED",
"SUICIDAL", "STOPPED"]
+
+ def execute(self, context):
+
+ self.hook = KylinHook(kylin_conn_id=self.kylin_conn_id,
+ project=self.project,
+ dsn=self.dsn)
+
+ self.cube_command = self.hook.invoke_command
+ if self.command.lower() not in self.cube_command:
+ raise AirflowException('Kylin:Command {cmd} can not match kylin
command list {cmds}'.format(
+ cmd=self.command, cmds=self.cube_command))
+
+ kylinpy_params = {
+ 'start': datetime.fromtimestamp(int(self.start_time) / 1000) if
self.start_time else None,
Review comment:
Wouldn't it be simpler if the user could pass a datetime object or an
ISO 8601 datetime string?
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
[email protected]
> Operator for Apache Kylin
> --------------------------
>
> Key: AIRFLOW-7014
> URL: https://issues.apache.org/jira/browse/AIRFLOW-7014
> Project: Apache Airflow
> Issue Type: Improvement
> Components: executors, operators
> Affects Versions: 1.10.0
> Reporter: Shao Feng Shi
> Assignee: liuyongheng
> Priority: Major
>
> Apache Kylin is an analytical data warehouse for big data. Kylin provides a
> set of RESTful APIs for users to trigger data loading and to run SQL
> queries against the OLAP cubes with sub-second latency. We developed a
> KylinOperator within Airflow so that users can easily trigger Kylin alongside
> other tasks (Hive, Spark, etc.), and we plan to contribute it to Airflow.
--
This message was sent by Atlassian Jira
(v8.3.4#803005)