msumit closed pull request #4165: [AIRFLOW-3322] Update qubole_hook to fetch command args dynamically from qds_sdk
URL: https://github.com/apache/incubator-airflow/pull/4165
This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:
diff --git a/airflow/contrib/hooks/qubole_hook.py b/airflow/contrib/hooks/qubole_hook.py
index 3df77d3a1f..1c98f26afc 100755
--- a/airflow/contrib/hooks/qubole_hook.py
+++ b/airflow/contrib/hooks/qubole_hook.py
@@ -34,7 +34,6 @@
    PigCommand, ShellCommand, SparkCommand, DbTapQueryCommand, DbExportCommand, \
    DbImportCommand
-
COMMAND_CLASSES = {
"hivecmd": HiveCommand,
"prestocmd": PrestoCommand,
@@ -47,35 +46,52 @@
"dbimportcmd": DbImportCommand
}
-HYPHEN_ARGS = ['cluster_label', 'app_id', 'note_id']
-
-POSITIONAL_ARGS = ['sub_command', 'parameters']
-
-COMMAND_ARGS = {
-    "hivecmd": ['query', 'script_location', 'macros', 'tags', 'sample_size', 'cluster_label', 'name'],
-    'prestocmd': ['query', 'script_location', 'macros', 'tags', 'cluster_label', 'name'],
-    'hadoopcmd': ['sub_command', 'tags', 'cluster_label', 'name'],
-    'shellcmd': ['script', 'script_location', 'files', 'archives', 'parameters', 'tags',
-                 'cluster_label', 'name'],
-    'pigcmd': ['script', 'script_location', 'parameters', 'tags', 'cluster_label',
-               'name'],
-    'dbtapquerycmd': ['db_tap_id', 'query', 'macros', 'tags', 'name'],
-    'sparkcmd': ['program', 'cmdline', 'sql', 'script_location', 'macros', 'tags',
-                'cluster_label', 'language', 'app_id', 'name', 'arguments', 'note_id',
-                'user_program_arguments'],
-    'dbexportcmd': ['mode', 'hive_table', 'partition_spec', 'dbtap_id', 'db_table',
-                    'db_update_mode', 'db_update_keys', 'export_dir',
-                    'fields_terminated_by', 'tags', 'name', 'customer_cluster_label',
-                    'use_customer_cluster', 'additional_options'],
-    'dbimportcmd': ['mode', 'hive_table', 'dbtap_id', 'db_table', 'where_clause',
-                    'parallelism', 'extract_query', 'boundary_query', 'split_column',
-                    'tags', 'name', 'hive_serde', 'customer_cluster_label',
-                    'use_customer_cluster', 'schema', 'additional_options']
+POSITIONAL_ARGS = {
+    'hadoopcmd': ['sub_command'],
+    'shellcmd': ['parameters'],
+    'pigcmd': ['parameters']
}
-class QuboleHook(BaseHook, LoggingMixin):
+def flatten_list(list_of_lists):
+    return [element for array in list_of_lists for element in array]
+
+
+def filter_options(options):
+    options_to_remove = ["help", "print-logs-live", "print-logs"]
+    return [option for option in options if option not in options_to_remove]
+
+
+def get_options_list(command_class):
+    options_list = [option.get_opt_string().strip("--") for option in
+                    command_class.optparser.option_list]
+    return filter_options(options_list)
+
+
+def build_command_args():
+    command_args, hyphen_args = {}, set()
+    for cmd in COMMAND_CLASSES:
+
+        # get all available options from the class
+        opts_list = get_options_list(COMMAND_CLASSES[cmd])
+
+        # append positional args if any for the command
+        if cmd in POSITIONAL_ARGS:
+            opts_list += POSITIONAL_ARGS[cmd]
+
+        # get args with a hyphen and replace them with underscore
+        for index, opt in enumerate(opts_list):
+            if "-" in opt:
+                opts_list[index] = opt.replace("-", "_")
+                hyphen_args.add(opts_list[index])
+
+        command_args[cmd] = opts_list
+    return command_args, list(hyphen_args)
+
+
+COMMAND_ARGS, HYPHEN_ARGS = build_command_args()
+
+
+class QuboleHook(BaseHook):
    def __init__(self, *args, **kwargs):
        conn = self.get_connection(kwargs['qubole_conn_id'])
        Qubole.configure(api_token=conn.password, api_url=conn.host)
@@ -189,12 +205,13 @@ def create_cmd_args(self, context):
        cmd_type = self.kwargs['command_type']
        inplace_args = None
        tags = set([self.dag_id, self.task_id, context['run_id']])
+        positional_args_list = flatten_list(POSITIONAL_ARGS.values())
        for k, v in self.kwargs.items():
            if k in COMMAND_ARGS[cmd_type]:
                if k in HYPHEN_ARGS:
                    args.append("--{0}={1}".format(k.replace('_', '-'), v))
-                elif k in POSITIONAL_ARGS:
+                elif k in positional_args_list:
                    inplace_args = v
                elif k == 'tags':
                    if isinstance(v, six.string_types):
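
For reference, the option discovery added above can be exercised on its own.
The following is a minimal standalone sketch (not part of this diff) that
mirrors get_options_list/filter_options; it assumes qds-sdk>=1.10.4 is
installed, and its exact output depends on the installed SDK version:

    from qds_sdk.commands import HiveCommand

    # Mirrors the helpers added to qubole_hook.py: read option names straight
    # from the qds_sdk optparse parser and drop flags Airflow never forwards.
    OPTIONS_TO_REMOVE = ["help", "print-logs-live", "print-logs"]

    def get_options_list(command_class):
        options_list = [option.get_opt_string().strip("--")
                        for option in command_class.optparser.option_list]
        return [option for option in options_list
                if option not in OPTIONS_TO_REMOVE]

    # e.g. ['query', 'script_location', 'macros', 'tags', 'sample_size', ...]
    print(get_options_list(HiveCommand))
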
diff --git a/airflow/contrib/operators/qubole_operator.py b/airflow/contrib/operators/qubole_operator.py
index 82ee293b93..7818d961c4 100755
--- a/airflow/contrib/operators/qubole_operator.py
+++ b/airflow/contrib/operators/qubole_operator.py
@@ -43,6 +43,8 @@ class QuboleOperator(BaseOperator):
:script_location: s3 location containing query statement
:sample_size: size of sample in bytes on which to run query
:macros: macro values which were used in query
+ :sample_size: size of sample in bytes on which to run query
+ :hive-version: Specifies the hive version to be used. eg: 0.13,1.2,etc.
prestocmd:
:query: inline query statement
:script_location: s3 location containing query statement
@@ -77,12 +79,14 @@ class QuboleOperator(BaseOperator):
:arguments: spark-submit command line arguments
:user_program_arguments: arguments that the user program takes in
:macros: macro values which were used in query
+ :note_id: Id of the Notebook to run
dbtapquerycmd:
:db_tap_id: data store ID of the target database, in Qubole.
:query: inline query statement
:macros: macro values which were used in query
dbexportcmd:
- :mode: 1 (simple), 2 (advance)
+ :mode: Can be 1 for Hive export or 2 for HDFS/S3 export
+ :schema: Db schema name assumed accordingly by database if not specified
:hive_table: Name of the hive table
:partition_spec: partition specification for Hive table.
:dbtap_id: data store ID of the target database, in Qubole.
@@ -91,9 +95,15 @@ class QuboleOperator(BaseOperator):
:db_update_keys: columns used to determine the uniqueness of rows
:export_dir: HDFS/S3 location from which data will be exported.
:fields_terminated_by: hex of the char used as column separator in the dataset
+ :use_customer_cluster: To use cluster to run command
+ :customer_cluster_label: the label of the cluster to run the command on
+ :additional_options: Additional Sqoop options which are needed enclose options in
+ double or single quotes e.g. '--map-column-hive id=int,data=string'
dbimportcmd:
:mode: 1 (simple), 2 (advance)
:hive_table: Name of the hive table
+ :schema: Db schema name assumed accordingly by database if not specified
+ :hive_serde: Output format of the Hive Table
:dbtap_id: data store ID of the target database, in Qubole.
:db_table: name of the db table
:where_clause: where clause, if any
@@ -102,6 +112,10 @@ class QuboleOperator(BaseOperator):
of the where clause.
:boundary_query: Query to be used get range of row IDs to be extracted
:split_column: Column used as row ID to split data into ranges (mode 2)
+ :use_customer_cluster: To use cluster to run command
+ :customer_cluster_label: the label of the cluster to run the command on
+ :additional_options: Additional Sqoop options which are needed enclose options in
+ double or single quotes
.. note:: Following fields are template-supported : ``query``, ``script_location``,
``sub_command``, ``script``, ``files``, ``archives``, ``program``, ``cmdline``,
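
The docstring fields added above map directly onto QuboleOperator keyword
arguments, which the hook's create_cmd_args then translates into qds_sdk
command options. A hypothetical DAG snippet (not part of this change; the dag
id, query and cluster label are placeholders) illustrating that flow:

    from datetime import datetime

    from airflow import DAG
    from airflow.contrib.operators.qubole_operator import QuboleOperator

    with DAG(dag_id="qubole_args_example", start_date=datetime(2018, 11, 1),
             schedule_interval=None) as dag:
        hive_query = QuboleOperator(
            task_id="sample_hive_query",
            command_type="hivecmd",
            query="SHOW TABLES",            # forwarded as --query
            cluster_label="default",        # forwarded as --cluster-label
            qubole_conn_id="qubole_default",
        )
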
diff --git a/setup.py b/setup.py
--- a/setup.py
+++ b/setup.py
@@ -216,7 +216,7 @@ def write_version(filename=os.path.join(*['airflow',
]
pinot = ['pinotdb>=0.1.1']
postgres = ['psycopg2-binary>=2.7.4']
-qds = ['qds-sdk>=1.9.6']
+qds = ['qds-sdk>=1.10.4']
rabbitmq = ['librabbitmq>=1.6.1']
redis = ['redis>=2.10.5']
s3 = ['boto3>=1.7.0, <1.8.0']
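
Because COMMAND_ARGS and HYPHEN_ARGS are now built from qds_sdk's option
parsers when qubole_hook is imported, the installed SDK has to be new enough
to expose those options, presumably the reason for raising the qds-sdk floor
above. A quick environment check (a sketch, not part of the change):

    import pkg_resources

    # The hook introspects qds_sdk at import time, so the installed version
    # must satisfy the new floor from setup.py.
    print(pkg_resources.get_distribution("qds-sdk").version)  # expect >= 1.10.4
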
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
[email protected]
With regards,
Apache Git Services