Fokko closed pull request #3504: [AIRFLOW-2310]: Add AWS Glue Job Compatibility to Airflow
URL: https://github.com/apache/incubator-airflow/pull/3504
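For context, the hook, operator and sensor added in this PR can be wired into a DAG roughly as follows. This is a minimal, illustrative sketch based on the signatures in the diff below; the DAG id, schedule, bucket, script path, IAM role, region and run id are placeholders, not values taken from the PR.

# Illustrative DAG using the operator and sensor added in this PR.
# Every concrete value (DAG id, bucket, script path, role, region, run id)
# is a placeholder.
from datetime import datetime

from airflow import DAG
from airflow.contrib.operators.aws_glue_job_operator import AWSGlueJobOperator
from airflow.contrib.sensors.aws_glue_job_sensor import AwsGlueJobSensor

default_args = {
    'owner': 'airflow',
    'start_date': datetime(2018, 6, 1),
}

with DAG(dag_id='example_aws_glue_job',          # placeholder DAG id
         default_args=default_args,
         schedule_interval=None) as dag:

    # Starts (and, in the code as submitted, waits for) a Glue job run.
    # A local script_location is uploaded to s3_bucket; a path starting
    # with "s3://" is used as-is.
    run_glue_job = AWSGlueJobOperator(
        task_id='run_glue_job',
        job_name='my_etl_job',                                  # placeholder
        job_desc='Example Glue job triggered from Airflow',     # placeholder
        script_location='s3://my-bucket/glue-scripts/etl.py',   # placeholder
        s3_bucket='my-bucket',                                  # placeholder
        iam_role_name='my-glue-role',                           # placeholder
        region_name='us-east-1',                                # placeholder
        num_of_dpus=6,
        script_args={'--s3_input_data_path': 's3://my-bucket/input/'},
    )

    # Waits for a run to reach FAILED, STOPPED or SUCCEEDED. Note that the
    # operator above does not push its run id to XCom in this PR, so the
    # run id has to be supplied by other means (hard-coded placeholder here).
    wait_for_glue_job = AwsGlueJobSensor(
        task_id='wait_for_glue_job',
        job_name='my_etl_job',          # placeholder
        run_id='jr_0123456789abcdef',   # placeholder run id
        poke_interval=60,
        timeout=60 * 60,
    )

    run_glue_job >> wait_for_glue_job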
This is a PR merged from a forked repository. As GitHub hides the original diff on merge, it is displayed below for the sake of provenance: As this is a foreign pull request (from a fork), the diff is supplied below (as it won't show otherwise due to GitHub magic): diff --git a/airflow/contrib/aws_glue_job_hook.py b/airflow/contrib/aws_glue_job_hook.py new file mode 100644 index 0000000000..323313400a --- /dev/null +++ b/airflow/contrib/aws_glue_job_hook.py @@ -0,0 +1,212 @@ +# -*- coding: utf-8 -*- +# +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. + + +from airflow.exceptions import AirflowException +from airflow.contrib.hooks.aws_hook import AwsHook +import os.path +import time + + +class AwsGlueJobHook(AwsHook): + """ + Interact with AWS Glue - create job, trigger, crawler + + :param job_name: unique job name per AWS account + :type str + :param desc: job description + :type str + :param concurrent_run_limit: The maximum number of concurrent runs allowed for a job + :type int + :param script_location: path to etl script either on s3 or local + :type str + :param conns: A list of connections used by the job + :type list + :param retry_limit: Maximum number of times to retry this job if it fails + :type int + :param num_of_dpus: Number of AWS Glue DPUs to allocate to this Job + :type int + :param region_name: aws region name (example: us-east-1) + :type region_name: str + :param s3_bucket: S3 bucket where logs and local etl script will be uploaded + :type str + :param iam_role_name: AWS IAM Role for Glue Job + :type str + """ + + def __init__(self, + job_name=None, + desc=None, + concurrent_run_limit=None, + script_location=None, + conns=None, + retry_limit=None, + num_of_dpus=None, + aws_conn_id='aws_default', + region_name=None, + iam_role_name=None, + s3_bucket=None, *args, **kwargs): + self.job_name = job_name + self.desc = desc + self.concurrent_run_limit = concurrent_run_limit or 1 + self.script_location = script_location + self.conns = conns or ["s3"] + self.retry_limit = retry_limit or 0 + self.num_of_dpus = num_of_dpus or 10 + self.aws_conn_id = aws_conn_id + self.region_name = region_name + self.s3_bucket = s3_bucket + self.role_name = iam_role_name + self.S3_PROTOCOL = "s3://" + self.S3_ARTIFACTS_PREFIX = 'artifacts/glue-scripts/' + self.S3_GLUE_LOGS = 'logs/glue-logs/' + super(AwsGlueJobHook, self).__init__(*args, **kwargs) + + def get_conn(self): + conn = self.get_client_type('glue', self.region_name) + return conn + + def list_jobs(self): + conn = self.get_conn() + return conn.get_jobs() + + def get_iam_execution_role(self): + """ + :return: iam role for job execution + """ + iam_client = self.get_client_type('iam', self.region_name) + + try: + glue_execution_role = iam_client.get_role(RoleName=self.role_name) + self.log.info("Iam Role Name: 
{}".format(self.role_name)) + return glue_execution_role + except Exception as general_error: + raise AirflowException( + 'Failed to create aws glue job, error: {error}'.format( + error=str(general_error) + ) + ) + + def initialize_job(self, script_arguments=None): + """ + Initializes connection with AWS Glue + to run job + :return: + """ + if self.s3_bucket is None: + raise AirflowException( + 'Could not initialize glue job, ' + 'error: Specify Parameter `s3_bucket`' + ) + + glue_client = self.get_conn() + + try: + job_response = self.get_or_create_glue_job() + job_name = job_response['Name'] + job_run = glue_client.start_job_run( + JobName=job_name, + Arguments=script_arguments + ) + return self.job_completion(job_name, job_run['JobRunId']) + except Exception as general_error: + raise AirflowException( + 'Failed to run aws glue job, error: {error}'.format( + error=str(general_error) + ) + ) + + def job_completion(self, job_name=None, run_id=None): + """ + :param job_name: + :param run_id: + :return: + """ + glue_client = self.get_conn() + job_status = glue_client.get_job_run( + JobName=job_name, + RunId=run_id, + PredecessorsIncluded=True + ) + job_run_state = job_status['JobRun']['JobRunState'] + failed = job_run_state == 'FAILED' + stopped = job_run_state == 'STOPPED' + completed = job_run_state == 'SUCCEEDED' + + while True: + if failed or stopped or completed: + self.log.info("Exiting Job {} Run State: {}" + .format(run_id, job_run_state)) + return {'JobRunState': job_run_state, 'JobRunId': run_id} + else: + self.log.info("Polling for AWS Glue Job {} current run state" + .format(job_name)) + time.sleep(6) + + def get_or_create_glue_job(self): + glue_client = self.get_conn() + try: + self.log.info("Now creating and running AWS Glue Job") + s3_log_path = "s3://{bucket_name}/{logs_path}{job_name}"\ + .format(bucket_name=self.s3_bucket, + logs_path=self.S3_GLUE_LOGS, + job_name=self.job_name) + + execution_role = self.get_iam_execution_role() + script_location = self._check_script_location() + create_job_response = glue_client.create_job( + Name=self.job_name, + Description=self.desc, + LogUri=s3_log_path, + Role=execution_role['Role']['RoleName'], + ExecutionProperty={"MaxConcurrentRuns": self.concurrent_run_limit}, + Command={"Name": "glueetl", "ScriptLocation": script_location}, + MaxRetries=self.retry_limit, + AllocatedCapacity=self.num_of_dpus + ) + # print(create_job_response) + return create_job_response + except Exception as general_error: + raise AirflowException( + 'Failed to create aws glue job, error: {error}'.format( + error=str(general_error) + ) + ) + + def _check_script_location(self): + """ + :return: S3 Script location path + """ + if self.script_location[:5] == self.S3_PROTOCOL: + return self.script_location + elif os.path.isfile(self.script_location): + s3 = self.get_resource_type('s3', self.region_name) + script_name = os.path.basename(self.script_location) + s3.meta.client.upload_file(self.script_location, + self.s3_bucket, + self.S3_ARTIFACTS_PREFIX + script_name) + + s3_script_path = "s3://{s3_bucket}/{prefix}{job_name}/{script_name}" \ + .format(s3_bucket=self.s3_bucket, + prefix=self.S3_ARTIFACTS_PREFIX, + job_name=self.job_name, + script_name=script_name) + return s3_script_path + else: + return None diff --git a/airflow/contrib/hooks/aws_glue_job_hook.py b/airflow/contrib/hooks/aws_glue_job_hook.py new file mode 100644 index 0000000000..1d43730272 --- /dev/null +++ b/airflow/contrib/hooks/aws_glue_job_hook.py @@ -0,0 +1,210 @@ +# -*- coding: utf-8 -*- +# +# 
Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. +from airflow.exceptions import AirflowException +from airflow.contrib.hooks.aws_hook import AwsHook +import os.path +import time + + +class AwsGlueJobHook(AwsHook): + """ + Interact with AWS Glue - create job, trigger, crawler + + :param job_name: unique job name per AWS account + :type str + :param desc: job description + :type str + :param concurrent_run_limit: The maximum number of concurrent runs allowed for a job + :type int + :param script_location: path to etl script either on s3 or local + :type str + :param conns: A list of connections used by the job + :type list + :param retry_limit: Maximum number of times to retry this job if it fails + :type int + :param num_of_dpus: Number of AWS Glue DPUs to allocate to this Job + :type int + :param region_name: aws region name (example: us-east-1) + :type region_name: str + :param s3_bucket: S3 bucket where logs and local etl script will be uploaded + :type str + :param iam_role_name: AWS IAM Role for Glue Job + :type str + """ + + def __init__(self, + job_name=None, + desc=None, + concurrent_run_limit=None, + script_location=None, + conns=None, + retry_limit=None, + num_of_dpus=None, + aws_conn_id='aws_default', + region_name=None, + iam_role_name=None, + s3_bucket=None, *args, **kwargs): + self.job_name = job_name + self.desc = desc + self.concurrent_run_limit = concurrent_run_limit or 1 + self.script_location = script_location + self.conns = conns or ["s3"] + self.retry_limit = retry_limit or 0 + self.num_of_dpus = num_of_dpus or 10 + self.aws_conn_id = aws_conn_id + self.region_name = region_name + self.s3_bucket = s3_bucket + self.role_name = iam_role_name + self.S3_PROTOCOL = "s3://" + self.S3_ARTIFACTS_PREFIX = 'artifacts/glue-scripts/' + self.S3_GLUE_LOGS = 'logs/glue-logs/' + super(AwsGlueJobHook, self).__init__(*args, **kwargs) + + def get_conn(self): + conn = self.get_client_type('glue', self.region_name) + return conn + + def list_jobs(self): + conn = self.get_conn() + return conn.get_jobs() + + def get_iam_execution_role(self): + """ + :return: iam role for job execution + """ + iam_client = self.get_client_type('iam', self.region_name) + + try: + glue_execution_role = iam_client.get_role(RoleName=self.role_name) + self.log.info("Iam Role Name: {}".format(self.role_name)) + return glue_execution_role + except Exception as general_error: + raise AirflowException( + 'Failed to create aws glue job, error: {error}'.format( + error=str(general_error) + ) + ) + + def initialize_job(self, script_arguments=None): + """ + Initializes connection with AWS Glue + to run job + :return: + """ + if self.s3_bucket is None: + raise AirflowException( + 'Could not initialize glue job, ' + 'error: Specify Parameter `s3_bucket`' + ) + + glue_client = self.get_conn() + + 
try: + job_response = self.get_or_create_glue_job() + job_name = job_response['Name'] + job_run = glue_client.start_job_run( + JobName=job_name, + Arguments=script_arguments + ) + return self.job_completion(job_name, job_run['JobRunId']) + except Exception as general_error: + raise AirflowException( + 'Failed to run aws glue job, error: {error}'.format( + error=str(general_error) + ) + ) + + def job_completion(self, job_name=None, run_id=None): + """ + :param job_name: + :param run_id: + :return: + """ + glue_client = self.get_conn() + job_status = glue_client.get_job_run( + JobName=job_name, + RunId=run_id, + PredecessorsIncluded=True + ) + job_run_state = job_status['JobRun']['JobRunState'] + failed = job_run_state == 'FAILED' + stopped = job_run_state == 'STOPPED' + completed = job_run_state == 'SUCCEEDED' + + while True: + if failed or stopped or completed: + self.log.info("Exiting Job {} Run State: {}" + .format(run_id, job_run_state)) + return {'JobRunState': job_run_state, 'JobRunId': run_id} + else: + self.log.info("Polling for AWS Glue Job {} current run state" + .format(job_name)) + time.sleep(6) + + def get_or_create_glue_job(self): + glue_client = self.get_conn() + try: + self.log.info("Now creating and running AWS Glue Job") + s3_log_path = "s3://{bucket_name}/{logs_path}{job_name}"\ + .format(bucket_name=self.s3_bucket, + logs_path=self.S3_GLUE_LOGS, + job_name=self.job_name) + + execution_role = self.get_iam_execution_role() + script_location = self._check_script_location() + create_job_response = glue_client.create_job( + Name=self.job_name, + Description=self.desc, + LogUri=s3_log_path, + Role=execution_role['Role']['RoleName'], + ExecutionProperty={"MaxConcurrentRuns": self.concurrent_run_limit}, + Command={"Name": "glueetl", "ScriptLocation": script_location}, + MaxRetries=self.retry_limit, + AllocatedCapacity=self.num_of_dpus + ) + # print(create_job_response) + return create_job_response + except Exception as general_error: + raise AirflowException( + 'Failed to create aws glue job, error: {error}'.format( + error=str(general_error) + ) + ) + + def _check_script_location(self): + """ + :return: S3 Script location path + """ + if self.script_location[:5] == self.S3_PROTOCOL: + return self.script_location + elif os.path.isfile(self.script_location): + s3 = self.get_resource_type('s3', self.region_name) + script_name = os.path.basename(self.script_location) + s3.meta.client.upload_file(self.script_location, + self.s3_bucket, + self.S3_ARTIFACTS_PREFIX + script_name) + + s3_script_path = "s3://{s3_bucket}/{prefix}{job_name}/{script_name}" \ + .format(s3_bucket=self.s3_bucket, + prefix=self.S3_ARTIFACTS_PREFIX, + job_name=self.job_name, + script_name=script_name) + return s3_script_path + else: + return None diff --git a/airflow/contrib/operators/aws_glue_job_operator.py b/airflow/contrib/operators/aws_glue_job_operator.py new file mode 100644 index 0000000000..044b98d54f --- /dev/null +++ b/airflow/contrib/operators/aws_glue_job_operator.py @@ -0,0 +1,113 @@ +# -*- coding: utf-8 -*- +# +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. 
You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. +from __future__ import unicode_literals + +from airflow.contrib.hooks.aws_glue_job_hook import AwsGlueJobHook +from airflow.models import BaseOperator +from airflow.utils.decorators import apply_defaults + + +class AWSGlueJobOperator(BaseOperator): + """ + Creates an AWS Glue Job. AWS Glue is a serverless Spark + ETL service for running Spark Jobs on the AWS cloud. + Language support: Python and Scala + :param job_name: unique job name per AWS Account + :type str + :param script_location: location of ETL script. Must be a local or S3 path + :type str + :param job_desc: job description details + :type str + :param concurrent_run_limit: The maximum number of concurrent runs allowed for a job + :type int + :param script_args: etl script arguments and AWS Glue arguments + :type dict + :param connections: AWS Glue connections to be used by the job. + :type list + :param retry_limit: The maximum number of times to retry this job if it fails + :type int + :param num_of_dpus: Number of AWS Glue DPUs to allocate to this Job. + :type int + :param region_name: aws region name (example: us-east-1) + :type region_name: str + :param s3_bucket: S3 bucket where logs and local etl script will be uploaded + :type str + :param iam_role_name: AWS IAM Role for Glue Job Execution + :type str + """ + template_fields = () + template_ext = () + ui_color = '#ededed' + + @apply_defaults + def __init__(self, + job_name='aws_glue_default_job', + job_desc='AWS Glue Job with Airflow', + script_location=None, + concurrent_run_limit=None, + script_args={}, + connections=[], + retry_limit=None, + num_of_dpus=6, + aws_conn_id='aws_default', + region_name=None, + s3_bucket=None, + iam_role_name=None, + *args, **kwargs + ): + super(AWSGlueJobOperator, self).__init__(*args, **kwargs) + self.job_name = job_name + self.job_desc = job_desc + self.script_location = script_location + self.concurrent_run_limit = concurrent_run_limit + self.script_args = script_args + self.connections = connections + self.retry_limit = retry_limit + self.num_of_dpus = num_of_dpus + self.aws_conn_id = aws_conn_id, + self.region_name = region_name + self.s3_bucket = s3_bucket + self.iam_role_name = iam_role_name + + def execute(self, context): + """ + Executes AWS Glue Job from Airflow + :return: + """ + glue_job = AwsGlueJobHook(job_name=self.job_name, + desc=self.job_desc, + concurrent_run_limit=self.concurrent_run_limit, + script_location=self.script_location, + conns=self.connections, + retry_limit=self.retry_limit, + num_of_dpus=self.num_of_dpus, + aws_conn_id=self.aws_conn_id, + region_name=self.region_name, + s3_bucket=self.s3_bucket, + iam_role_name=self.iam_role_name) + + self.log.info("Initializing AWS Glue Job: {}".format(self.job_name)) + glue_job_run = glue_job.initialize_job(self.script_args) + self.log.info('AWS Glue Job: {job_name} status: {job_status}. 
Run Id: {run_id}' + .format(run_id=glue_job_run['JobRunId'], + job_name=self.job_name, + job_status=glue_job_run['JobRunState']) + ) + + self.log.info('Done.') diff --git a/airflow/contrib/sensors/aws_glue_job_sensor.py b/airflow/contrib/sensors/aws_glue_job_sensor.py new file mode 100644 index 0000000000..8ee5fee4e7 --- /dev/null +++ b/airflow/contrib/sensors/aws_glue_job_sensor.py @@ -0,0 +1,57 @@ +# -*- coding: utf-8 -*- +# +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. + +from airflow.sensors.base_sensor_operator import BaseSensorOperator +from airflow.utils.decorators import apply_defaults +from airflow.contrib.hooks.aws_glue_job_hook import AwsGlueJobHook + + +class AwsGlueJobSensor(BaseSensorOperator): + """ + Waits for an AWS Glue Job to reach any of the status below + 'FAILED', 'STOPPED', 'SUCCEEDED' + + :param job_name: The AWS Glue Job unique name + :type str + :param run_id: The AWS Glue current running job identifier + :type str + """ + template_fields = ('job_name', 'run_id') + + @apply_defaults + def __init__(self, + job_name, + run_id, + aws_conn_id='aws_default', + *args, + **kwargs): + super(AwsGlueJobSensor, self).__init__(*args, **kwargs) + self.job_name = job_name + self.run_id = run_id + self.aws_conn_id = aws_conn_id + self.targeted_status = ['FAILED', 'STOPPED', 'SUCCEEDED'] + + def poke(self, context): + self.log.info("Poking for job run status : {self.targeted_status}\n" + "for Glue Job {self.job_name} and ID {self.run_id}" + .format(**locals())) + hook = AwsGlueJobHook(aws_conn_id=self.aws_conn_id) + job_state = hook.job_completion(job_name=self.job_name, + run_id=self.run_id) + return job_state.upper() in self.targeted_status diff --git a/docs/code.rst b/docs/code.rst index a64c2779d2..e7457ba33e 100644 --- a/docs/code.rst +++ b/docs/code.rst @@ -108,7 +108,6 @@ Community-contributed Operators Operators ^^^^^^^^^ -.. Alphabetize this list .. autoclass:: airflow.contrib.operators.awsbatch_operator.AWSBatchOperator .. autoclass:: airflow.contrib.operators.bigquery_check_operator.BigQueryCheckOperator @@ -121,13 +120,11 @@ Operators .. autoclass:: airflow.contrib.operators.bigquery_table_delete_operator.BigQueryTableDeleteOperator .. autoclass:: airflow.contrib.operators.bigquery_to_bigquery.BigQueryToBigQueryOperator .. autoclass:: airflow.contrib.operators.bigquery_to_gcs.BigQueryToCloudStorageOperator -.. autoclass:: airflow.contrib.operators.cassandra_to_gcs.CassandraToGoogleCloudStorageOperator .. autoclass:: airflow.contrib.operators.databricks_operator.DatabricksSubmitRunOperator .. autoclass:: airflow.contrib.operators.dataflow_operator.DataFlowJavaOperator .. autoclass:: airflow.contrib.operators.dataflow_operator.DataflowTemplateOperator .. autoclass:: airflow.contrib.operators.dataflow_operator.DataFlowPythonOperator .. 
autoclass:: airflow.contrib.operators.dataproc_operator.DataprocClusterCreateOperator -.. autoclass:: airflow.contrib.operators.dataproc_operator.DataprocClusterScaleOperator .. autoclass:: airflow.contrib.operators.dataproc_operator.DataprocClusterDeleteOperator .. autoclass:: airflow.contrib.operators.dataproc_operator.DataProcPigOperator .. autoclass:: airflow.contrib.operators.dataproc_operator.DataProcHiveOperator @@ -174,12 +171,9 @@ Operators .. autoclass:: airflow.contrib.operators.pubsub_operator.PubSubSubscriptionCreateOperator .. autoclass:: airflow.contrib.operators.pubsub_operator.PubSubSubscriptionDeleteOperator .. autoclass:: airflow.contrib.operators.pubsub_operator.PubSubPublishOperator -.. autoclass:: airflow.contrib.operators.qubole_check_operator.QuboleCheckOperator -.. autoclass:: airflow.contrib.operators.qubole_check_operator.QuboleValueCheckOperator .. autoclass:: airflow.contrib.operators.qubole_operator.QuboleOperator .. autoclass:: airflow.contrib.operators.s3_list_operator.S3ListOperator .. autoclass:: airflow.contrib.operators.s3_to_gcs_operator.S3ToGoogleCloudStorageOperator -.. autoclass:: airflow.contrib.operators.segment_track_event_operator.SegmentTrackEventOperator .. autoclass:: airflow.contrib.operators.sftp_operator.SFTPOperator .. autoclass:: airflow.contrib.operators.slack_webhook_operator.SlackWebhookOperator .. autoclass:: airflow.contrib.operators.snowflake_operator.SnowflakeOperator @@ -190,12 +184,12 @@ Operators .. autoclass:: airflow.contrib.operators.ssh_operator.SSHOperator .. autoclass:: airflow.contrib.operators.vertica_operator.VerticaOperator .. autoclass:: airflow.contrib.operators.vertica_to_hive.VerticaToHiveTransfer -.. autoclass:: airflow.contrib.operators.winrm_operator.WinRMOperator Sensors ^^^^^^^ .. autoclass:: airflow.contrib.sensors.aws_redshift_cluster_sensor.AwsRedshiftClusterSensor +.. autoclass:: airflow.contrib.operators.aws_glue_job_operator.AWSGlueJobOperator .. autoclass:: airflow.contrib.sensors.bash_sensor.BashSensor .. autoclass:: airflow.contrib.sensors.bigquery_sensor.BigQueryTableSensor .. autoclass:: airflow.contrib.sensors.cassandra_record_sensor.CassandraRecordSensor @@ -236,12 +230,6 @@ Variable Description ================================= ==================================== ``{{ ds }}`` the execution date as ``YYYY-MM-DD`` ``{{ ds_nodash }}`` the execution date as ``YYYYMMDD`` -``{{ prev_ds }}`` the previous execution date as ``YYYY-MM-DD``. - if ``{{ ds }}`` is ``2016-01-08`` and ``schedule_interval`` is ``@weekly``, - ``{{ prev_ds }}`` will be ``2016-01-01``. -``{{ next_ds }}`` the next execution date as ``YYYY-MM-DD``. - if ``{{ ds }}`` is ``2016-01-01`` and ``schedule_interval`` is ``@weekly``, - ``{{ prev_ds }}`` will be ``2016-01-08``. ``{{ yesterday_ds }}`` yesterday's date as ``YYYY-MM-DD`` ``{{ yesterday_ds_nodash }}`` yesterday's date as ``YYYYMMDD`` ``{{ tomorrow_ds }}`` tomorrow's date as ``YYYY-MM-DD`` @@ -363,14 +351,13 @@ interface when possible and acting as building blocks for operators. Community contributed hooks ''''''''''''''''''''''''''' -.. Alphabetize this list + .. autoclass:: airflow.contrib.hooks.aws_dynamodb_hook.AwsDynamoDBHook .. autoclass:: airflow.contrib.hooks.aws_hook.AwsHook .. autoclass:: airflow.contrib.hooks.aws_lambda_hook.AwsLambdaHook .. autoclass:: airflow.contrib.hooks.azure_data_lake_hook.AzureDataLakeHook .. autoclass:: airflow.contrib.hooks.azure_fileshare_hook.AzureFileShareHook .. autoclass:: airflow.contrib.hooks.bigquery_hook.BigQueryHook -.. 
autoclass:: airflow.contrib.hooks.cassandra_hook.CassandraHook .. autoclass:: airflow.contrib.hooks.cloudant_hook.CloudantHook .. autoclass:: airflow.contrib.hooks.databricks_hook.DatabricksHook .. autoclass:: airflow.contrib.hooks.datadog_hook.DatadogHook @@ -395,7 +382,6 @@ Community contributed hooks .. autoclass:: airflow.contrib.hooks.redis_hook.RedisHook .. autoclass:: airflow.contrib.hooks.redshift_hook.RedshiftHook .. autoclass:: airflow.contrib.hooks.salesforce_hook.SalesforceHook -.. autoclass:: airflow.contrib.hooks.segment_hook.SegmentHook .. autoclass:: airflow.contrib.hooks.sftp_hook.SFTPHook .. autoclass:: airflow.contrib.hooks.slack_webhook_hook.SlackWebhookHook .. autoclass:: airflow.contrib.hooks.snowflake_hook.SnowflakeHook @@ -405,8 +391,8 @@ Community contributed hooks .. autoclass:: airflow.contrib.hooks.sqoop_hook.SqoopHook .. autoclass:: airflow.contrib.hooks.ssh_hook.SSHHook .. autoclass:: airflow.contrib.hooks.vertica_hook.VerticaHook +.. autoclass:: airflow.contrib.hooks.spark_jdbc_hook.SparkJDBCHook .. autoclass:: airflow.contrib.hooks.wasb_hook.WasbHook -.. autoclass:: airflow.contrib.hooks.winrm_hook.WinRMHook Executors --------- diff --git a/docs/integration.rst b/docs/integration.rst index 660b2163d9..ecdcc5202b 100644 --- a/docs/integration.rst +++ b/docs/integration.rst @@ -8,7 +8,6 @@ Integration - :ref:`GCP` .. _ReverseProxy: - Reverse Proxy ------------- @@ -71,8 +70,7 @@ Azure: Microsoft Azure ---------------------- Airflow has limited support for Microsoft Azure: interfaces exist only for Azure Blob -Storage and Azure Data Lake. Hook, Sensor and Operator for Blob Storage and -Azure Data Lake Hook are in contrib section. +Storage. Note that the Hook, Sensor and Operator are in the contrib section. Azure Blob Storage '''''''''''''''''' @@ -132,24 +130,35 @@ Logging ''''''' Airflow can be configured to read and write task logs in Azure Blob Storage. -See :ref:`write-logs-azure`. +Follow the steps below to enable Azure Blob Storage logging. -Azure Data Lake -'''''''''''''''''' +#. Airflow's logging system requires a custom .py file to be located in the ``PYTHONPATH``, so that it's importable from Airflow. Start by creating a directory to store the config file. ``$AIRFLOW_HOME/config`` is recommended. +#. Create empty files called ``$AIRFLOW_HOME/config/log_config.py`` and ``$AIRFLOW_HOME/config/__init__.py``. +#. Copy the contents of ``airflow/config_templates/airflow_local_settings.py`` into the ``log_config.py`` file that was just created in the step above. +#. Customize the following portions of the template: -AzureDataLakeHook communicates via a REST API compatible with WebHDFS. Make sure that a -Airflow connection of type `azure_data_lake` exists. Authorization can be done by supplying a -login (=Client ID), password (=Client Secret) and extra fields tenant (Tenant) and account_name (Account Name) - (see connection `azure_data_lake_default` for an example). + .. code-block:: bash -- :ref:`AzureDataLakeHook`: Interface with Azure Data Lake. + # wasb buckets should start with "wasb" just to help Airflow select correct handler + REMOTE_BASE_LOG_FOLDER = 'wasb-<whatever you want here>' -.. _AzureDataLakeHook: + # Rename DEFAULT_LOGGING_CONFIG to LOGGING CONFIG + LOGGING_CONFIG = ... -AzureDataLakeHook -""""""""" -.. autoclass:: airflow.contrib.hooks.azure_data_lake_hook.AzureDataLakeHook +#. Make sure a Azure Blob Storage (Wasb) connection hook has been defined in Airflow. 
The hook should have read and write access to the Azure Blob Storage bucket defined above in ``REMOTE_BASE_LOG_FOLDER``. + +#. Update ``$AIRFLOW_HOME/airflow.cfg`` to contain: + + .. code-block:: bash + + remote_logging = True + logging_config_class = log_config.LOGGING_CONFIG + remote_log_conn_id = <name of the Azure Blob Storage connection> + +#. Restart the Airflow webserver and scheduler, and trigger (or wait for) a new task execution. +#. Verify that logs are showing up for newly executed tasks in the bucket you've defined. + .. _AWS: @@ -272,8 +281,7 @@ AWS RedShift - :ref:`AwsRedshiftClusterSensor` : Waits for a Redshift cluster to reach a specific status. - :ref:`RedshiftHook` : Interact with AWS Redshift, using the boto3 library. -- :ref:`RedshiftToS3Transfer` : Executes an unload command to S3 as CSV with or without headers. -- :ref:`S3ToRedshiftTransfer` : Executes an copy command from S3 as CSV with or without headers. +- :ref:`RedshiftToS3Transfer` : Executes an unload command to S3 as a CSV with headers. .. _AwsRedshiftClusterSensor: @@ -296,20 +304,13 @@ RedshiftToS3Transfer .. autoclass:: airflow.operators.redshift_to_s3_operator.RedshiftToS3Transfer -.. _S3ToRedshiftTransfer: - -S3ToRedshiftTransfer -"""""""""""""""""""" - -.. autoclass:: airflow.operators.s3_to_redshift_operator.S3ToRedshiftTransfer - .. _Databricks: Databricks ---------- -`Databricks <https://databricks.com/>`__ has contributed an Airflow operator which enables +`Databricks <https://databricks.com/>`_ has contributed an Airflow operator which enables submitting runs to the Databricks platform. Internally the operator talks to the ``api/2.0/jobs/runs/submit`` `endpoint <https://docs.databricks.com/api/latest/jobs.html#runs-submit>`_. @@ -329,14 +330,75 @@ Airflow has extensive support for the Google Cloud Platform. But note that most Operators are in the contrib section. Meaning that they have a *beta* status, meaning that they can have breaking changes between minor releases. -See the :ref:`GCP connection type <connection-type-GCP>` documentation to -configure connections to GCP. - Logging ''''''' -Airflow can be configured to read and write task logs in Google Cloud Storage. -See :ref:`write-logs-gcp`. +Airflow can be configured to read and write task logs in Google cloud storage. +Follow the steps below to enable Google cloud storage logging. + +#. Airflow's logging system requires a custom .py file to be located in the ``PYTHONPATH``, so that it's importable from Airflow. Start by creating a directory to store the config file. ``$AIRFLOW_HOME/config`` is recommended. +#. Create empty files called ``$AIRFLOW_HOME/config/log_config.py`` and ``$AIRFLOW_HOME/config/__init__.py``. +#. Copy the contents of ``airflow/config_templates/airflow_local_settings.py`` into the ``log_config.py`` file that was just created in the step above. +#. Customize the following portions of the template: + + .. code-block:: bash + + # Add this variable to the top of the file. Note the trailing slash. + GCS_LOG_FOLDER = 'gs://<bucket where logs should be persisted>/' + + # Rename DEFAULT_LOGGING_CONFIG to LOGGING CONFIG + LOGGING_CONFIG = ... 
+ + # Add a GCSTaskHandler to the 'handlers' block of the LOGGING_CONFIG variable + 'gcs.task': { + 'class': 'airflow.utils.log.gcs_task_handler.GCSTaskHandler', + 'formatter': 'airflow.task', + 'base_log_folder': os.path.expanduser(BASE_LOG_FOLDER), + 'gcs_log_folder': GCS_LOG_FOLDER, + 'filename_template': FILENAME_TEMPLATE, + }, + + # Update the airflow.task and airflow.tas_runner blocks to be 'gcs.task' instead of 'file.task'. + 'loggers': { + 'airflow.task': { + 'handlers': ['gcs.task'], + ... + }, + 'airflow.task_runner': { + 'handlers': ['gcs.task'], + ... + }, + 'airflow': { + 'handlers': ['console'], + ... + }, + } + +#. Make sure a Google cloud platform connection hook has been defined in Airflow. The hook should have read and write access to the Google cloud storage bucket defined above in ``GCS_LOG_FOLDER``. + +#. Update ``$AIRFLOW_HOME/airflow.cfg`` to contain: + + .. code-block:: bash + + task_log_reader = gcs.task + logging_config_class = log_config.LOGGING_CONFIG + remote_log_conn_id = <name of the Google cloud platform hook> + +#. Restart the Airflow webserver and scheduler, and trigger (or wait for) a new task execution. +#. Verify that logs are showing up for newly executed tasks in the bucket you've defined. +#. Verify that the Google cloud storage viewer is working in the UI. Pull up a newly executed task, and verify that you see something like: + + .. code-block:: bash + + *** Reading remote log from gs://<bucket where logs should be persisted>/example_bash_operator/run_this_last/2017-10-03T00:00:00/16.log. + [2017-10-03 21:57:50,056] {cli.py:377} INFO - Running on host chrisr-00532 + [2017-10-03 21:57:50,093] {base_task_runner.py:115} INFO - Running: ['bash', '-c', u'airflow run example_bash_operator run_this_last 2017-10-03T00:00:00 --job_id 47 --raw -sd DAGS_FOLDER/example_dags/example_bash_operator.py'] + [2017-10-03 21:57:51,264] {base_task_runner.py:98} INFO - Subtask: [2017-10-03 21:57:51,263] {__init__.py:45} INFO - Using executor SequentialExecutor + [2017-10-03 21:57:51,306] {base_task_runner.py:98} INFO - Subtask: [2017-10-03 21:57:51,306] {models.py:186} INFO - Filling up the DagBag from /airflow/dags/example_dags/example_bash_operator.py + +Note the top line that says it's reading from the remote log file. + +Please be aware that if you were persisting logs to Google cloud storage using the old-style airflow.cfg configuration method, the old logs will no longer be visible in the Airflow UI, though they'll still exist in Google cloud storage. This is a backwards incompatbile change. If you are unhappy with it, you can change the ``FILENAME_TEMPLATE`` to reflect the old-style log filename format. BigQuery '''''''' @@ -514,7 +576,6 @@ DataProc Operators - :ref:`DataprocClusterCreateOperator` : Create a new cluster on Google Cloud Dataproc. - :ref:`DataprocClusterDeleteOperator` : Delete a cluster on Google Cloud Dataproc. -- :ref:`DataprocClusterScaleOperator` : Scale up or down a cluster on Google Cloud Dataproc. - :ref:`DataProcPigOperator` : Start a Pig query Job on a Cloud DataProc cluster. - :ref:`DataProcHiveOperator` : Start a Hive query Job on a Cloud DataProc cluster. - :ref:`DataProcSparkSqlOperator` : Start a Spark SQL query Job on a Cloud DataProc cluster. @@ -531,13 +592,6 @@ DataprocClusterCreateOperator .. autoclass:: airflow.contrib.operators.dataproc_operator.DataprocClusterCreateOperator -.. _DataprocClusterScaleOperator: - -DataprocClusterScaleOperator -^^^^^^^^^^^^^^^^^^^^^^^^^^^^^ - -.. 
autoclass:: airflow.contrib.operators.dataproc_operator.DataprocClusterScaleOperator - .. _DataprocClusterDeleteOperator: DataprocClusterDeleteOperator diff --git a/tests/contrib/hooks/test_aws_glue_job_hook.py b/tests/contrib/hooks/test_aws_glue_job_hook.py new file mode 100644 index 0000000000..481880908b --- /dev/null +++ b/tests/contrib/hooks/test_aws_glue_job_hook.py @@ -0,0 +1,119 @@ +# -*- coding: utf-8 -*- +# +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. +import unittest +import json + + +try: + from unittest import mock +except ImportError: + try: + import mock + except ImportError: + mock = None + +try: + from moto import mock_iam +except ImportError: + mock_iam = None + + +from airflow.contrib.hooks.aws_glue_job_hook import AwsGlueJobHook + + +class TestGlueJobHook(unittest.TestCase): + + def setUp(self): + self.some_aws_region = "us-west-2" + + @mock.patch.object(AwsGlueJobHook, 'get_conn') + def test_get_conn_returns_a_boto3_connection(self, mock_get_conn): + hook = AwsGlueJobHook(job_name='aws_test_glue_job', s3_bucket='some_bucket') + self.assertIsNotNone(hook.get_conn()) + + @unittest.skipIf(mock_iam is None, 'mock_iam package not present') + @mock_iam + def test_get_iam_execution_role(self): + hook = AwsGlueJobHook(job_name='aws_test_glue_job', + s3_bucket='some_bucket', + iam_role_name='my_test_role') + iam_role = hook.get_client_type('iam').create_role( + Path="/", + RoleName='my_test_role', + AssumeRolePolicyDocument=json.dumps({ + "Version": "2012-10-17", + "Statement": { + "Effect": "Allow", + "Principal": {"Service": "glue.amazonaws.com"}, + "Action": "sts:AssumeRole" + } + }) + ) + iam_role = hook.get_iam_execution_role() + self.assertIsNotNone(iam_role) + + @mock.patch.object(AwsGlueJobHook, "_check_script_location") + @mock.patch.object(AwsGlueJobHook, "get_iam_execution_role") + @mock.patch.object(AwsGlueJobHook, "get_conn") + def test_get_or_create_glue_job(self, mock_get_conn, + mock_get_iam_execution_role, + mock_script): + mock_get_iam_execution_role.return_value = \ + mock.MagicMock(Role={'RoleName': 'my_test_role'}) + some_script = "s3:/glue-examples/glue-scripts/sample_aws_glue_job.py" + some_s3_bucket = "my-includes" + mock_script.return_value = mock.Mock(return_value=some_script) + + mock_glue_job = mock_get_conn.return_value.create_job() + glue_job = AwsGlueJobHook(job_name='aws_test_glue_job', + desc='This is test case job from Airflow', + script_location=some_script, + iam_role_name='my_test_role', + s3_bucket=some_s3_bucket, + region_name=self.some_aws_region)\ + .get_or_create_glue_job() + self.assertEqual(glue_job, mock_glue_job) + + @mock.patch.object(AwsGlueJobHook, "job_completion") + @mock.patch.object(AwsGlueJobHook, "get_or_create_glue_job") + @mock.patch.object(AwsGlueJobHook, "get_conn") + def 
test_initialize_job(self, mock_get_conn, + mock_get_or_create_glue_job, + mock_completion): + some_data_path = "s3://glue-datasets/examples/medicare/SampleData.csv" + some_script_arguments = {"--s3_input_data_path": some_data_path} + some_script = "s3:/glue-examples/glue-scripts/sample_aws_glue_job.py" + some_s3_bucket = "my-includes" + + mock_get_or_create_glue_job.Name = mock.Mock(Name='aws_test_glue_job') + mock_get_conn.return_value.start_job_run() + + mock_job_run_state = mock_completion.return_value + glue_job_run_state = AwsGlueJobHook(job_name='aws_test_glue_job', + desc='This is test case job from Airflow', + iam_role_name='my_test_role', + script_location=some_script, + s3_bucket=some_s3_bucket, + region_name=self.some_aws_region)\ + .initialize_job(some_script_arguments) + self.assertEqual(glue_job_run_state, mock_job_run_state, msg='Mocks but be equal') + + +if __name__ == '__main__': + unittest.main() diff --git a/tests/contrib/operators/test_aws_glue_job_operator.py b/tests/contrib/operators/test_aws_glue_job_operator.py new file mode 100644 index 0000000000..be68c7a7bb --- /dev/null +++ b/tests/contrib/operators/test_aws_glue_job_operator.py @@ -0,0 +1,57 @@ +# -*- coding: utf-8 -*- +# +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. 
+ +import unittest + +from airflow import configuration +from airflow.contrib.hooks.aws_glue_job_hook import AwsGlueJobHook +from airflow.contrib.operators.aws_glue_job_operator import AWSGlueJobOperator + +try: + from unittest import mock +except ImportError: + try: + import mock + except ImportError: + mock = None + + +class TestAwsGlueJobOperator(unittest.TestCase): + + @mock.patch('airflow.contrib.operators.aws_glue_job_operator.AwsGlueJobHook') + def setUp(self, glue_hook_mock): + configuration.load_test_config() + + self.glue_hook_mock = glue_hook_mock + some_script = "s3:/glue-examples/glue-scripts/sample_aws_glue_job.py" + self.glue = AWSGlueJobOperator(task_id='test_glue_operator', + job_name='my_test_job', + script_location=some_script, + aws_conn_id='aws_default', + region_name='us-west-2', + s3_bucket='some_bucket', + iam_role_name='my_test_role') + + @mock.patch.object(AwsGlueJobHook, 'initialize_job') + def test_execute_without_failure(self, mock_initialize_job): + mock_initialize_job.return_value = {'JobRunState': 'RUNNING', 'JobRunId': '11111'} + self.glue.execute(None) + + mock_initialize_job.assert_called_once_with({}) + self.assertEqual(self.glue.job_name, 'my_test_job') diff --git a/tests/contrib/sensors/test_aws_glue_job_sensor.py b/tests/contrib/sensors/test_aws_glue_job_sensor.py new file mode 100644 index 0000000000..93d2379030 --- /dev/null +++ b/tests/contrib/sensors/test_aws_glue_job_sensor.py @@ -0,0 +1,69 @@ +# -*- coding: utf-8 -*- +# +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. 
+ +import unittest + +from airflow import configuration +from airflow.contrib.hooks.aws_glue_job_hook import AwsGlueJobHook +from airflow.contrib.sensors.aws_glue_job_sensor import AwsGlueJobSensor + + +try: + from unittest import mock +except ImportError: + try: + import mock + except ImportError: + mock = None + + +class TestAwsGlueJobSensor(unittest.TestCase): + + def setUp(self): + configuration.load_test_config() + + @mock.patch.object(AwsGlueJobHook, 'get_conn') + @mock.patch.object(AwsGlueJobHook, 'job_completion') + def test_poke(self, mock_job_completion, mock_conn): + mock_conn.return_value.get_job_run() + mock_job_completion.return_value = 'SUCCEEDED' + op = AwsGlueJobSensor(task_id='test_glue_job_sensor', + job_name='aws_test_glue_job', + run_id='5152fgsfsjhsh61661', + poke_interval=1, + timeout=5, + aws_conn_id='aws_default') + self.assertTrue(op.poke(None)) + + @mock.patch.object(AwsGlueJobHook, 'get_conn') + @mock.patch.object(AwsGlueJobHook, 'job_completion') + def test_poke_false(self, mock_job_completion, mock_conn): + mock_conn.return_value.get_job_run() + mock_job_completion.return_value = 'RUNNING' + op = AwsGlueJobSensor(task_id='test_glue_job_sensor', + job_name='aws_test_glue_job', + run_id='5152fgsfsjhsh61661', + poke_interval=1, + timeout=5, + aws_conn_id='aws_default') + self.assertFalse(op.poke(None)) + + +if __name__ == '__main__': + unittest.main() diff --git a/tests/sensors/test_aws_glue_job_sensor.py b/tests/sensors/test_aws_glue_job_sensor.py new file mode 100644 index 0000000000..93d2379030 --- /dev/null +++ b/tests/sensors/test_aws_glue_job_sensor.py @@ -0,0 +1,69 @@ +# -*- coding: utf-8 -*- +# +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. 
+ +import unittest + +from airflow import configuration +from airflow.contrib.hooks.aws_glue_job_hook import AwsGlueJobHook +from airflow.contrib.sensors.aws_glue_job_sensor import AwsGlueJobSensor + + +try: + from unittest import mock +except ImportError: + try: + import mock + except ImportError: + mock = None + + +class TestAwsGlueJobSensor(unittest.TestCase): + + def setUp(self): + configuration.load_test_config() + + @mock.patch.object(AwsGlueJobHook, 'get_conn') + @mock.patch.object(AwsGlueJobHook, 'job_completion') + def test_poke(self, mock_job_completion, mock_conn): + mock_conn.return_value.get_job_run() + mock_job_completion.return_value = 'SUCCEEDED' + op = AwsGlueJobSensor(task_id='test_glue_job_sensor', + job_name='aws_test_glue_job', + run_id='5152fgsfsjhsh61661', + poke_interval=1, + timeout=5, + aws_conn_id='aws_default') + self.assertTrue(op.poke(None)) + + @mock.patch.object(AwsGlueJobHook, 'get_conn') + @mock.patch.object(AwsGlueJobHook, 'job_completion') + def test_poke_false(self, mock_job_completion, mock_conn): + mock_conn.return_value.get_job_run() + mock_job_completion.return_value = 'RUNNING' + op = AwsGlueJobSensor(task_id='test_glue_job_sensor', + job_name='aws_test_glue_job', + run_id='5152fgsfsjhsh61661', + poke_interval=1, + timeout=5, + aws_conn_id='aws_default') + self.assertFalse(op.poke(None)) + + +if __name__ == '__main__': + unittest.main() ---------------------------------------------------------------- This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: [email protected] With regards, Apache Git Services
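Outside of a DAG, the new hook can also be exercised directly, for example from an ad-hoc script or a test. A minimal sketch, assuming an 'aws_default' Airflow connection with Glue, IAM and S3 permissions; every concrete value below is a placeholder.

# Illustrative standalone use of the hook added in this PR.
from airflow.contrib.hooks.aws_glue_job_hook import AwsGlueJobHook

hook = AwsGlueJobHook(
    job_name='my_etl_job',                                  # placeholder
    desc='Ad-hoc Glue job run',                             # placeholder
    script_location='s3://my-bucket/glue-scripts/etl.py',   # placeholder
    s3_bucket='my-bucket',                                  # placeholder
    iam_role_name='my-glue-role',                           # placeholder
    region_name='us-east-1',                                # placeholder
)

# Creates the Glue job if it does not already exist, starts a run with the
# given arguments, and returns a dict with 'JobRunState' and 'JobRunId'.
result = hook.initialize_job({'--s3_input_data_path': 's3://my-bucket/input/'})
# result is e.g. {'JobRunState': 'SUCCEEDED', 'JobRunId': 'jr_...'}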
