http://git-wip-us.apache.org/repos/asf/incubator-airflow-site/blob/28a3eb60/integration.html ---------------------------------------------------------------------- diff --git a/integration.html b/integration.html index a55aa41..2f4ff20 100644 --- a/integration.html +++ b/integration.html @@ -13,6 +13,8 @@ + + @@ -81,7 +83,10 @@ - <ul class="current"> + + + + <ul class="current"> <li class="toctree-l1"><a class="reference internal" href="project.html">Project</a></li> <li class="toctree-l1"><a class="reference internal" href="license.html">License</a></li> <li class="toctree-l1"><a class="reference internal" href="start.html">Quick Start</a></li> @@ -97,8 +102,46 @@ <li class="toctree-l1"><a class="reference internal" href="security.html">Security</a></li> <li class="toctree-l1"><a class="reference internal" href="api.html">Experimental Rest API</a></li> <li class="toctree-l1 current"><a class="current reference internal" href="#">Integration</a><ul> -<li class="toctree-l2"><a class="reference internal" href="#aws-amazon-webservices">AWS: Amazon Webservices</a></li> +<li class="toctree-l2"><a class="reference internal" href="#azure-microsoft-azure">Azure: Microsoft Azure</a><ul> +<li class="toctree-l3"><a class="reference internal" href="#azure-blob-storage">Azure Blob Storage</a><ul> +<li class="toctree-l4"><a class="reference internal" href="#wasbblobsensor">WasbBlobSensor</a></li> +<li class="toctree-l4"><a class="reference internal" href="#wasbprefixsensor">WasbPrefixSensor</a></li> +<li class="toctree-l4"><a class="reference internal" href="#filetowasboperator">FileToWasbOperator</a></li> +<li class="toctree-l4"><a class="reference internal" href="#wasbhook">WasbHook</a></li> +</ul> +</li> +</ul> +</li> +<li class="toctree-l2"><a class="reference internal" href="#aws-amazon-web-services">AWS: Amazon Web Services</a><ul> +<li class="toctree-l3"><a class="reference internal" href="#aws-emr">AWS EMR</a><ul> +<li class="toctree-l4"><a class="reference internal" href="#emraddstepsoperator">EmrAddStepsOperator</a></li> +<li class="toctree-l4"><a class="reference internal" href="#emrcreatejobflowoperator">EmrCreateJobFlowOperator</a></li> +<li class="toctree-l4"><a class="reference internal" href="#emrterminatejobflowoperator">EmrTerminateJobFlowOperator</a></li> +<li class="toctree-l4"><a class="reference internal" href="#emrhook">EmrHook</a></li> +</ul> +</li> +<li class="toctree-l3"><a class="reference internal" href="#aws-s3">AWS S3</a><ul> +<li class="toctree-l4"><a class="reference internal" href="#s3filetransformoperator">S3FileTransformOperator</a></li> +<li class="toctree-l4"><a class="reference internal" href="#s3tohivetransfer">S3ToHiveTransfer</a></li> +<li class="toctree-l4"><a class="reference internal" href="#s3hook">S3Hook</a></li> +</ul> +</li> +<li class="toctree-l3"><a class="reference internal" href="#aws-ec2-container-service">AWS EC2 Container Service</a><ul> +<li class="toctree-l4"><a class="reference internal" href="#ecsoperator">ECSOperator</a></li> +</ul> +</li> +<li class="toctree-l3"><a class="reference internal" href="#aws-redshift">AWS RedShift</a><ul> +<li class="toctree-l4"><a class="reference internal" href="#redshifttos3transfer">RedshiftToS3Transfer</a></li> +</ul> +</li> +</ul> +</li> +<li class="toctree-l2"><a class="reference internal" href="#databricks">Databricks</a><ul> +<li class="toctree-l3"><a class="reference internal" href="#databrickssubmitrunoperator">DatabricksSubmitRunOperator</a></li> +</ul> +</li> <li class="toctree-l2"><a class="reference 
internal" href="#gcp-google-cloud-platform">GCP: Google Cloud Platform</a><ul> +<li class="toctree-l3"><a class="reference internal" href="#logging">Logging</a></li> <li class="toctree-l3"><a class="reference internal" href="#bigquery">BigQuery</a><ul> <li class="toctree-l4"><a class="reference internal" href="#bigquery-operators">BigQuery Operators</a></li> <li class="toctree-l4"><a class="reference internal" href="#bigqueryhook">BigQueryHook</a></li> @@ -111,11 +154,15 @@ </li> <li class="toctree-l3"><a class="reference internal" href="#cloud-dataproc">Cloud DataProc</a><ul> <li class="toctree-l4"><a class="reference internal" href="#dataproc-operators">DataProc Operators</a></li> -<li class="toctree-l4"><a class="reference internal" href="#dataprocpysparkoperator">DataProcPySparkOperator</a></li> </ul> </li> <li class="toctree-l3"><a class="reference internal" href="#cloud-datastore">Cloud Datastore</a><ul> -<li class="toctree-l4"><a class="reference internal" href="#datastore-operators">Datastore Operators</a></li> +<li class="toctree-l4"><a class="reference internal" href="#datastorehook">DatastoreHook</a></li> +</ul> +</li> +<li class="toctree-l3"><a class="reference internal" href="#cloud-ml-engine">Cloud ML Engine</a><ul> +<li class="toctree-l4"><a class="reference internal" href="#cloud-ml-engine-operators">Cloud ML Engine Operators</a></li> +<li class="toctree-l4"><a class="reference internal" href="#cloud-ml-engine-hook">Cloud ML Engine Hook</a></li> </ul> </li> <li class="toctree-l3"><a class="reference internal" href="#cloud-storage">Cloud Storage</a><ul> @@ -195,18 +242,601 @@ <div class="section" id="integration"> <h1>Integration<a class="headerlink" href="#integration" title="Permalink to this headline">¶</a></h1> <ul class="simple"> -<li><a class="reference internal" href="#aws"><span class="std std-ref">AWS: Amazon Webservices</span></a></li> +<li><a class="reference internal" href="#azure"><span class="std std-ref">Azure: Microsoft Azure</span></a></li> +<li><a class="reference internal" href="#aws"><span class="std std-ref">AWS: Amazon Web Services</span></a></li> +<li><span class="xref std std-ref">Databricks</span></li> <li><a class="reference internal" href="#gcp"><span class="std std-ref">GCP: Google Cloud Platform</span></a></li> </ul> -<div class="section" id="aws-amazon-webservices"> -<span id="aws"></span><h2>AWS: Amazon Webservices<a class="headerlink" href="#aws-amazon-webservices" title="Permalink to this headline">¶</a></h2> -<p>—</p> +<div class="section" id="azure-microsoft-azure"> +<span id="azure"></span><h2>Azure: Microsoft Azure<a class="headerlink" href="#azure-microsoft-azure" title="Permalink to this headline">¶</a></h2> +<p>Airflow has limited support for Microsoft Azure: interfaces exist only for Azure Blob +Storage. Note that the Hook, Sensor and Operator are in the contrib section.</p> +<div class="section" id="azure-blob-storage"> +<h3>Azure Blob Storage<a class="headerlink" href="#azure-blob-storage" title="Permalink to this headline">¶</a></h3> +<p>All classes communicate via the Window Azure Storage Blob protocol. Make sure that a +Airflow connection of type <cite>wasb</cite> exists. 
Authorization can be done by supplying a +login (=Storage account name) and password (=KEY), or login and SAS token in the extra +field (see connection <cite>wasb_default</cite> for an example).</p> +<ul class="simple"> +<li><a class="reference internal" href="#wasbblobsensor"><span class="std std-ref">WasbBlobSensor</span></a>: Checks if a blob is present on Azure Blob storage.</li> +<li><a class="reference internal" href="#wasbprefixsensor"><span class="std std-ref">WasbPrefixSensor</span></a>: Checks if blobs matching a prefix are present on Azure Blob storage.</li> +<li><a class="reference internal" href="#filetowasboperator"><span class="std std-ref">FileToWasbOperator</span></a>: Uploads a local file to a container as a blob.</li> +<li><a class="reference internal" href="#wasbhook"><span class="std std-ref">WasbHook</span></a>: Interface with Azure Blob Storage.</li> +</ul> +<div class="section" id="wasbblobsensor"> +<span id="id1"></span><h4>WasbBlobSensor<a class="headerlink" href="#wasbblobsensor" title="Permalink to this headline">¶</a></h4> +<dl class="class"> +<dt id="airflow.contrib.sensors.wasb_sensor.WasbBlobSensor"> +<em class="property">class </em><code class="descclassname">airflow.contrib.sensors.wasb_sensor.</code><code class="descname">WasbBlobSensor</code><span class="sig-paren">(</span><em>container_name</em>, <em>blob_name</em>, <em>wasb_conn_id='wasb_default'</em>, <em>check_options=None</em>, <em>*args</em>, <em>**kwargs</em><span class="sig-paren">)</span><a class="reference internal" href="_modules/airflow/contrib/sensors/wasb_sensor.html#WasbBlobSensor"><span class="viewcode-link">[source]</span></a><a class="headerlink" href="#airflow.contrib.sensors.wasb_sensor.WasbBlobSensor" title="Permalink to this definition">¶</a></dt> +<dd><p>Waits for a blob to arrive on Azure Blob Storage.</p> +<table class="docutils field-list" frame="void" rules="none"> +<col class="field-name" /> +<col class="field-body" /> +<tbody valign="top"> +<tr class="field-odd field"><th class="field-name">Parameters:</th><td class="field-body"><ul class="first last simple"> +<li><strong>container_name</strong> (<em>str</em>) â Name of the container.</li> +<li><strong>blob_name</strong> (<em>str</em>) â Name of the blob.</li> +<li><strong>wasb_conn_id</strong> (<em>str</em>) â Reference to the wasb connection.</li> +<li><strong>check_options</strong> (<em>dict</em>) â Optional keyword arguments that +<cite>WasbHook.check_for_blob()</cite> takes.</li> +</ul> +</td> +</tr> +</tbody> +</table> +</dd></dl> + +</div> +<div class="section" id="wasbprefixsensor"> +<span id="id2"></span><h4>WasbPrefixSensor<a class="headerlink" href="#wasbprefixsensor" title="Permalink to this headline">¶</a></h4> +<dl class="class"> +<dt id="airflow.contrib.sensors.wasb_sensor.WasbPrefixSensor"> +<em class="property">class </em><code class="descclassname">airflow.contrib.sensors.wasb_sensor.</code><code class="descname">WasbPrefixSensor</code><span class="sig-paren">(</span><em>container_name</em>, <em>prefix</em>, <em>wasb_conn_id='wasb_default'</em>, <em>check_options=None</em>, <em>*args</em>, <em>**kwargs</em><span class="sig-paren">)</span><a class="reference internal" href="_modules/airflow/contrib/sensors/wasb_sensor.html#WasbPrefixSensor"><span class="viewcode-link">[source]</span></a><a class="headerlink" href="#airflow.contrib.sensors.wasb_sensor.WasbPrefixSensor" title="Permalink to this definition">¶</a></dt> +<dd><p>Waits for blobs matching a prefix to arrive on Azure Blob Storage.</p> +<table 
class="docutils field-list" frame="void" rules="none"> +<col class="field-name" /> +<col class="field-body" /> +<tbody valign="top"> +<tr class="field-odd field"><th class="field-name">Parameters:</th><td class="field-body"><ul class="first last simple"> +<li><strong>container_name</strong> (<em>str</em>) â Name of the container.</li> +<li><strong>prefix</strong> (<em>str</em>) â Prefix of the blob.</li> +<li><strong>wasb_conn_id</strong> (<em>str</em>) â Reference to the wasb connection.</li> +<li><strong>check_options</strong> (<em>dict</em>) â Optional keyword arguments that +<cite>WasbHook.check_for_prefix()</cite> takes.</li> +</ul> +</td> +</tr> +</tbody> +</table> +</dd></dl> + +</div> +<div class="section" id="filetowasboperator"> +<span id="id3"></span><h4>FileToWasbOperator<a class="headerlink" href="#filetowasboperator" title="Permalink to this headline">¶</a></h4> +<dl class="class"> +<dt id="airflow.contrib.operators.file_to_wasb.FileToWasbOperator"> +<em class="property">class </em><code class="descclassname">airflow.contrib.operators.file_to_wasb.</code><code class="descname">FileToWasbOperator</code><span class="sig-paren">(</span><em>file_path</em>, <em>container_name</em>, <em>blob_name</em>, <em>wasb_conn_id='wasb_default'</em>, <em>load_options=None</em>, <em>*args</em>, <em>**kwargs</em><span class="sig-paren">)</span><a class="reference internal" href="_modules/airflow/contrib/operators/file_to_wasb.html#FileToWasbOperator"><span class="viewcode-link">[source]</span></a><a class="headerlink" href="#airflow.contrib.operators.file_to_wasb.FileToWasbOperator" title="Permalink to this definition">¶</a></dt> +<dd><p>Uploads a file to Azure Blob Storage.</p> +<table class="docutils field-list" frame="void" rules="none"> +<col class="field-name" /> +<col class="field-body" /> +<tbody valign="top"> +<tr class="field-odd field"><th class="field-name">Parameters:</th><td class="field-body"><ul class="first last simple"> +<li><strong>file_path</strong> (<em>str</em>) â Path to the file to load.</li> +<li><strong>container_name</strong> (<em>str</em>) â Name of the container.</li> +<li><strong>blob_name</strong> (<em>str</em>) â Name of the blob.</li> +<li><strong>wasb_conn_id</strong> (<em>str</em>) â Reference to the wasb connection.</li> +<li><strong>load_options</strong> (<em>dict</em>) â Optional keyword arguments that +<cite>WasbHook.load_file()</cite> takes.</li> +</ul> +</td> +</tr> +</tbody> +</table> +</dd></dl> + +</div> +<div class="section" id="wasbhook"> +<span id="id4"></span><h4>WasbHook<a class="headerlink" href="#wasbhook" title="Permalink to this headline">¶</a></h4> +<dl class="class"> +<dt id="airflow.contrib.hooks.wasb_hook.WasbHook"> +<em class="property">class </em><code class="descclassname">airflow.contrib.hooks.wasb_hook.</code><code class="descname">WasbHook</code><span class="sig-paren">(</span><em>wasb_conn_id='wasb_default'</em><span class="sig-paren">)</span><a class="reference internal" href="_modules/airflow/contrib/hooks/wasb_hook.html#WasbHook"><span class="viewcode-link">[source]</span></a><a class="headerlink" href="#airflow.contrib.hooks.wasb_hook.WasbHook" title="Permalink to this definition">¶</a></dt> +<dd><p>Interacts with Azure Blob Storage through the wasb:// protocol.</p> +<p>Additional options passed in the âextraâ field of the connection will be +passed to the <cite>BlockBlockService()</cite> constructor. 
+</div> +</div> +<div class="section" id="aws-amazon-web-services"> +<span id="aws"></span><h2>AWS: Amazon Web Services<a class="headerlink" href="#aws-amazon-web-services" title="Permalink to this headline">¶</a></h2> +<p>Airflow has extensive support for Amazon Web Services. But note that the Hooks, Sensors and +Operators are in the contrib section.</p> +<div class="section" id="aws-emr"> +<h3>AWS EMR<a class="headerlink" href="#aws-emr" title="Permalink to this headline">¶</a></h3> +<ul class="simple"> +<li><a class="reference internal" href="#emraddstepsoperator"><span class="std std-ref">EmrAddStepsOperator</span></a> : Adds steps to an existing EMR JobFlow.</li> +<li><a class="reference internal" href="#emrcreatejobflowoperator"><span class="std std-ref">EmrCreateJobFlowOperator</span></a> : Creates an EMR JobFlow, reading the config from the EMR connection.</li> +<li><a class="reference internal" href="#emrterminatejobflowoperator"><span class="std std-ref">EmrTerminateJobFlowOperator</span></a> : Terminates an EMR JobFlow.</li> +<li><a class="reference internal" href="#emrhook"><span class="std std-ref">EmrHook</span></a> : Interact with AWS EMR.</li> +</ul> +<div class="section" id="emraddstepsoperator"> +<span id="id5"></span><h4>EmrAddStepsOperator<a class="headerlink" href="#emraddstepsoperator" title="Permalink to this headline">¶</a></h4> +<dl class="class"> +<dt id="airflow.contrib.operators.emr_add_steps_operator.EmrAddStepsOperator"> +<em class="property">class </em><code class="descclassname">airflow.contrib.operators.emr_add_steps_operator.</code><code class="descname">EmrAddStepsOperator</code><span class="sig-paren">(</span><em>job_flow_id</em>, <em>aws_conn_id='s3_default'</em>, <em>steps=None</em>, <em>*args</em>, <em>**kwargs</em><span class="sig-paren">)</span><a class="reference internal" href="_modules/airflow/contrib/operators/emr_add_steps_operator.html#EmrAddStepsOperator"><span class="viewcode-link">[source]</span></a><a class="headerlink" href="#airflow.contrib.operators.emr_add_steps_operator.EmrAddStepsOperator" title="Permalink to this definition">¶</a></dt> +<dd><p>An operator that adds steps to an existing EMR job_flow.</p> +<table class="docutils field-list" frame="void" rules="none"> +<col class="field-name" /> +<col class="field-body" /> +<tbody valign="top"> +<tr class="field-odd field"><th class="field-name">Parameters:</th><td class="field-body"><ul class="first last simple"> +<li><strong>job_flow_id</strong> – id of the JobFlow to add steps to</li> +<li><strong>aws_conn_id</strong> (<em>str</em>) – aws connection to use</li> +<li><strong>steps</strong> (<a class="reference internal" href="#airflow.contrib.hooks.gcs_hook.GoogleCloudStorageHook.list" title="airflow.contrib.hooks.gcs_hook.GoogleCloudStorageHook.list"><em>list</em></a>) – boto3 style steps to be added to the jobflow</li> +</ul> +</td> +</tr> +</tbody> +</table> +</dd></dl>
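+<p>A minimal usage sketch follows; the job flow id, bucket, script path and step definition are placeholders, and the step dictionary follows the boto3 <cite>add_job_flow_steps</cite> format.</p>
+<div class="highlight-default"><div class="highlight"><pre><span></span>from datetime import datetime
+
+from airflow import DAG
+from airflow.contrib.operators.emr_add_steps_operator import EmrAddStepsOperator
+
+dag = DAG('emr_add_steps_example', start_date=datetime(2017, 10, 1), schedule_interval=None)
+
+# boto3-style step definition; jar, arguments and job flow id are placeholders.
+SPARK_STEP = [{
+    'Name': 'example_step',
+    'ActionOnFailure': 'CONTINUE',
+    'HadoopJarStep': {
+        'Jar': 'command-runner.jar',
+        'Args': ['spark-submit', 's3://my-bucket/jobs/job.py'],
+    },
+}]
+
+add_step = EmrAddStepsOperator(
+    task_id='add_step',
+    job_flow_id='j-XXXXXXXXXXXXX',
+    aws_conn_id='s3_default',
+    steps=SPARK_STEP,
+    dag=dag)
+</pre></div>
+</div>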
href="#emrcreatejobflowoperator" title="Permalink to this headline">¶</a></h4> +<dl class="class"> +<dt id="airflow.contrib.operators.emr_create_job_flow_operator.EmrCreateJobFlowOperator"> +<em class="property">class </em><code class="descclassname">airflow.contrib.operators.emr_create_job_flow_operator.</code><code class="descname">EmrCreateJobFlowOperator</code><span class="sig-paren">(</span><em>aws_conn_id='s3_default'</em>, <em>emr_conn_id='emr_default'</em>, <em>job_flow_overrides=None</em>, <em>*args</em>, <em>**kwargs</em><span class="sig-paren">)</span><a class="reference internal" href="_modules/airflow/contrib/operators/emr_create_job_flow_operator.html#EmrCreateJobFlowOperator"><span class="viewcode-link">[source]</span></a><a class="headerlink" href="#airflow.contrib.operators.emr_create_job_flow_operator.EmrCreateJobFlowOperator" title="Permalink to this definition">¶</a></dt> +<dd><p>Creates an EMR JobFlow, reading the config from the EMR connection. +A dictionary of JobFlow overrides can be passed that override the config from the connection.</p> +<table class="docutils field-list" frame="void" rules="none"> +<col class="field-name" /> +<col class="field-body" /> +<tbody valign="top"> +<tr class="field-odd field"><th class="field-name">Parameters:</th><td class="field-body"><ul class="first last simple"> +<li><strong>aws_conn_id</strong> (<em>str</em>) â aws connection to uses</li> +<li><strong>emr_conn_id</strong> (<em>str</em>) â emr connection to use</li> +<li><strong>job_flow_overrides</strong> â boto3 style arguments to override emr_connection extra</li> +</ul> +</td> +</tr> +</tbody> +</table> +</dd></dl> + +</div> +<div class="section" id="emrterminatejobflowoperator"> +<span id="id7"></span><h4>EmrTerminateJobFlowOperator<a class="headerlink" href="#emrterminatejobflowoperator" title="Permalink to this headline">¶</a></h4> +<dl class="class"> +<dt id="airflow.contrib.operators.emr_terminate_job_flow_operator.EmrTerminateJobFlowOperator"> +<em class="property">class </em><code class="descclassname">airflow.contrib.operators.emr_terminate_job_flow_operator.</code><code class="descname">EmrTerminateJobFlowOperator</code><span class="sig-paren">(</span><em>job_flow_id</em>, <em>aws_conn_id='s3_default'</em>, <em>*args</em>, <em>**kwargs</em><span class="sig-paren">)</span><a class="reference internal" href="_modules/airflow/contrib/operators/emr_terminate_job_flow_operator.html#EmrTerminateJobFlowOperator"><span class="viewcode-link">[source]</span></a><a class="headerlink" href="#airflow.contrib.operators.emr_terminate_job_flow_operator.EmrTerminateJobFlowOperator" title="Permalink to this definition">¶</a></dt> +<dd><p>Operator to terminate EMR JobFlows.</p> +<table class="docutils field-list" frame="void" rules="none"> +<col class="field-name" /> +<col class="field-body" /> +<tbody valign="top"> +<tr class="field-odd field"><th class="field-name">Parameters:</th><td class="field-body"><ul class="first last simple"> +<li><strong>job_flow_id</strong> â id of the JobFlow to terminate</li> +<li><strong>aws_conn_id</strong> (<em>str</em>) â aws connection to uses</li> +</ul> +</td> +</tr> +</tbody> +</table> +</dd></dl> + +</div> +<div class="section" id="emrhook"> +<span id="id8"></span><h4>EmrHook<a class="headerlink" href="#emrhook" title="Permalink to this headline">¶</a></h4> +<dl class="class"> +<dt id="airflow.contrib.hooks.emr_hook.EmrHook"> +<em class="property">class </em><code class="descclassname">airflow.contrib.hooks.emr_hook.</code><code 
class="descname">EmrHook</code><span class="sig-paren">(</span><em>emr_conn_id=None</em>, <em>*args</em>, <em>**kwargs</em><span class="sig-paren">)</span><a class="reference internal" href="_modules/airflow/contrib/hooks/emr_hook.html#EmrHook"><span class="viewcode-link">[source]</span></a><a class="headerlink" href="#airflow.contrib.hooks.emr_hook.EmrHook" title="Permalink to this definition">¶</a></dt> +<dd><p>Interact with AWS EMR. emr_conn_id is only neccessary for using the create_job_flow method.</p> +</dd></dl> + +</div> +</div> +<div class="section" id="aws-s3"> +<h3>AWS S3<a class="headerlink" href="#aws-s3" title="Permalink to this headline">¶</a></h3> +<ul class="simple"> +<li><a class="reference internal" href="#s3filetransformoperator"><span class="std std-ref">S3FileTransformOperator</span></a> : Copies data from a source S3 location to a temporary location on the local filesystem.</li> +<li><a class="reference internal" href="#s3tohivetransfer"><span class="std std-ref">S3ToHiveTransfer</span></a> : Moves data from S3 to Hive. The operator downloads a file from S3, stores the file locally before loading it into a Hive table.</li> +<li><a class="reference internal" href="#s3hook"><span class="std std-ref">S3Hook</span></a> : Interact with AWS S3.</li> +</ul> +<div class="section" id="s3filetransformoperator"> +<span id="id9"></span><h4>S3FileTransformOperator<a class="headerlink" href="#s3filetransformoperator" title="Permalink to this headline">¶</a></h4> +<dl class="class"> +<dt id="airflow.operators.s3_file_transform_operator.S3FileTransformOperator"> +<em class="property">class </em><code class="descclassname">airflow.operators.s3_file_transform_operator.</code><code class="descname">S3FileTransformOperator</code><span class="sig-paren">(</span><em>source_s3_key</em>, <em>dest_s3_key</em>, <em>transform_script</em>, <em>source_aws_conn_id='aws_default'</em>, <em>dest_aws_conn_id='aws_default'</em>, <em>replace=False</em>, <em>*args</em>, <em>**kwargs</em><span class="sig-paren">)</span><a class="reference internal" href="_modules/airflow/operators/s3_file_transform_operator.html#S3FileTransformOperator"><span class="viewcode-link">[source]</span></a><a class="headerlink" href="#airflow.operators.s3_file_transform_operator.S3FileTransformOperator" title="Permalink to this definition">¶</a></dt> +<dd><p>Copies data from a source S3 location to a temporary location on the +local filesystem. Runs a transformation on this file as specified by +the transformation script and uploads the output to a destination S3 +location.</p> +<p>The locations of the source and the destination files in the local +filesystem is provided as an first and second arguments to the +transformation script. The transformation script is expected to read the +data from source , transform it and write the output to the local +destination file. 
+ +</div> +<div class="section" id="s3tohivetransfer"> +<span id="id10"></span><h4>S3ToHiveTransfer<a class="headerlink" href="#s3tohivetransfer" title="Permalink to this headline">¶</a></h4> +<dl class="class"> +<dt id="airflow.operators.s3_to_hive_operator.S3ToHiveTransfer"> +<em class="property">class </em><code class="descclassname">airflow.operators.s3_to_hive_operator.</code><code class="descname">S3ToHiveTransfer</code><span class="sig-paren">(</span><em>s3_key</em>, <em>field_dict</em>, <em>hive_table</em>, <em>delimiter=','</em>, <em>create=True</em>, <em>recreate=False</em>, <em>partition=None</em>, <em>headers=False</em>, <em>check_headers=False</em>, <em>wildcard_match=False</em>, <em>aws_conn_id='aws_default'</em>, <em>hive_cli_conn_id='hive_cli_default'</em>, <em>input_compressed=False</em>, <em>tblproperties=None</em>, <em>*args</em>, <em>**kwargs</em><span class="sig-paren">)</span><a class="reference internal" href="_modules/airflow/operators/s3_to_hive_operator.html#S3ToHiveTransfer"><span class="viewcode-link">[source]</span></a><a class="headerlink" href="#airflow.operators.s3_to_hive_operator.S3ToHiveTransfer" title="Permalink to this definition">¶</a></dt> +<dd><p>Moves data from S3 to Hive. The operator downloads a file from S3, +stores the file locally before loading it into a Hive table. +If the <code class="docutils literal"><span class="pre">create</span></code> or <code class="docutils literal"><span class="pre">recreate</span></code> arguments are set to <code class="docutils literal"><span class="pre">True</span></code>, +<code class="docutils literal"><span class="pre">CREATE</span> <span class="pre">TABLE</span></code> and <code class="docutils literal"><span class="pre">DROP</span> <span class="pre">TABLE</span></code> statements are generated. +Hive data types are inferred from the cursor's metadata.</p> +<p>Note that the table generated in Hive uses <code class="docutils literal"><span class="pre">STORED</span> <span class="pre">AS</span> <span class="pre">textfile</span></code> +which isn't the most efficient serialization format. If a +large amount of data is loaded and/or if the table gets +queried considerably, you may want to use this operator only to +stage the data into a temporary table before loading it into its +final destination using a <code class="docutils literal"><span class="pre">HiveOperator</span></code>.</p> +<table class="docutils field-list" frame="void" rules="none"> +<col class="field-name" /> +<col class="field-body" /> +<tbody valign="top"> +<tr class="field-odd field"><th class="field-name">Parameters:</th><td class="field-body"><ul class="first last simple"> +<li><strong>s3_key</strong> (<em>str</em>) – The key to be retrieved from S3</li> +<li><strong>field_dict</strong> (<em>dict</em>) – A dictionary of the field names in the file +as keys and their Hive types as values</li> +<li><strong>hive_table</strong> (<em>str</em>) – target Hive table, use dot notation to target a +specific database</li> +<li><strong>create</strong> (<em>bool</em>) – whether to create the table if it doesn't exist</li> +<li><strong>recreate</strong> (<em>bool</em>) – whether to drop and recreate the table at every +execution</li> +<li><strong>partition</strong> (<em>dict</em>) – target partition as a dict of partition columns +and values</li> +<li><strong>headers</strong> (<em>bool</em>) – whether the file contains column names on the first +line</li> +<li><strong>check_headers</strong> (<em>bool</em>) – whether the column names on the first line should be +checked against the keys of field_dict</li> +<li><strong>wildcard_match</strong> (<em>bool</em>) – whether the s3_key should be interpreted as a Unix +wildcard pattern</li> +<li><strong>delimiter</strong> (<em>str</em>) – field delimiter in the file</li> +<li><strong>aws_conn_id</strong> (<em>str</em>) – source s3 connection</li> +<li><strong>hive_cli_conn_id</strong> (<em>str</em>) – destination hive connection</li> +<li><strong>input_compressed</strong> (<em>bool</em>) – Boolean to determine if file decompression is +required to process headers</li> +<li><strong>tblproperties</strong> (<em>dict</em>) – TBLPROPERTIES of the hive table being created</li> +</ul> +</td> +</tr> +</tbody> +</table> +</dd></dl>
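+<p>A minimal usage sketch follows; the S3 key, field mapping and table name are placeholders, and the connection ids are the defaults from the signature above.</p>
+<div class="highlight-default"><div class="highlight"><pre><span></span>from collections import OrderedDict
+from datetime import datetime
+
+from airflow import DAG
+from airflow.operators.s3_to_hive_operator import S3ToHiveTransfer
+
+dag = DAG('s3_to_hive_example', start_date=datetime(2017, 10, 1), schedule_interval='@daily')
+
+# field_dict maps column names to Hive types; an OrderedDict keeps the column order stable.
+load_events = S3ToHiveTransfer(
+    task_id='load_events',
+    s3_key='data/events/2017-10-03.csv',
+    field_dict=OrderedDict([('event_id', 'STRING'), ('ts', 'STRING'), ('amount', 'DOUBLE')]),
+    hive_table='staging.events',
+    delimiter=',',
+    headers=True,
+    check_headers=True,
+    aws_conn_id='aws_default',
+    hive_cli_conn_id='hive_cli_default',
+    dag=dag)
+</pre></div>
+</div>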
+ +</div> +<div class="section" id="s3hook"> +<span id="id11"></span><h4>S3Hook<a class="headerlink" href="#s3hook" title="Permalink to this headline">¶</a></h4> +<dl class="class"> +<dt id="airflow.hooks.S3_hook.S3Hook"> +<em class="property">class </em><code class="descclassname">airflow.hooks.S3_hook.</code><code class="descname">S3Hook</code><span class="sig-paren">(</span><em>aws_conn_id='aws_default'</em><span class="sig-paren">)</span><a class="reference internal" href="_modules/airflow/hooks/S3_hook.html#S3Hook"><span class="viewcode-link">[source]</span></a><a class="headerlink" href="#airflow.hooks.S3_hook.S3Hook" title="Permalink to this definition">¶</a></dt> +<dd><p>Interact with AWS S3, using the boto3 library.</p> +</dd></dl> + +</div> +</div> +<div class="section" id="aws-ec2-container-service"> +<h3>AWS EC2 Container Service<a class="headerlink" href="#aws-ec2-container-service" title="Permalink to this headline">¶</a></h3> +<ul class="simple"> +<li><a class="reference internal" href="#ecsoperator"><span class="std std-ref">ECSOperator</span></a> : Execute a task on AWS EC2 Container Service.</li> +</ul> +<div class="section" id="ecsoperator"> +<span id="id12"></span><h4>ECSOperator<a class="headerlink" href="#ecsoperator" title="Permalink to this headline">¶</a></h4> +<dl class="class"> +<dt 
id="airflow.contrib.operators.ecs_operator.ECSOperator"> +<em class="property">class </em><code class="descclassname">airflow.contrib.operators.ecs_operator.</code><code class="descname">ECSOperator</code><span class="sig-paren">(</span><em>task_definition</em>, <em>cluster</em>, <em>overrides</em>, <em>aws_conn_id=None</em>, <em>region_name=None</em>, <em>**kwargs</em><span class="sig-paren">)</span><a class="reference internal" href="_modules/airflow/contrib/operators/ecs_operator.html#ECSOperator"><span class="viewcode-link">[source]</span></a><a class="headerlink" href="#airflow.contrib.operators.ecs_operator.ECSOperator" title="Permalink to this definition">¶</a></dt> +<dd><p>Execute a task on AWS EC2 Container Service</p> +<table class="docutils field-list" frame="void" rules="none"> +<col class="field-name" /> +<col class="field-body" /> +<tbody valign="top"> +<tr class="field-odd field"><th class="field-name">Parameters:</th><td class="field-body"><ul class="first simple"> +<li><strong>task_definition</strong> (<em>str</em>) â the task definition name on EC2 Container Service</li> +<li><strong>cluster</strong> (<em>str</em>) â the cluster name on EC2 Container Service</li> +<li><strong>aws_conn_id</strong> (<em>str</em>) â connection id of AWS credentials / region name. If None, +credential boto3 strategy will be used (<a class="reference external" href="http://boto3.readthedocs.io/en/latest/guide/configuration.html">http://boto3.readthedocs.io/en/latest/guide/configuration.html</a>).</li> +<li><strong>region_name</strong> â region name to use in AWS Hook. Override the region_name in connection (if provided)</li> +</ul> +</td> +</tr> +<tr class="field-even field"><th class="field-name">Param:</th><td class="field-body"><p class="first">overrides: the same parameter that boto3 will receive: +<a class="reference external" href="http://boto3.readthedocs.org/en/latest/reference/services/ecs.html#ECS.Client.run_task">http://boto3.readthedocs.org/en/latest/reference/services/ecs.html#ECS.Client.run_task</a></p> +</td> +</tr> +<tr class="field-odd field"><th class="field-name">Type:</th><td class="field-body"><p class="first last">overrides: dict</p> +</td> +</tr> +</tbody> +</table> +</dd></dl> + +</div> +</div> +<div class="section" id="aws-redshift"> +<h3>AWS RedShift<a class="headerlink" href="#aws-redshift" title="Permalink to this headline">¶</a></h3> +<ul class="simple"> +<li><a class="reference internal" href="#redshifttos3transfer"><span class="std std-ref">RedshiftToS3Transfer</span></a> : Executes an unload command to S3 as a CSV with headers.</li> +</ul> +<div class="section" id="redshifttos3transfer"> +<span id="id13"></span><h4>RedshiftToS3Transfer<a class="headerlink" href="#redshifttos3transfer" title="Permalink to this headline">¶</a></h4> +<dl class="class"> +<dt id="airflow.operators.redshift_to_s3_operator.RedshiftToS3Transfer"> +<em class="property">class </em><code class="descclassname">airflow.operators.redshift_to_s3_operator.</code><code class="descname">RedshiftToS3Transfer</code><span class="sig-paren">(</span><em>schema</em>, <em>table</em>, <em>s3_bucket</em>, <em>s3_key</em>, <em>redshift_conn_id='redshift_default'</em>, <em>aws_conn_id='aws_default'</em>, <em>unload_options=()</em>, <em>autocommit=False</em>, <em>parameters=None</em>, <em>*args</em>, <em>**kwargs</em><span class="sig-paren">)</span><a class="reference internal" href="_modules/airflow/operators/redshift_to_s3_operator.html#RedshiftToS3Transfer"><span 
class="viewcode-link">[source]</span></a><a class="headerlink" href="#airflow.operators.redshift_to_s3_operator.RedshiftToS3Transfer" title="Permalink to this definition">¶</a></dt> +<dd><p>Executes an UNLOAD command to s3 as a CSV with headers +:param schema: reference to a specific schema in redshift database +:type schema: string +:param table: reference to a specific table in redshift database +:type table: string +:param s3_bucket: reference to a specific S3 bucket +:type s3_bucket: string +:param s3_key: reference to a specific S3 key +:type s3_key: string +:param redshift_conn_id: reference to a specific redshift database +:type redshift_conn_id: string +:param aws_conn_id: reference to a specific S3 connection +:type aws_conn_id: string +:param options: reference to a list of UNLOAD options +:type options: list</p> +</dd></dl> + +</div> +</div> +</div> +<div class="section" id="databricks"> +<span id="id14"></span><h2>Databricks<a class="headerlink" href="#databricks" title="Permalink to this headline">¶</a></h2> +<p><a class="reference external" href="https://databricks.com/">Databricks</a> has contributed an Airflow operator which enables +submitting runs to the Databricks platform. Internally the operator talks to the +<code class="docutils literal"><span class="pre">api/2.0/jobs/runs/submit</span></code> <a class="reference external" href="https://docs.databricks.com/api/latest/jobs.html#runs-submit">endpoint</a>.</p> +<div class="section" id="databrickssubmitrunoperator"> +<h3>DatabricksSubmitRunOperator<a class="headerlink" href="#databrickssubmitrunoperator" title="Permalink to this headline">¶</a></h3> +<dl class="class"> +<dt id="airflow.contrib.operators.databricks_operator.DatabricksSubmitRunOperator"> +<em class="property">class </em><code class="descclassname">airflow.contrib.operators.databricks_operator.</code><code class="descname">DatabricksSubmitRunOperator</code><span class="sig-paren">(</span><em>json=None</em>, <em>spark_jar_task=None</em>, <em>notebook_task=None</em>, <em>new_cluster=None</em>, <em>existing_cluster_id=None</em>, <em>libraries=None</em>, <em>run_name=None</em>, <em>timeout_seconds=None</em>, <em>databricks_conn_id='databricks_default'</em>, <em>polling_period_seconds=30</em>, <em>databricks_retry_limit=3</em>, <em>**kwargs</em><span class="sig-paren">)</span><a class="reference internal" href="_modules/airflow/contrib/operators/databricks_operator.html#DatabricksSubmitRunOperator"><span class="viewcode-link">[source]</span></a><a class="headerlink" href="#airflow.contrib.operators.databricks_operator.DatabricksSubmitRunOperator" title="Permalink to this definition">¶</a></dt> +<dd><p>Submits an Spark job run to Databricks using the +<a class="reference external" href="https://docs.databricks.com/api/latest/jobs.html#runs-submit">api/2.0/jobs/runs/submit</a> +API endpoint.</p> +<p>There are two ways to instantiate this operator.</p> +<p>In the first way, you can take the JSON payload that you typically use +to call the <code class="docutils literal"><span class="pre">api/2.0/jobs/runs/submit</span></code> endpoint and pass it directly +to our <code class="docutils literal"><span class="pre">DatabricksSubmitRunOperator</span></code> through the <code class="docutils literal"><span class="pre">json</span></code> parameter. 
+For example</p> +<div class="highlight-default"><div class="highlight"><pre><span></span><span class="n">json</span> <span class="o">=</span> <span class="p">{</span> + <span class="s1">'new_cluster'</span><span class="p">:</span> <span class="p">{</span> + <span class="s1">'spark_version'</span><span class="p">:</span> <span class="s1">'2.1.0-db3-scala2.11'</span><span class="p">,</span> + <span class="s1">'num_workers'</span><span class="p">:</span> <span class="mi">2</span> + <span class="p">},</span> + <span class="s1">'notebook_task'</span><span class="p">:</span> <span class="p">{</span> + <span class="s1">'notebook_path'</span><span class="p">:</span> <span class="s1">'/Users/[email protected]/PrepareData'</span><span class="p">,</span> + <span class="p">},</span> +<span class="p">}</span> +<span class="n">notebook_run</span> <span class="o">=</span> <span class="n">DatabricksSubmitRunOperator</span><span class="p">(</span><span class="n">task_id</span><span class="o">=</span><span class="s1">'notebook_run'</span><span class="p">,</span> <span class="n">json</span><span class="o">=</span><span class="n">json</span><span class="p">)</span> +</pre></div> +</div> +<p>Another way to accomplish the same thing is to use the named parameters +of the <code class="docutils literal"><span class="pre">DatabricksSubmitRunOperator</span></code> directly. Note that there is exactly +one named parameter for each top level parameter in the <code class="docutils literal"><span class="pre">runs/submit</span></code> +endpoint. In this method, your code would look like this:</p> +<div class="highlight-default"><div class="highlight"><pre><span></span><span class="n">new_cluster</span> <span class="o">=</span> <span class="p">{</span> + <span class="s1">'spark_version'</span><span class="p">:</span> <span class="s1">'2.1.0-db3-scala2.11'</span><span class="p">,</span> + <span class="s1">'num_workers'</span><span class="p">:</span> <span class="mi">2</span> +<span class="p">}</span> +<span class="n">notebook_task</span> <span class="o">=</span> <span class="p">{</span> + <span class="s1">'notebook_path'</span><span class="p">:</span> <span class="s1">'/Users/[email protected]/PrepareData'</span><span class="p">,</span> +<span class="p">}</span> +<span class="n">notebook_run</span> <span class="o">=</span> <span class="n">DatabricksSubmitRunOperator</span><span class="p">(</span> + <span class="n">task_id</span><span class="o">=</span><span class="s1">'notebook_run'</span><span class="p">,</span> + <span class="n">new_cluster</span><span class="o">=</span><span class="n">new_cluster</span><span class="p">,</span> + <span class="n">notebook_task</span><span class="o">=</span><span class="n">notebook_task</span><span class="p">)</span> +</pre></div> +</div> +<p>In the case where both the json parameter <strong>AND</strong> the named parameters +are provided, they will be merged together. 
If there are conflicts during the merge, +the named parameters will take precedence and override the top level <code class="docutils literal"><span class="pre">json</span></code> keys.</p> +<dl class="docutils"> +<dt>Currently the named parameters that <code class="docutils literal"><span class="pre">DatabricksSubmitRunOperator</span></code> supports are</dt> +<dd><ul class="first last simple"> +<li><code class="docutils literal"><span class="pre">spark_jar_task</span></code></li> +<li><code class="docutils literal"><span class="pre">notebook_task</span></code></li> +<li><code class="docutils literal"><span class="pre">new_cluster</span></code></li> +<li><code class="docutils literal"><span class="pre">existing_cluster_id</span></code></li> +<li><code class="docutils literal"><span class="pre">libraries</span></code></li> +<li><code class="docutils literal"><span class="pre">run_name</span></code></li> +<li><code class="docutils literal"><span class="pre">timeout_seconds</span></code></li> +</ul> +</dd> +</dl> +<table class="docutils field-list" frame="void" rules="none"> +<col class="field-name" /> +<col class="field-body" /> +<tbody valign="top"> +<tr class="field-odd field"><th class="field-name">Parameters:</th><td class="field-body"><ul class="first last simple"> +<li><strong>json</strong> (<em>dict</em>) â <p>A JSON object containing API parameters which will be passed +directly to the <code class="docutils literal"><span class="pre">api/2.0/jobs/runs/submit</span></code> endpoint. The other named parameters +(i.e. <code class="docutils literal"><span class="pre">spark_jar_task</span></code>, <code class="docutils literal"><span class="pre">notebook_task</span></code>..) to this operator will +be merged with this json dictionary if they are provided. +If there are conflicts during the merge, the named parameters will +take precedence and override the top level json keys. This field will be +templated.</p> +<div class="admonition seealso"> +<p class="first admonition-title">See also</p> +<p class="last">For more information about templating see <a class="reference internal" href="concepts.html#jinja-templating"><span class="std std-ref">Jinja Templating</span></a>. +<a class="reference external" href="https://docs.databricks.com/api/latest/jobs.html#runs-submit">https://docs.databricks.com/api/latest/jobs.html#runs-submit</a></p> +</div> +</li> +<li><strong>spark_jar_task</strong> (<em>dict</em>) â <p>The main class and parameters for the JAR task. Note that +the actual JAR is specified in the <code class="docutils literal"><span class="pre">libraries</span></code>. +<em>EITHER</em> <code class="docutils literal"><span class="pre">spark_jar_task</span></code> <em>OR</em> <code class="docutils literal"><span class="pre">notebook_task</span></code> should be specified. +This field will be templated.</p> +<div class="admonition seealso"> +<p class="first admonition-title">See also</p> +<p class="last"><a class="reference external" href="https://docs.databricks.com/api/latest/jobs.html#jobssparkjartask">https://docs.databricks.com/api/latest/jobs.html#jobssparkjartask</a></p> +</div> +</li> +<li><strong>notebook_task</strong> (<em>dict</em>) â <p>The notebook path and parameters for the notebook task. +<em>EITHER</em> <code class="docutils literal"><span class="pre">spark_jar_task</span></code> <em>OR</em> <code class="docutils literal"><span class="pre">notebook_task</span></code> should be specified. 
+This field will be templated.</p> +<div class="admonition seealso"> +<p class="first admonition-title">See also</p> +<p class="last"><a class="reference external" href="https://docs.databricks.com/api/latest/jobs.html#jobsnotebooktask">https://docs.databricks.com/api/latest/jobs.html#jobsnotebooktask</a></p> +</div> +</li> +<li><strong>new_cluster</strong> (<em>dict</em>) â <p>Specs for a new cluster on which this task will be run. +<em>EITHER</em> <code class="docutils literal"><span class="pre">new_cluster</span></code> <em>OR</em> <code class="docutils literal"><span class="pre">existing_cluster_id</span></code> should be specified. +This field will be templated.</p> +<div class="admonition seealso"> +<p class="first admonition-title">See also</p> +<p class="last"><a class="reference external" href="https://docs.databricks.com/api/latest/jobs.html#jobsclusterspecnewcluster">https://docs.databricks.com/api/latest/jobs.html#jobsclusterspecnewcluster</a></p> +</div> +</li> +<li><strong>existing_cluster_id</strong> (<em>string</em>) â ID for existing cluster on which to run this task. +<em>EITHER</em> <code class="docutils literal"><span class="pre">new_cluster</span></code> <em>OR</em> <code class="docutils literal"><span class="pre">existing_cluster_id</span></code> should be specified. +This field will be templated.</li> +<li><strong>libraries</strong> (<em>list of dicts</em>) â <p>Libraries which this run will use. +This field will be templated.</p> +<div class="admonition seealso"> +<p class="first admonition-title">See also</p> +<p class="last"><a class="reference external" href="https://docs.databricks.com/api/latest/libraries.html#managedlibrarieslibrary">https://docs.databricks.com/api/latest/libraries.html#managedlibrarieslibrary</a></p> +</div> +</li> +<li><strong>run_name</strong> (<em>string</em>) â The run name used for this task. +By default this will be set to the Airflow <code class="docutils literal"><span class="pre">task_id</span></code>. This <code class="docutils literal"><span class="pre">task_id</span></code> is a +required parameter of the superclass <code class="docutils literal"><span class="pre">BaseOperator</span></code>. +This field will be templated.</li> +<li><strong>timeout_seconds</strong> (<em>int32</em>) â The timeout for this run. By default a value of 0 is used +which means to have no timeout. +This field will be templated.</li> +<li><strong>databricks_conn_id</strong> (<em>string</em>) â The name of the Airflow connection to use. +By default and in the common case this will be <code class="docutils literal"><span class="pre">databricks_default</span></code>. To use +token based authentication, provide the key <code class="docutils literal"><span class="pre">token</span></code> in the extra field for the +connection.</li> +<li><strong>polling_period_seconds</strong> (<em>int</em>) â Controls the rate which we poll for the result of +this run. By default the operator will poll every 30 seconds.</li> +<li><strong>databricks_retry_limit</strong> (<em>int</em>) â Amount of times retry if the Databricks backend is +unreachable. Its value must be greater than or equal to 1.</li> +</ul> +</td> +</tr> +</tbody> +</table> +</dd></dl> + +</div> </div> <div class="section" id="gcp-google-cloud-platform"> <span id="gcp"></span><h2>GCP: Google Cloud Platform<a class="headerlink" href="#gcp-google-cloud-platform" title="Permalink to this headline">¶</a></h2> <p>Airflow has extensive support for the Google Cloud Platform. 
But note that most Hooks and Operators are in the contrib section. This means that they have a <em>beta</em> status, and that they can have breaking changes between minor releases.</p> +<div class="section" id="logging"> +<h3>Logging<a class="headerlink" href="#logging" title="Permalink to this headline">¶</a></h3> +<p>Airflow can be configured to read and write task logs in Google cloud storage. +Follow the steps below to enable Google cloud storage logging.</p> +<ol class="arabic"> +<li><p class="first">Airflow's logging system requires a custom .py file to be located in the <code class="docutils literal"><span class="pre">PYTHONPATH</span></code>, so that it's importable from Airflow. Start by creating a directory to store the config file. <code class="docutils literal"><span class="pre">$AIRFLOW_HOME/config</span></code> is recommended.</p> +</li> +<li><p class="first">Create empty files called <code class="docutils literal"><span class="pre">$AIRFLOW_HOME/config/log_config.py</span></code> and <code class="docutils literal"><span class="pre">$AIRFLOW_HOME/config/__init__.py</span></code>.</p> +</li> +<li><p class="first">Copy the contents of <code class="docutils literal"><span class="pre">airflow/config_templates/airflow_local_settings.py</span></code> into the <code class="docutils literal"><span class="pre">log_config.py</span></code> file that was just created in the step above.</p> +</li> +<li><p class="first">Customize the following portions of the template:</p> +<blockquote> +<div><div class="highlight-bash"><div class="highlight"><pre><span></span><span class="c1"># Add this variable to the top of the file. Note the trailing slash.</span> +<span class="nv">GCS_LOG_FOLDER</span> <span class="o">=</span> <span class="s1">'gs://<bucket where logs should be persisted>/'</span> + +<span class="c1"># Rename DEFAULT_LOGGING_CONFIG to LOGGING_CONFIG</span> +<span class="nv">LOGGING_CONFIG</span> <span class="o">=</span> ... + +<span class="c1"># Add a GCSTaskHandler to the 'handlers' block of the LOGGING_CONFIG variable</span> +<span class="s1">'gcs.task'</span>: <span class="o">{</span> + <span class="s1">'class'</span>: <span class="s1">'airflow.utils.log.gcs_task_handler.GCSTaskHandler'</span>, + <span class="s1">'formatter'</span>: <span class="s1">'airflow.task'</span>, + <span class="s1">'base_log_folder'</span>: os.path.expanduser<span class="o">(</span>BASE_LOG_FOLDER<span class="o">)</span>, + <span class="s1">'gcs_log_folder'</span>: GCS_LOG_FOLDER, + <span class="s1">'filename_template'</span>: FILENAME_TEMPLATE, +<span class="o">}</span>, + +<span class="c1"># Update the airflow.task and airflow.task_runner blocks to be 'gcs.task' instead of 'file.task'.</span> +<span class="s1">'loggers'</span>: <span class="o">{</span> + <span class="s1">'airflow.task'</span>: <span class="o">{</span> + <span class="s1">'handlers'</span>: <span class="o">[</span><span class="s1">'gcs.task'</span><span class="o">]</span>, + ... + <span class="o">}</span>, + <span class="s1">'airflow.task_runner'</span>: <span class="o">{</span> + <span class="s1">'handlers'</span>: <span class="o">[</span><span class="s1">'gcs.task'</span><span class="o">]</span>, + ... + <span class="o">}</span>, + <span class="s1">'airflow'</span>: <span class="o">{</span> + <span class="s1">'handlers'</span>: <span class="o">[</span><span class="s1">'console'</span><span class="o">]</span>, + ... 
+ <span class="o">}</span>, +<span class="o">}</span> +</pre></div> +</div> +</div></blockquote> +</li> +<li><p class="first">Make sure a Google cloud platform connection hook has been defined in Airflow. The hook should have read and write access to the Google cloud storage bucket defined above in <code class="docutils literal"><span class="pre">GCS_LOG_FOLDER</span></code>.</p> +</li> +<li><p class="first">Update <code class="docutils literal"><span class="pre">$AIRFLOW_HOME/airflow.cfg</span></code> to contain:</p> +<blockquote> +<div><div class="highlight-bash"><div class="highlight"><pre><span></span><span class="nv">task_log_reader</span> <span class="o">=</span> gcs.task +<span class="nv">logging_config_class</span> <span class="o">=</span> log_config.LOGGING_CONFIG +<span class="nv">remote_log_conn_id</span> <span class="o">=</span> <name of the Google cloud platform hook> +</pre></div> +</div> +</div></blockquote> +</li> +<li><p class="first">Restart the Airflow webserver and scheduler, and trigger (or wait for) a new task execution.</p> +</li> +<li><p class="first">Verify that logs are showing up for newly executed tasks in the bucket youâve defined.</p> +</li> +<li><p class="first">Verify that the Google cloud storage viewer is working in the UI. Pull up a newly executed task, and verify that you see something like:</p> +<blockquote> +<div><div class="highlight-bash"><div class="highlight"><pre><span></span>*** Reading remote log from gs://<bucket where logs should be persisted>/example_bash_operator/run_this_last/2017-10-03T00:00:00/16.log. +<span class="o">[</span><span class="m">2017</span>-10-03 <span class="m">21</span>:57:50,056<span class="o">]</span> <span class="o">{</span>cli.py:377<span class="o">}</span> INFO - Running on host chrisr-00532 +<span class="o">[</span><span class="m">2017</span>-10-03 <span class="m">21</span>:57:50,093<span class="o">]</span> <span class="o">{</span>base_task_runner.py:115<span class="o">}</span> INFO - Running: <span class="o">[</span><span class="s1">'bash'</span>, <span class="s1">'-c'</span>, u<span class="s1">'airflow run example_bash_operator run_this_last 2017-10-03T00:00:00 --job_id 47 --raw -sd DAGS_FOLDER/example_dags/example_bash_operator.py'</span><span class="o">]</span> +<span class="o">[</span><span class="m">2017</span>-10-03 <span class="m">21</span>:57:51,264<span class="o">]</span> <span class="o">{</span>base_task_runner.py:98<span class="o">}</span> INFO - Subtask: <span class="o">[</span><span class="m">2017</span>-10-03 <span class="m">21</span>:57:51,263<span class="o">]</span> <span class="o">{</span>__init__.py:45<span class="o">}</span> INFO - Using executor SequentialExecutor +<span class="o">[</span><span class="m">2017</span>-10-03 <span class="m">21</span>:57:51,306<span class="o">]</span> <span class="o">{</span>base_task_runner.py:98<span class="o">}</span> INFO - Subtask: <span class="o">[</span><span class="m">2017</span>-10-03 <span class="m">21</span>:57:51,306<span class="o">]</span> <span class="o">{</span>models.py:186<span class="o">}</span> INFO - Filling up the DagBag from /airflow/dags/example_dags/example_bash_operator.py +</pre></div> +</div> +</div></blockquote> +</li> +</ol> +<p>Note the top line that says itâs reading from the remote log file.</p> +<p>Please be aware that if you were persisting logs to Google cloud storage using the old-style airflow.cfg configuration method, the old logs will no longer be visible in the Airflow UI, though theyâll still exist in Google cloud storage. 
This is a backwards incompatbile change. If you are unhappy with it, you can change the <code class="docutils literal"><span class="pre">FILENAME_TEMPLATE</span></code> to reflect the old-style log filename format.</p> +</div> <div class="section" id="bigquery"> <h3>BigQuery<a class="headerlink" href="#bigquery" title="Permalink to this headline">¶</a></h3> <div class="section" id="bigquery-operators"> @@ -220,26 +850,287 @@ they can have breaking changes between minor releases.</p> <li><a class="reference internal" href="#bigquerytocloudstorageoperator"><span class="std std-ref">BigQueryToCloudStorageOperator</span></a> : Transfers a BigQuery table to a Google Cloud Storage bucket</li> </ul> <div class="section" id="bigquerycheckoperator"> -<span id="id1"></span><h5>BigQueryCheckOperator<a class="headerlink" href="#bigquerycheckoperator" title="Permalink to this headline">¶</a></h5> +<span id="id16"></span><h5>BigQueryCheckOperator<a class="headerlink" href="#bigquerycheckoperator" title="Permalink to this headline">¶</a></h5> +<dl class="class"> +<dt id="airflow.contrib.operators.bigquery_check_operator.BigQueryCheckOperator"> +<em class="property">class </em><code class="descclassname">airflow.contrib.operators.bigquery_check_operator.</code><code class="descname">BigQueryCheckOperator</code><span class="sig-paren">(</span><em>sql</em>, <em>bigquery_conn_id='bigquery_default'</em>, <em>*args</em>, <em>**kwargs</em><span class="sig-paren">)</span><a class="reference internal" href="_modules/airflow/contrib/operators/bigquery_check_operator.html#BigQueryCheckOperator"><span class="viewcode-link">[source]</span></a><a class="headerlink" href="#airflow.contrib.operators.bigquery_check_operator.BigQueryCheckOperator" title="Permalink to this definition">¶</a></dt> +<dd><p>Performs checks against BigQuery. The <code class="docutils literal"><span class="pre">BigQueryCheckOperator</span></code> expects +a sql query that will return a single row. Each value on that +first row is evaluated using python <code class="docutils literal"><span class="pre">bool</span></code> casting. If any of the +values return <code class="docutils literal"><span class="pre">False</span></code> the check is failed and errors out.</p> +<p>Note that Python bool casting evals the following as <code class="docutils literal"><span class="pre">False</span></code>:</p> +<ul class="simple"> +<li><code class="docutils literal"><span class="pre">False</span></code></li> +<li><code class="docutils literal"><span class="pre">0</span></code></li> +<li>Empty string (<code class="docutils literal"><span class="pre">""</span></code>)</li> +<li>Empty list (<code class="docutils literal"><span class="pre">[]</span></code>)</li> +<li>Empty dictionary or set (<code class="docutils literal"><span class="pre">{}</span></code>)</li> +</ul> +<p>Given a query like <code class="docutils literal"><span class="pre">SELECT</span> <span class="pre">COUNT(*)</span> <span class="pre">FROM</span> <span class="pre">foo</span></code>, it will fail only if +the count <code class="docutils literal"><span class="pre">==</span> <span class="pre">0</span></code>. 
</div> <div class="section" id="bigqueryvaluecheckoperator"> -<span id="id2"></span><h5>BigQueryValueCheckOperator<a class="headerlink" href="#bigqueryvaluecheckoperator" title="Permalink to this headline">¶</a></h5> +<span id="id17"></span><h5>BigQueryValueCheckOperator<a class="headerlink" href="#bigqueryvaluecheckoperator" title="Permalink to this headline">¶</a></h5> +<dl class="class"> +<dt id="airflow.contrib.operators.bigquery_check_operator.BigQueryValueCheckOperator"> +<em class="property">class </em><code class="descclassname">airflow.contrib.operators.bigquery_check_operator.</code><code class="descname">BigQueryValueCheckOperator</code><span class="sig-paren">(</span><em>sql</em>, <em>pass_value</em>, <em>tolerance=None</em>, <em>bigquery_conn_id='bigquery_default'</em>, <em>*args</em>, <em>**kwargs</em><span class="sig-paren">)</span><a class="reference internal" href="_modules/airflow/contrib/operators/bigquery_check_operator.html#BigQueryValueCheckOperator"><span class="viewcode-link">[source]</span></a><a class="headerlink" href="#airflow.contrib.operators.bigquery_check_operator.BigQueryValueCheckOperator" title="Permalink to this definition">¶</a></dt> +<dd><p>Performs a simple value check using sql code.</p> +<table class="docutils field-list" frame="void" rules="none"> +<col class="field-name" /> +<col class="field-body" /> +<tbody valign="top"> +<tr class="field-odd field"><th class="field-name">Parameters:</th><td class="field-body"><strong>sql</strong> (<em>string</em>) – the sql to be executed</td> +</tr> +</tbody> +</table> +</dd></dl> +
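+<p>A hedged sketch of how this operator might be wired into a pipeline; the table, DAG and <code class="docutils literal"><span class="pre">pass_value</span></code> below are placeholders, and <code class="docutils literal"><span class="pre">tolerance</span></code> is assumed to be a relative tolerance around <code class="docutils literal"><span class="pre">pass_value</span></code>:</p>
+<div class="code python highlight-default"><div class="highlight"><pre>from airflow.contrib.operators.bigquery_check_operator import BigQueryValueCheckOperator
+
+# Assumes `dag` is a DAG object defined elsewhere in the file.
+# Fails if the single value returned by the query is not close enough to pass_value.
+check_row_count = BigQueryValueCheckOperator(
+    task_id='check_row_count',
+    sql="SELECT COUNT(*) FROM my_dataset.my_table WHERE ds = '{{ ds }}'",
+    pass_value=10000,
+    tolerance=0.1,
+    bigquery_conn_id='bigquery_default',
+    dag=dag)
+</pre></div>
+</div>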
</div> <div class="section" id="bigqueryintervalcheckoperator"> -<span id="id3"></span><h5>BigQueryIntervalCheckOperator<a class="headerlink" href="#bigqueryintervalcheckoperator" title="Permalink to this headline">¶</a></h5> +<span id="id18"></span><h5>BigQueryIntervalCheckOperator<a class="headerlink" href="#bigqueryintervalcheckoperator" title="Permalink to this headline">¶</a></h5> +<dl class="class"> +<dt id="airflow.contrib.operators.bigquery_check_operator.BigQueryIntervalCheckOperator"> +<em class="property">class </em><code class="descclassname">airflow.contrib.operators.bigquery_check_operator.</code><code class="descname">BigQueryIntervalCheckOperator</code><span class="sig-paren">(</span><em>table</em>, <em>metrics_thresholds</em>, <em>date_filter_column='ds'</em>, <em>days_back=-7</em>, <em>bigquery_conn_id='bigquery_default'</em>, <em>*args</em>, <em>**kwargs</em><span class="sig-paren">)</span><a class="reference internal" href="_modules/airflow/contrib/operators/bigquery_check_operator.html#BigQueryIntervalCheckOperator"><span class="viewcode-link">[source]</span></a><a class="headerlink" href="#airflow.contrib.operators.bigquery_check_operator.BigQueryIntervalCheckOperator" title="Permalink to this definition">¶</a></dt> +<dd><p>Checks that the values of metrics given as SQL expressions are within +a certain tolerance of the ones from days_back before.</p> +<p>This method constructs a query like so:</p> +<dl class="docutils"> +<dt>SELECT {metrics_threshold_dict_key} FROM {table}</dt> +<dd>WHERE {date_filter_column}=&lt;date&gt;</dd> +</dl> +<table class="docutils field-list" frame="void" rules="none"> +<col class="field-name" /> +<col class="field-body" /> +<tbody valign="top"> +<tr class="field-odd field"><th class="field-name">Parameters:</th><td class="field-body"><ul class="first last simple"> +<li><strong>table</strong> (<em>str</em>) – the table name</li> +<li><strong>days_back</strong> (<em>int</em>) – number of days between ds and the ds we want to check +against. Defaults to 7 days</li> +<li><strong>metrics_thresholds</strong> (<em>dict</em>) – a dictionary of ratios indexed by metrics; for +example 'COUNT(*)': 1.5 would require a 50 percent or less difference +between the count for the current day and the count days_back before.</li> +</ul> +</td> +</tr> +</tbody> +</table> +</dd></dl> + </div> <div class="section" id="bigqueryoperator"> -<span id="id4"></span><h5>BigQueryOperator<a class="headerlink" href="#bigqueryoperator" title="Permalink to this headline">¶</a></h5> +<span id="id19"></span><h5>BigQueryOperator<a class="headerlink" href="#bigqueryoperator" title="Permalink to this headline">¶</a></h5> +<dl class="class"> +<dt id="airflow.contrib.operators.bigquery_operator.BigQueryOperator"> +<em class="property">class </em><code class="descclassname">airflow.contrib.operators.bigquery_operator.</code><code class="descname">BigQueryOperator</code><span class="sig-paren">(</span><em>bql</em>, <em>destination_dataset_table=False</em>, <em>write_disposition='WRITE_EMPTY'</em>, <em>allow_large_results=False</em>, <em>bigquery_conn_id='bigquery_default'</em>, <em>delegate_to=None</em>, <em>udf_config=False</em>, <em>use_legacy_sql=True</em>, <em>maximum_billing_tier=None</em>, <em>create_disposition='CREATE_IF_NEEDED'</em>, <em>query_params=None</em>, <em>*args</em>, <em>**kwargs</em><span class="sig-paren">)</span><a class="reference internal" href="_modules/airflow/contrib/operators/bigquery_operator.html#BigQueryOperator"><span class="viewcode-link">[source]</span></a><a class="headerlink" href="#airflow.contrib.operators.bigquery_operator.BigQueryOperator" title="Permalink to this definition">¶</a></dt> +<dd><p>Executes BigQuery SQL queries in a specific BigQuery database.</p> +<table class="docutils field-list" frame="void" rules="none"> +<col class="field-name" /> +<col class="field-body" /> +<tbody valign="top"> +<tr class="field-odd field"><th class="field-name">Parameters:</th><td class="field-body"><ul class="first last simple"> +<li><strong>bql</strong> (<em>string</em><em> or </em><em>list of strings</em>) – the sql code to be executed. Can receive a str representing a sql statement, +a list of str (sql statements), or a reference to a template file; +template references are recognized by a str ending in '.sql'.</li> +<li><strong>destination_dataset_table</strong> (<em>string</em>) – A dotted +(&lt;project&gt;.|&lt;project&gt;:)&lt;dataset&gt;.&lt;table&gt; that, if set, will store the results +of the query.</li> +<li><strong>write_disposition</strong> (<em>string</em>) – Specifies the action that occurs if the destination table +already exists. (default: 'WRITE_EMPTY')</li> +<li><strong>create_disposition</strong> (<em>string</em>) – Specifies whether the job is allowed to create new tables. +(default: 'CREATE_IF_NEEDED')</li> +<li><strong>bigquery_conn_id</strong> (<em>string</em>) – reference to a specific BigQuery hook.</li> +<li><strong>delegate_to</strong> (<em>string</em>) – The account to impersonate, if any. +For this to work, the service account making the request must have domain-wide +delegation enabled.</li> +<li><strong>udf_config</strong> (<a class="reference internal" href="#airflow.contrib.hooks.gcs_hook.GoogleCloudStorageHook.list" title="airflow.contrib.hooks.gcs_hook.GoogleCloudStorageHook.list"><em>list</em></a>) – The User Defined Function configuration for the query. +See <a class="reference external" href="https://cloud.google.com/bigquery/user-defined-functions">https://cloud.google.com/bigquery/user-defined-functions</a> for details.</li> +<li><strong>use_legacy_sql</strong> (<em>boolean</em>) – Whether to use legacy SQL (true) or standard SQL (false).</li> +<li><strong>maximum_billing_tier</strong> (<em>integer</em>) – Positive integer that serves as a multiplier of the basic price. +Defaults to None, in which case it uses the value set in the project.</li> +<li><strong>query_params</strong> (<em>dict</em>) – a dictionary containing query parameter types and values, passed to +BigQuery.</li> +</ul> +</td> +</tr> +</tbody> +</table> +</dd></dl> +
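+<p>A minimal usage sketch; the project, dataset, table names and the surrounding DAG are placeholders, not part of the API above:</p>
+<div class="code python highlight-default"><div class="highlight"><pre>from airflow.contrib.operators.bigquery_operator import BigQueryOperator
+
+# Assumes `dag` is a DAG object defined elsewhere in the file.
+# Runs the query and writes the results into a destination table.
+aggregate_events = BigQueryOperator(
+    task_id='aggregate_events',
+    bql='SELECT event, COUNT(*) AS cnt FROM my_dataset.events GROUP BY event',
+    destination_dataset_table='my-project:my_dataset.event_counts',
+    write_disposition='WRITE_TRUNCATE',
+    bigquery_conn_id='bigquery_default',
+    use_legacy_sql=True,
+    dag=dag)
+</pre></div>
+</div>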
 </div> <div class="section" id="bigquerytobigqueryoperator"> -<span id="id5"></span><h5>BigQueryToBigQueryOperator<a class="headerlink" href="#bigquerytobigqueryoperator" title="Permalink to this headline">¶</a></h5> +<span id="id20"></span><h5>BigQueryToBigQueryOperator<a class="headerlink" href="#bigquerytobigqueryoperator" title="Permalink to this headline">¶</a></h5> +<dl class="class"> +<dt id="airflow.contrib.operators.bigquery_to_bigquery.BigQueryToBigQueryOperator"> +<em class="property">class </em><code class="descclassname">airflow.contrib.operators.bigquery_to_bigquery.</code><code class="descname">BigQueryToBigQueryOperator</code><span class="sig-paren">(</span><em>source_project_dataset_tables</em>, <em>destination_project_dataset_table</em>, <em>write_disposition='WRITE_EMPTY'</em>, <em>create_disposition='CREATE_IF_NEEDED'</em>, <em>bigquery_conn_id='bigquery_default'</em>, <em>delegate_to=None</em>, <em>*args</em>, <em>**kwargs</em><span class="sig-paren">)</span><a class="reference internal" href="_modules/airflow/contrib/operators/bigquery_to_bigquery.html#BigQueryToBigQueryOperator"><span class="viewcode-link">[source]</span></a><a class="headerlink" href="#airflow.contrib.operators.bigquery_to_bigquery.BigQueryToBigQueryOperator" title="Permalink to this definition">¶</a></dt> +<dd><p>Copies data from one BigQuery table to another. 
See here:</p> +<p><a class="reference external" href="https://cloud.google.com/bigquery/docs/reference/v2/jobs#configuration.copy">https://cloud.google.com/bigquery/docs/reference/v2/jobs#configuration.copy</a></p> +<p>For more details about these parameters.</p> +<table class="docutils field-list" frame="void" rules="none"> +<col class="field-name" /> +<col class="field-body" /> +<tbody valign="top"> +<tr class="field-odd field"><th class="field-name">Parameters:</th><td class="field-body"><ul class="first last simple"> +<li><strong>source_project_dataset_tables</strong> (<em>list|string</em>) â One or more +dotted (project:<a href="#id21"><span class="problematic" id="id22">|</span></a>project.)<dataset>.<table> BigQuery tables to use as the +source data. If <project> is not included, project will be the project defined +in the connection json. Use a list if there are multiple source tables.</li> +<li><strong>destination_project_dataset_table</strong> (<em>string</em>) â The destination BigQuery +table. Format is: (project:<a href="#id23"><span class="problematic" id="id24">|</span></a>project.)<dataset>.<table></li> +<li><strong>write_disposition</strong> (<em>string</em>) â The write disposition if the table already exists.</li> +<li><strong>create_disposition</strong> (<em>string</em>) â The create disposition if the table doesnât exist.</li> +<li><strong>bigquery_conn_id</strong> (<em>string</em>) â reference to a specific BigQuery hook.</li> +<li><strong>delegate_to</strong> (<em>string</em>) â The account to impersonate, if any. +For this to work, the service account making the request must have domain-wide +delegation enabled.</li> +</ul> +</td> +</tr> +</tbody> +</table> +</dd></dl> + </div> <div class="section" id="bigquerytocloudstorageoperator"> -<span id="id6"></span><h5>BigQueryToCloudStorageOperator<a class="headerlink" href="#bigquerytocloudstorageoperator" title="Permalink to this headline">¶</a></h5> +<span id="id25"></span><h5>BigQueryToCloudStorageOperator<a class="headerlink" href="#bigquerytocloudstorageoperator" title="Permalink to this headline">¶</a></h5> +<dl class="class"> +<dt id="airflow.contrib.operators.bigquery_to_gcs.BigQueryToCloudStorageOperator"> +<em class="property">class </em><code class="descclassname">airflow.contrib.operators.bigquery_to_gcs.</code><code class="descname">BigQueryToCloudStorageOperator</code><span class="sig-paren">(</span><em>source_project_dataset_table</em>, <em>destination_cloud_storage_uris</em>, <em>compression='NONE'</em>, <em>export_format='CSV'</em>, <em>field_delimiter='</em>, <em>'</em>, <em>print_header=True</em>, <em>bigquery_conn_id='bigquery_default'</em>, <em>delegate_to=None</em>, <em>*args</em>, <em>**kwargs</em><span class="sig-paren">)</span><a class="reference internal" href="_modules/airflow/contrib/operators/bigquery_to_gcs.html#BigQueryToCloudStorageOperator"><span class="viewcode-link">[source]</span></a><a class="headerlink" href="#airflow.contrib.operators.bigquery_to_gcs.BigQueryToCloudStorageOperator" title="Permalink to this definition">¶</a></dt> +<dd><p>Transfers a BigQuery table to a Google Cloud Storage bucket.</p> +<p>See here:</p> +<p><a class="reference external" href="https://cloud.google.com/bigquery/docs/reference/v2/jobs">https://cloud.google.com/bigquery/docs/reference/v2/jobs</a></p> +<p>For more details about these parameters.</p> +<table class="docutils field-list" frame="void" rules="none"> +<col class="field-name" /> +<col class="field-body" /> +<tbody valign="top"> +<tr 
class="field-odd field"><th class="field-name">Parameters:</th><td class="field-body"><ul class="first last simple"> +<li><strong>source_project_dataset_table</strong> (<em>string</em>) â The dotted +(<project>.|<project>:)<dataset>.<table> BigQuery table to use as the source +data. If <project> is not included, project will be the project defined in +the connection json.</li> +<li><strong>destination_cloud_storage_uris</strong> (<a class="reference internal" href="#airflow.contrib.hooks.gcs_hook.GoogleCloudStorageHook.list" title="airflow.contrib.hooks.gcs_hook.GoogleCloudStorageHook.list"><em>list</em></a>) â The destination Google Cloud +Storage URI (e.g. gs://some-bucket/some-file.txt). Follows +convention defined here: +https://cloud.google.com/bigquery/exporting-data-from-bigquery#exportingmultiple</li> +<li><strong>compression</strong> (<em>string</em>) â Type of compression to use.</li> +<li><strong>export_format</strong> â File format to export.</li> +<li><strong>field_delimiter</strong> (<em>string</em>) â The delimiter to use when extracting to a CSV.</li> +<li><strong>print_header</strong> (<em>boolean</em>) â Whether to print a header for a CSV file extract.</li> +<li><strong>bigquery_conn_id</strong> (<em>string</em>) â reference to a specific BigQuery hook.</li> +<li><strong>delegate_to</strong> (<em>string</em>) â The account to impersonate, if any. +For this to work, the service account making the request must have domain-wide +delegation enabled.</li> +</ul> +</td> +</tr> +</tbody> +</table> +</dd></dl> + </div> </div> <div class="section" id="bigqueryhook"> <h4>BigQueryHook<a class="headerlink" href="#bigqueryhook" title="Permalink to this headline">¶</a></h4> +<dl class="class"> +<dt id="airflow.contrib.hooks.bigquery_hook.BigQueryHook"> +<em class="property">class </em><code class="descclassname">airflow.contrib.hooks.bigquery_hook.</code><code class="descname">BigQueryHook</code><span class="sig-paren">(</span><em>bigquery_conn_id='bigquery_default'</em>, <em>delegate_to=None</em><span class="sig-paren">)</span><a class="reference internal" href="_modules/airflow/contrib/hooks/bigquery_hook.html#BigQueryHook"><span class="viewcode-link">[source]</span></a><a class="headerlink" href="#airflow.contrib.hooks.bigquery_hook.BigQueryHook" title="Permalink to this definition">¶</a></dt> +<dd><p>Interact with BigQuery. 
This hook uses the Google Cloud Platform +connection.</p> +<dl class="method"> +<dt id="airflow.contrib.hooks.bigquery_hook.BigQueryHook.get_conn"> +<code class="descname">get_conn</code><span class="sig-paren">(</span><span class="sig-paren">)</span><a class="reference internal" href="_modules/airflow/contrib/hooks/bigquery_hook.html#BigQueryHook.get_conn"><span class="viewcode-link">[source]</span></a><a class="headerlink" href="#airflow.contrib.hooks.bigquery_hook.BigQueryHook.get_conn" title="Permalink to this definition">¶</a></dt> +<dd><p>Returns a BigQuery PEP 249 connection object.</p> +</dd></dl> + +<dl class="method"> +<dt id="airflow.contrib.hooks.bigquery_hook.BigQueryHook.get_pandas_df"> +<code class="descname">get_pandas_df</code><span class="sig-paren">(</span><em>bql</em>, <em>parameters=None</em>, <em>dialect='legacy'</em><span class="sig-paren">)</span><a class="reference internal" href="_modules/airflow/contrib/hooks/bigquery_hook.html#BigQueryHook.get_pandas_df"><span class="viewcode-link">[source]</span></a><a class="headerlink" href="#airflow.contrib.hooks.bigquery_hook.BigQueryHook.get_pandas_df" title="Permalink to this definition">¶</a></dt> +<dd><p>Returns a Pandas DataFrame for the results produced by a BigQuery +query. The DbApiHook method must be overridden because Pandas +doesnât support PEP 249 connections, except for SQLite. See:</p> +<p><a class="reference external" href="https://github.com/pydata/pandas/blob/master/pandas/io/sql.py#L447">https://github.com/pydata/pandas/blob/master/pandas/io/sql.py#L447</a> +<a class="reference external" href="https://github.com/pydata/pandas/issues/6900">https://github.com/pydata/pandas/issues/6900</a></p> +<table class="docutils field-list" frame="void" rules="none"> +<col class="field-name" /> +<col class="field-body" /> +<tbody valign="top"> +<tr class="field-odd field"><th class="field-name">Parameters:</th><td class="field-body"><ul class="first last simple"> +<li><strong>bql</strong> (<em>string</em>) â The BigQuery SQL to execute.</li> +<li><strong>parameters</strong> (<em>mapping</em><em> or </em><em>iterable</em>) â The parameters to render the SQL query with (not used, leave to override superclass method)</li> +<li><strong>dialect</strong> (<em>string in {'legacy'</em><em>, </em><em>'standard'}</em><em>, </em><em>default 'legacy'</em>) â Dialect of BigQuery SQL â legacy SQL or standard SQL</li> +</ul> +</td> +</tr> +</tbody> +</table> +</dd></dl> + +<dl class="method"> +<dt id="airflow.contrib.hooks.bigquery_hook.BigQueryHook.get_service"> +<code class="descname">get_service</code><span class="sig-paren">(</span><span class="sig-paren">)</span><a class="reference internal" href="_modules/airflow/contrib/hooks/bigquery_hook.html#BigQueryHook.get_service"><span class="viewcode-link">[source]</span></a><a class="headerlink" href="#airflow.contrib.hooks.bigquery_hook.BigQueryHook.get_service" title="Permalink to this definition">¶</a></dt> +<dd><p>Returns a BigQuery service object.</p> +</dd></dl> + +<dl class="method"> +<dt id="airflow.contrib.hooks.bigquery_hook.BigQueryHook.insert_rows"> +<code class="descname">insert_rows</code><span class="sig-paren">(</span><em>table</em>, <em>rows</em>, <em>target_fields=None</em>, <em>commit_every=1000</em><span class="sig-paren">)</span><a class="reference internal" href="_modules/airflow/contrib/hooks/bigquery_hook.html#BigQueryHook.insert_rows"><span class="viewcode-link">[source]</span></a><a class="headerlink" 
href="#airflow.contrib.hooks.bigquery_hook.BigQueryHook.insert_rows" title="Permalink to this definition">¶</a></dt> +<dd><p>Insertion is currently unsupported. Theoretically, you could use +BigQueryâs streaming API to insert rows into a table, but this hasnât +been implemented.</p> +</dd></dl> + +<dl class="method"> +<dt id="airflow.contrib.hooks.bigquery_hook.BigQueryHook.table_exists"> +<code class="descname">table_exists</code><span class="sig-paren">(</span><em>project_id</em>, <em>dataset_id</em>, <em>table_id</em><span class="sig-paren">)</span><a class="reference internal" href="_modules/airflow/contrib/hooks/bigquery_hook.html#BigQueryHook.table_exists"><span class="viewcode-link">[source]</span></a><a class="headerlink" href="#airflow.contrib.hooks.bigquery_hook.BigQueryHook.table_exists" title="Permalink to this definition">¶</a></dt> +<dd><p>Checks for the existence of a table in Google BigQuery.</p> +<table class="docutils field-list" frame="void" rules="none"> +<col class="field-name" /> +<col class="field-body" /> +<tbody valign="top"> +<tr class="field-odd field"><th class="field-name">Parameters:</th><td class="field-body"><strong>project_id</strong> â The Google cloud project in which to look for the table. The connection supplied to the hook</td> +</tr> +</tbody> +</table> +<p>must provide access to the specified project. +:type project_id: string +:param dataset_id: The name of the dataset in which to look for the table.</p> +<blockquote> +<div>storage bucket.</div></blockquote> +<table class="docutils field-list" frame="void" rules="none"> +<col class="field-name" /> +<col class="field-body" /> +<tbody valign="top"> +<tr class="field-odd field"><th class="field-name">Parameters:</th><td class="field-body"><strong>table_id</strong> (<em>string</em>) â The name of the table to check the existence of.</td> +</tr> +</tbody> +</table> +</dd></dl> + +</dd></dl> + </div> </div> <div class="section" id="cloud-dataflow"> @@ -247,10 +1138,50 @@ they can have breaking changes between minor releases.</p> <div class="section" id="dataflow-operators"> <h4>DataFlow Operators<a class="headerlink" href="#dataflow-operators" title="Permalink to this headline">¶</a></h4> <ul class="simple"> -<li><a class="reference internal" href="#dataflowjavaoperator"><span class="std std-ref">DataFlowJavaOperator</span></a> :</li> +<li><a class="reference internal" href="#dataflowjavaoperator"><span class="std std-ref">DataFlowJavaOperator</span></a> : launching Cloud Dataflow jobs written in Java.</li> +<li><a class="reference internal" href="#dataflowpythonoperator"><span class="std std-ref">DataFlowPythonOperator</span></a> : launching Cloud Dataflow jobs written in python.</li> </ul> <div class="section" id="dataflowjavaoperator"> -<span id="id7"></span><h5>DataFlowJavaOperator<a class="headerlink" href="#dataflowjavaoperator" title="Permalink to this headline">¶</a></h5> +<span id="id26"></span><h5>DataFlowJavaOperator<a class="headerlink" href="#dataflowjavaoperator" title="Permalink to this headline">¶</a></h5> +<dl class="class"> +<dt id="airflow.contrib.operators.dataflow_operator.DataFlowJavaOperator"> +<em class="property">class </em><code class="descclassname">airflow.contrib.operators.dataflow_operator.</code><code class="descname">DataFlowJavaOperator</code><span class="sig-paren">(</span><em>jar</em>, <em>dataflow_default_options=None</em>, <em>options=None</em>, <em>gcp_conn_id='google_cloud_default'</em>, <em>delegate_to=None</em>, <em>*args</em>, <em>**kwargs</em><span 
class="sig-paren">)</span><a class="reference internal" href="_modules/airflow/contrib/operators/dataflow_operator.html#DataFlowJavaOperator"><span class="viewcode-link">[source]</span></a><a class="headerlink" href="#airflow.contrib.operators.dataflow_operator.DataFlowJavaOperator" title="Permalink to this definition">¶</a></dt> +<dd><p>Start a Java Cloud DataFlow batch job. The parameters of the operation +will be passed to the job.</p> +<p>Itâs a good practice to define dataflow_* parameters in the default_args of the dag +like the project, zone and staging location.</p> +<p><a href="#id27"><span class="problematic" id="id28">``</span></a>` +default_args = {</p> +<blockquote> +<div><dl class="docutils"> +<dt>âdataflow_default_optionsâ: {</dt> +<dd>âprojectâ: âmy-gcp-projectâ, +âzoneâ: âeurope-west1-dâ, +âstagingLocationâ: âgs://my-staging-bucket/staging/â</dd> +</dl> +<p>}</p> +</div></blockquote> +<p>You need to pass the path to your dataflow as a file reference with the <code class="docutils literal"><span class="pre">jar</span></code> +parameter, the jar needs to be a self executing jar. Use <code class="docutils literal"><span class="pre">options</span></code> to pass on +options to your job.</p> +<p><a href="#id29"><span class="problematic" id="id30">``</span></a>` +t1 = DataFlowOperation(</p> +<blockquote> +<div><p>task_id=âdatapflow_exampleâ, +jar=â{{var.value.gcp_dataflow_base}}pipeline/build/libs/pipeline-example-1.0.jarâ, +options={</p> +<blockquote> +<div>âautoscalingAlgorithmâ: âBASICâ, +âmaxNumWorkersâ: â50â, +âstartâ: â{{ds}}â, +âpartitionTypeâ: âDAYâ</div></blockquote> +<p>}, +dag=my-dag)</p> +</div></blockquote> +<p><a href="#id31"><span class="problematic" id="id32">``</span></a><a href="#id33"><span class="problematic" id="id34">`</span></a></p> +<p>Both <code class="docutils literal"><span class="pre">jar</span></code> and <code class="docutils literal"><span class="pre">options</span></code> are templated so you can use variables in them.</p> +</dd></dl> + <div class="code python highlight-default"><div class="highlight"><pre><span></span><span class="n">default_args</span> <span class="o">=</span> <span class="p">{</span> <span class="s1">'owner'</span><span class="p">:</span> <span class="s1">'airflow'</span><span class="p">,</span> <span class="s1">'depends_on_past'</span><span class="p">:</span> <span class="kc">False</span><span class="p">,</span> @@ -285,9 +1216,28 @@ they can have breaking changes between minor releases.</p> </pre></div> </div> </div> +<div class="section" id="dataflowpythonoperator"> +<span id="id35"></span><h5>DataFlowPythonOperator<a class="headerlink" href="#dataflowpythonoperator" title="Permalink to this headline">¶</a></h5> +<dl class="class"> +<dt id="airflow.contrib.operators.dataflow_operator.DataFlowPythonOperator"> +<em class="property">class </em><code class="descclassname">airflow.contrib.operators.dataflow_operator.</code><code class="descname">DataFlowPythonOperator</code><span class="sig-paren">(</span><em>py_file</em>, <em>py_options=None</em>, <em>dataflow_default_options=None</em>, <em>options=None</em>, <em>gcp_conn_id='google_cloud_default'</em>, <em>delegate_to=None</em>, <em>*args</em>, <em>**kwargs</em><span class="sig-paren">)</span><a class="reference internal" href="_modules/airflow/contrib/operators/dataflow_operator.html#DataFlowPythonOperator"><span class="viewcode-link">[source]</span></a><a class="headerlink" href="#airflow.contrib.operators.dataflow_operator.DataFlowPythonOperator" title="Permalink to this 
definition">¶</a></dt> +<dd></dd></dl> + +</div> </div> <div class="section" id="dataflowhook"> <h4>DataFlowHook<a class="headerlink" href="#dataflowhook" title="Permalink to this headline">¶</a></h4> +<dl class="class"> +<dt id="airflow.contrib.hooks.gcp_dataflow_hook.DataFlowHook"> +<em class="property">class </em><code class="descclassname">airflow.contrib.hooks.gcp_dataflow_hook.</code><code class="descname">DataFlowHook</code><span class="sig-paren">(</span><em>gcp_conn_id='google_cloud_default'</em>, <em>delegate_to=None</em><span class="sig-paren">)</span><a class="reference internal" href="_modules/airflow/contrib/hooks/gcp_dataflow_hook.html#DataFlowHook"><span class="viewcode-link">[source]</span></a><a class="headerlink" href="#airflow.contrib.hooks.gcp_dataflow_hook.DataFlowHook" title="Permalink to this definition">¶</a></dt> +<dd><dl class="method"> +<dt id="airflow.contrib.hooks.gcp_dataflow_hook.DataFlowHook.get_conn"> +<code class="descname">get_conn</code><span class="sig-paren">(</span><span class="sig-paren">)</span><a class="reference internal" href="_modules/airflow/contrib/hooks/gcp_dataflow_hook.html#DataFlowHook.get_conn"><span class="viewcode-link">[source]</span></a><a class="headerlink" href="#airflow.contrib.hooks.gcp_dataflow_hook.DataFlowHook.get_conn" title="Permalink to this definition">¶</a></dt> +<dd><p>Returns a Google Cloud Storage service object.</p> +</dd></dl> + +</dd></dl> + </div> </div> <div class="section" id="cloud-dataproc"> @@ -303,29 +1253,566 @@ they can have breaking changes between minor releases.</p> <li><a class="reference internal" href="#dataprocpysparkoperator"><span class="std std-ref">DataProcPySparkOperator</span></a> : Start a PySpark Job on a Cloud DataProc cluster.</li> </ul> <div class="section" id="dataprocpigoperator"> -<span id="id8"></span><h5>DataProcPigOperator<a class="headerlink" href="#dataprocpigoperator" title="Permalink to this headline">¶</a></h5> +<span id="id36"></span><h5>DataProcPigOperator<a class="headerlink" href="#dataprocpigoperator" title="Permalink to this headline">¶</a></h5> +<dl class="class"> +<dt id="airflow.contrib.operators.dataproc_operator.DataProcPigOperator"> +<em class="property">class </em><code class="descclassname">airflow.contrib.operators.dataproc_operator.</code><code class="descname">DataProcPigOperator</code><span class="sig-paren">(</span><em>query=None</em>, <em>query_uri=None</em>, <em>variables=None</em>, <em>job_name='{{task.task_id}}_{{ds_nodash}}'</em>, <em>cluster_name='cluster-1'</em>, <em>dataproc_pig_properties=None</em>, <em>dataproc_pig_jars=None</em>, <em>gcp_conn_id='google_cloud_default'</em>, <em>delegate_to=None</em>, <em>*args</em>, <em>**kwargs</em><span class="sig-paren">)</span><a class="reference internal" href="_modules/airflow/contrib/operators/dataproc_operator.html#DataProcPigOperator"><span class="viewcode-link">[source]</span></a><a class="headerlink" href="#airflow.contrib.operators.dataproc_operator.DataProcPigOperator" title="Permalink to this definition">¶</a></dt> +<dd><p>Start a Pig query Job on a Cloud DataProc cluster. 
The parameters of the operation +will be passed to the cluster.</p> +<p>Itâs a good practice to define dataproc_* parameters in the default_args of the dag +like the cluster name and UDFs.</p> +<p><a href="#id37"><span class="problematic" id="id38">``</span></a>` +default_args = {</p> +<blockquote> +<div><p>âcluster_nameâ: âcluster-1â, +âdataproc_pig_jarsâ: [</p> +<blockquote> +<div>âgs://example/udf/jar/datafu/1.2.0/datafu.jarâ, +âgs://example/udf/jar/gpig/1.2/gpig.jarâ</div></blockquote> +<p>]</p> +</div></blockquote> +<p>You can pass a pig script as string or file reference. Use variables to pass on +variables for the pig script to be resolved on the cluster or use the parameters to +be resolved in the script as template parameters.</p> +<p><a href="#id39"><span class="problematic" id="id40">``</span></a>` +t1 = DataProcPigOperator(</p> +<blockquote> +<div>task_id=âdataproc_pigâ, +query=âa_pig_script.pigâ, +variables={âoutâ: âgs://example/output/{{ds}}â},</div></blockquote> +<p>dag=dag) +<a href="#id41"><span class="problematic" id="id42">``</span></a><a href="#id43"><span class="problematic" id="id44">`</span></a></p> +</dd></dl> + </div> <div class="section" id="dataprochiveoperator"> -<span id="id9"></span><h5>DataProcHiveOperator<a class="headerlink" href="#dataprochiveoperator" title="Permalink to this headline">¶</a></h5> +<span id="id45"></span><h5>DataProcHiveOperator<a class="headerlink" href="#dataprochiveoperator" title="Permalink to this headline">¶</a></h5> +<dl class="class"> +<dt id="airflow.contrib.operators.dataproc_operator.DataProcHiveOperator"> +<em class="property">class </em><code class="descclassname">airflow.contrib.operators.dataproc_operator.</code><code class="descname">DataProcHiveOperator</code><span class="sig-paren">(</span><em>query=None</em>, <em>query_uri=None</em>, <em>
<TRUNCATED>
