http://git-wip-us.apache.org/repos/asf/incubator-airflow-site/blob/28a3eb60/_modules/python_operator.html ---------------------------------------------------------------------- diff --git a/_modules/python_operator.html b/_modules/python_operator.html index d2def55..5e5cdc6 100644 --- a/_modules/python_operator.html +++ b/_modules/python_operator.html @@ -13,6 +13,8 @@ + + @@ -80,7 +82,10 @@ - <ul> + + + + <ul> <li class="toctree-l1"><a class="reference internal" href="../project.html">Project</a></li> <li class="toctree-l1"><a class="reference internal" href="../license.html">License</a></li> <li class="toctree-l1"><a class="reference internal" href="../start.html">Quick Start</a></li> @@ -177,13 +182,20 @@ <span class="c1"># limitations under the License.</span> <span class="kn">from</span> <span class="nn">builtins</span> <span class="k">import</span> <span class="nb">str</span> -<span class="kn">from</span> <span class="nn">datetime</span> <span class="k">import</span> <span class="n">datetime</span> -<span class="kn">import</span> <span class="nn">logging</span> - -<span class="kn">from</span> <span class="nn">airflow.models</span> <span class="k">import</span> <span class="n">BaseOperator</span><span class="p">,</span> <span class="n">TaskInstance</span> -<span class="kn">from</span> <span class="nn">airflow.utils.state</span> <span class="k">import</span> <span class="n">State</span> +<span class="kn">import</span> <span class="nn">dill</span> +<span class="kn">import</span> <span class="nn">inspect</span> +<span class="kn">import</span> <span class="nn">os</span> +<span class="kn">import</span> <span class="nn">pickle</span> +<span class="kn">import</span> <span class="nn">subprocess</span> +<span class="kn">import</span> <span class="nn">sys</span> +<span class="kn">import</span> <span class="nn">types</span> + +<span class="kn">from</span> <span class="nn">airflow.exceptions</span> <span class="k">import</span> <span class="n">AirflowException</span> +<span class="kn">from</span> <span class="nn">airflow.models</span> <span class="k">import</span> <span class="n">BaseOperator</span><span class="p">,</span> <span class="n">SkipMixin</span> <span class="kn">from</span> <span class="nn">airflow.utils.decorators</span> <span class="k">import</span> <span class="n">apply_defaults</span> -<span class="kn">from</span> <span class="nn">airflow</span> <span class="k">import</span> <span class="n">settings</span> +<span class="kn">from</span> <span class="nn">airflow.utils.file</span> <span class="k">import</span> <span class="n">TemporaryDirectory</span> + +<span class="kn">from</span> <span class="nn">textwrap</span> <span class="k">import</span> <span class="n">dedent</span> <div class="viewcode-block" id="PythonOperator"><a class="viewcode-back" href="../code.html#airflow.operators.PythonOperator">[docs]</a><span class="k">class</span> <span class="nc">PythonOperator</span><span class="p">(</span><span class="n">BaseOperator</span><span class="p">):</span> @@ -226,7 +238,9 @@ <span class="n">templates_dict</span><span class="o">=</span><span class="kc">None</span><span class="p">,</span> <span class="n">templates_exts</span><span class="o">=</span><span class="kc">None</span><span class="p">,</span> <span class="o">*</span><span class="n">args</span><span class="p">,</span> <span class="o">**</span><span class="n">kwargs</span><span class="p">):</span> - <span class="nb">super</span><span class="p">(</span><span class="n">PythonOperator</span><span class="p">,</span> <span class="bp">self</span><span class="p">)</span><span class="o">.</span><span class="n">__init__</span><span class="p">(</span><span class="o">*</span><span class="n">args</span><span class="p">,</span> <span class="o">**</span><span class="n">kwargs</span><span class="p">)</span> + <span class="nb">super</span><span class="p">(</span><span class="n">PythonOperator</span><span class="p">,</span> <span class="bp">self</span><span class="p">)</span><span class="o">.</span><span class="fm">__init__</span><span class="p">(</span><span class="o">*</span><span class="n">args</span><span class="p">,</span> <span class="o">**</span><span class="n">kwargs</span><span class="p">)</span> + <span class="k">if</span> <span class="ow">not</span> <span class="n">callable</span><span class="p">(</span><span class="n">python_callable</span><span class="p">):</span> + <span class="k">raise</span> <span class="n">AirflowException</span><span class="p">(</span><span class="s1">'`python_callable` param must be callable'</span><span class="p">)</span> <span class="bp">self</span><span class="o">.</span><span class="n">python_callable</span> <span class="o">=</span> <span class="n">python_callable</span> <span class="bp">self</span><span class="o">.</span><span class="n">op_args</span> <span class="o">=</span> <span class="n">op_args</span> <span class="ow">or</span> <span class="p">[]</span> <span class="bp">self</span><span class="o">.</span><span class="n">op_kwargs</span> <span class="o">=</span> <span class="n">op_kwargs</span> <span class="ow">or</span> <span class="p">{}</span> @@ -241,12 +255,15 @@ <span class="n">context</span><span class="p">[</span><span class="s1">'templates_dict'</span><span class="p">]</span> <span class="o">=</span> <span class="bp">self</span><span class="o">.</span><span class="n">templates_dict</span> <span class="bp">self</span><span class="o">.</span><span class="n">op_kwargs</span> <span class="o">=</span> <span class="n">context</span> - <span class="n">return_value</span> <span class="o">=</span> <span class="bp">self</span><span class="o">.</span><span class="n">python_callable</span><span class="p">(</span><span class="o">*</span><span class="bp">self</span><span class="o">.</span><span class="n">op_args</span><span class="p">,</span> <span class="o">**</span><span class="bp">self</span><span class="o">.</span><span class="n">op_kwargs</span><span class="p">)</span> - <span class="n">logging</span><span class="o">.</span><span class="n">info</span><span class="p">(</span><span class="s2">"Done. Returned value was: "</span> <span class="o">+</span> <span class="nb">str</span><span class="p">(</span><span class="n">return_value</span><span class="p">))</span> - <span class="k">return</span> <span class="n">return_value</span></div> + <span class="n">return_value</span> <span class="o">=</span> <span class="bp">self</span><span class="o">.</span><span class="n">execute_callable</span><span class="p">()</span> + <span class="bp">self</span><span class="o">.</span><span class="n">log</span><span class="o">.</span><span class="n">info</span><span class="p">(</span><span class="s2">"Done. Returned value was: </span><span class="si">%s</span><span class="s2">"</span><span class="p">,</span> <span class="n">return_value</span><span class="p">)</span> + <span class="k">return</span> <span class="n">return_value</span> + + <span class="k">def</span> <span class="nf">execute_callable</span><span class="p">(</span><span class="bp">self</span><span class="p">):</span> + <span class="k">return</span> <span class="bp">self</span><span class="o">.</span><span class="n">python_callable</span><span class="p">(</span><span class="o">*</span><span class="bp">self</span><span class="o">.</span><span class="n">op_args</span><span class="p">,</span> <span class="o">**</span><span class="bp">self</span><span class="o">.</span><span class="n">op_kwargs</span><span class="p">)</span></div> -<div class="viewcode-block" id="BranchPythonOperator"><a class="viewcode-back" href="../code.html#airflow.operators.BranchPythonOperator">[docs]</a><span class="k">class</span> <span class="nc">BranchPythonOperator</span><span class="p">(</span><span class="n">PythonOperator</span><span class="p">):</span> +<div class="viewcode-block" id="BranchPythonOperator"><a class="viewcode-back" href="../code.html#airflow.operators.BranchPythonOperator">[docs]</a><span class="k">class</span> <span class="nc">BranchPythonOperator</span><span class="p">(</span><span class="n">PythonOperator</span><span class="p">,</span> <span class="n">SkipMixin</span><span class="p">):</span> <span class="sd">"""</span> <span class="sd"> Allows a workflow to "branch" or follow a single path following the</span> <span class="sd"> execution of this task.</span> @@ -267,23 +284,20 @@ <span class="sd"> """</span> <span class="k">def</span> <span class="nf">execute</span><span class="p">(</span><span class="bp">self</span><span class="p">,</span> <span class="n">context</span><span class="p">):</span> <span class="n">branch</span> <span class="o">=</span> <span class="nb">super</span><span class="p">(</span><span class="n">BranchPythonOperator</span><span class="p">,</span> <span class="bp">self</span><span class="p">)</span><span class="o">.</span><span class="n">execute</span><span class="p">(</span><span class="n">context</span><span class="p">)</span> - <span class="n">logging</span><span class="o">.</span><span class="n">info</span><span class="p">(</span><span class="s2">"Following branch "</span> <span class="o">+</span> <span class="n">branch</span><span class="p">)</span> - <span class="n">logging</span><span class="o">.</span><span class="n">info</span><span class="p">(</span><span class="s2">"Marking other directly downstream tasks as skipped"</span><span class="p">)</span> - <span class="n">session</span> <span class="o">=</span> <span class="n">settings</span><span class="o">.</span><span class="n">Session</span><span class="p">()</span> - <span class="k">for</span> <span class="n">task</span> <span class="ow">in</span> <span class="n">context</span><span class="p">[</span><span class="s1">'task'</span><span class="p">]</span><span class="o">.</span><span class="n">downstream_list</span><span class="p">:</span> - <span class="k">if</span> <span class="n">task</span><span class="o">.</span><span class="n">task_id</span> <span class="o">!=</span> <span class="n">branch</span><span class="p">:</span> - <span class="n">ti</span> <span class="o">=</span> <span class="n">TaskInstance</span><span class="p">(</span> - <span class="n">task</span><span class="p">,</span> <span class="n">execution_date</span><span class="o">=</span><span class="n">context</span><span class="p">[</span><span class="s1">'ti'</span><span class="p">]</span><span class="o">.</span><span class="n">execution_date</span><span class="p">)</span> - <span class="n">ti</span><span class="o">.</span><span class="n">state</span> <span class="o">=</span> <span class="n">State</span><span class="o">.</span><span class="n">SKIPPED</span> - <span class="n">ti</span><span class="o">.</span><span class="n">start_date</span> <span class="o">=</span> <span class="n">datetime</span><span class="o">.</span><span class="n">now</span><span class="p">()</span> - <span class="n">ti</span><span class="o">.</span><span class="n">end_date</span> <span class="o">=</span> <span class="n">datetime</span><span class="o">.</span><span class="n">now</span><span class="p">()</span> - <span class="n">session</span><span class="o">.</span><span class="n">merge</span><span class="p">(</span><span class="n">ti</span><span class="p">)</span> - <span class="n">session</span><span class="o">.</span><span class="n">commit</span><span class="p">()</span> - <span class="n">session</span><span class="o">.</span><span class="n">close</span><span class="p">()</span> - <span class="n">logging</span><span class="o">.</span><span class="n">info</span><span class="p">(</span><span class="s2">"Done."</span><span class="p">)</span></div> - - -<div class="viewcode-block" id="ShortCircuitOperator"><a class="viewcode-back" href="../code.html#airflow.operators.ShortCircuitOperator">[docs]</a><span class="k">class</span> <span class="nc">ShortCircuitOperator</span><span class="p">(</span><span class="n">PythonOperator</span><span class="p">):</span> + <span class="bp">self</span><span class="o">.</span><span class="n">log</span><span class="o">.</span><span class="n">info</span><span class="p">(</span><span class="s2">"Following branch </span><span class="si">%s</span><span class="s2">"</span><span class="p">,</span> <span class="n">branch</span><span class="p">)</span> + <span class="bp">self</span><span class="o">.</span><span class="n">log</span><span class="o">.</span><span class="n">info</span><span class="p">(</span><span class="s2">"Marking other directly downstream tasks as skipped"</span><span class="p">)</span> + + <span class="n">downstream_tasks</span> <span class="o">=</span> <span class="n">context</span><span class="p">[</span><span class="s1">'task'</span><span class="p">]</span><span class="o">.</span><span class="n">downstream_list</span> + <span class="bp">self</span><span class="o">.</span><span class="n">log</span><span class="o">.</span><span class="n">debug</span><span class="p">(</span><span class="s2">"Downstream task_ids </span><span class="si">%s</span><span class="s2">"</span><span class="p">,</span> <span class="n">downstream_tasks</span><span class="p">)</span> + + <span class="n">skip_tasks</span> <span class="o">=</span> <span class="p">[</span><span class="n">t</span> <span class="k">for</span> <span class="n">t</span> <span class="ow">in</span> <span class="n">downstream_tasks</span> <span class="k">if</span> <span class="n">t</span><span class="o">.</span><span class="n">task_id</span> <span class="o">!=</span> <span class="n">branch</span><span class="p">]</span> + <span class="k">if</span> <span class="n">downstream_tasks</span><span class="p">:</span> + <span class="bp">self</span><span class="o">.</span><span class="n">skip</span><span class="p">(</span><span class="n">context</span><span class="p">[</span><span class="s1">'dag_run'</span><span class="p">],</span> <span class="n">context</span><span class="p">[</span><span class="s1">'ti'</span><span class="p">]</span><span class="o">.</span><span class="n">execution_date</span><span class="p">,</span> <span class="n">skip_tasks</span><span class="p">)</span> + + <span class="bp">self</span><span class="o">.</span><span class="n">log</span><span class="o">.</span><span class="n">info</span><span class="p">(</span><span class="s2">"Done."</span><span class="p">)</span></div> + + +<div class="viewcode-block" id="ShortCircuitOperator"><a class="viewcode-back" href="../code.html#airflow.operators.ShortCircuitOperator">[docs]</a><span class="k">class</span> <span class="nc">ShortCircuitOperator</span><span class="p">(</span><span class="n">PythonOperator</span><span class="p">,</span> <span class="n">SkipMixin</span><span class="p">):</span> <span class="sd">"""</span> <span class="sd"> Allows a workflow to continue only if a condition is met. Otherwise, the</span> <span class="sd"> workflow "short-circuits" and downstream tasks are skipped.</span> @@ -297,23 +311,220 @@ <span class="sd"> """</span> <span class="k">def</span> <span class="nf">execute</span><span class="p">(</span><span class="bp">self</span><span class="p">,</span> <span class="n">context</span><span class="p">):</span> <span class="n">condition</span> <span class="o">=</span> <span class="nb">super</span><span class="p">(</span><span class="n">ShortCircuitOperator</span><span class="p">,</span> <span class="bp">self</span><span class="p">)</span><span class="o">.</span><span class="n">execute</span><span class="p">(</span><span class="n">context</span><span class="p">)</span> - <span class="n">logging</span><span class="o">.</span><span class="n">info</span><span class="p">(</span><span class="s2">"Condition result is </span><span class="si">{}</span><span class="s2">"</span><span class="o">.</span><span class="n">format</span><span class="p">(</span><span class="n">condition</span><span class="p">))</span> + <span class="bp">self</span><span class="o">.</span><span class="n">log</span><span class="o">.</span><span class="n">info</span><span class="p">(</span><span class="s2">"Condition result is </span><span class="si">%s</span><span class="s2">"</span><span class="p">,</span> <span class="n">condition</span><span class="p">)</span> + <span class="k">if</span> <span class="n">condition</span><span class="p">:</span> - <span class="n">logging</span><span class="o">.</span><span class="n">info</span><span class="p">(</span><span class="s1">'Proceeding with downstream tasks...'</span><span class="p">)</span> + <span class="bp">self</span><span class="o">.</span><span class="n">log</span><span class="o">.</span><span class="n">info</span><span class="p">(</span><span class="s1">'Proceeding with downstream tasks...'</span><span class="p">)</span> <span class="k">return</span> + + <span class="bp">self</span><span class="o">.</span><span class="n">log</span><span class="o">.</span><span class="n">info</span><span class="p">(</span><span class="s1">'Skipping downstream tasks...'</span><span class="p">)</span> + + <span class="n">downstream_tasks</span> <span class="o">=</span> <span class="n">context</span><span class="p">[</span><span class="s1">'task'</span><span class="p">]</span><span class="o">.</span><span class="n">get_flat_relatives</span><span class="p">(</span><span class="n">upstream</span><span class="o">=</span><span class="kc">False</span><span class="p">)</span> + <span class="bp">self</span><span class="o">.</span><span class="n">log</span><span class="o">.</span><span class="n">debug</span><span class="p">(</span><span class="s2">"Downstream task_ids </span><span class="si">%s</span><span class="s2">"</span><span class="p">,</span> <span class="n">downstream_tasks</span><span class="p">)</span> + + <span class="k">if</span> <span class="n">downstream_tasks</span><span class="p">:</span> + <span class="bp">self</span><span class="o">.</span><span class="n">skip</span><span class="p">(</span><span class="n">context</span><span class="p">[</span><span class="s1">'dag_run'</span><span class="p">],</span> <span class="n">context</span><span class="p">[</span><span class="s1">'ti'</span><span class="p">]</span><span class="o">.</span><span class="n">execution_date</span><span class="p">,</span> <span class="n">downstream_tasks</span><span class="p">)</span> + + <span class="bp">self</span><span class="o">.</span><span class="n">log</span><span class="o">.</span><span class="n">info</span><span class="p">(</span><span class="s2">"Done."</span><span class="p">)</span></div> + +<span class="k">class</span> <span class="nc">PythonVirtualenvOperator</span><span class="p">(</span><span class="n">PythonOperator</span><span class="p">):</span> + <span class="sd">"""</span> +<span class="sd"> Allows one to run a function in a virtualenv that is created and destroyed</span> +<span class="sd"> automatically (with certain caveats).</span> + +<span class="sd"> The function must be defined using def, and not be part of a class. All imports</span> +<span class="sd"> must happen inside the function and no variables outside of the scope may be referenced.</span> +<span class="sd"> A global scope variable named virtualenv_string_args will be available (populated by</span> +<span class="sd"> string_args). In addition, one can pass stuff through op_args and op_kwargs, and one</span> +<span class="sd"> can use a return value.</span> + +<span class="sd"> Note that if your virtualenv runs in a different Python major version than Airflow,</span> +<span class="sd"> you cannot use return values, op_args, or op_kwargs. You can use string_args though.</span> + +<span class="sd"> :param python_callable: A python function with no references to outside variables,</span> +<span class="sd"> defined with def, which will be run in a virtualenv</span> +<span class="sd"> :type python_callable: function</span> +<span class="sd"> :param requirements: A list of requirements as specified in a pip install command</span> +<span class="sd"> :type requirements: list(str)</span> +<span class="sd"> :param python_version: The Python version to run the virtualenv with. Note that</span> +<span class="sd"> both 2 and 2.7 are acceptable forms.</span> +<span class="sd"> :type python_version: str</span> +<span class="sd"> :param use_dill: Whether to use dill to serialize the args and result (pickle is default).</span> +<span class="sd"> This allow more complex types but requires you to include dill in your requirements.</span> +<span class="sd"> :type use_dill: bool</span> +<span class="sd"> :param system_site_packages: Whether to include system_site_packages in your virtualenv.</span> +<span class="sd"> See virtualenv documentation for more information.</span> +<span class="sd"> :type system_site_packages: bool</span> +<span class="sd"> :param op_args: A list of positional arguments to pass to python_callable.</span> +<span class="sd"> :type op_kwargs: list</span> +<span class="sd"> :param op_kwargs: A dict of keyword arguments to pass to python_callable.</span> +<span class="sd"> :type op_kwargs: dict</span> +<span class="sd"> :param string_args: Strings that are present in the global var virtualenv_string_args,</span> +<span class="sd"> available to python_callable at runtime as a list(str). Note that args are split</span> +<span class="sd"> by newline.</span> +<span class="sd"> :type string_args: list(str)</span> + +<span class="sd"> """</span> + <span class="k">def</span> <span class="nf">__init__</span><span class="p">(</span><span class="bp">self</span><span class="p">,</span> <span class="n">python_callable</span><span class="p">,</span> <span class="n">requirements</span><span class="o">=</span><span class="kc">None</span><span class="p">,</span> <span class="n">python_version</span><span class="o">=</span><span class="kc">None</span><span class="p">,</span> <span class="n">use_dill</span><span class="o">=</span><span class="kc">False</span><span class="p">,</span> + <span class="n">system_site_packages</span><span class="o">=</span><span class="kc">True</span><span class="p">,</span> <span class="n">op_args</span><span class="o">=</span><span class="kc">None</span><span class="p">,</span> <span class="n">op_kwargs</span><span class="o">=</span><span class="kc">None</span><span class="p">,</span> <span class="n">string_args</span><span class="o">=</span><span class="kc">None</span><span class="p">,</span> + <span class="o">*</span><span class="n">args</span><span class="p">,</span> <span class="o">**</span><span class="n">kwargs</span><span class="p">):</span> + <span class="nb">super</span><span class="p">(</span><span class="n">PythonVirtualenvOperator</span><span class="p">,</span> <span class="bp">self</span><span class="p">)</span><span class="o">.</span><span class="fm">__init__</span><span class="p">(</span> + <span class="n">python_callable</span><span class="o">=</span><span class="n">python_callable</span><span class="p">,</span> + <span class="n">op_args</span><span class="o">=</span><span class="n">op_args</span><span class="p">,</span> + <span class="n">op_kwargs</span><span class="o">=</span><span class="n">op_kwargs</span><span class="p">,</span> + <span class="o">*</span><span class="n">args</span><span class="p">,</span> + <span class="o">**</span><span class="n">kwargs</span><span class="p">)</span> + <span class="bp">self</span><span class="o">.</span><span class="n">requirements</span> <span class="o">=</span> <span class="n">requirements</span> <span class="ow">or</span> <span class="p">[]</span> + <span class="bp">self</span><span class="o">.</span><span class="n">string_args</span> <span class="o">=</span> <span class="n">string_args</span> <span class="ow">or</span> <span class="p">[]</span> + <span class="bp">self</span><span class="o">.</span><span class="n">python_version</span> <span class="o">=</span> <span class="n">python_version</span> + <span class="bp">self</span><span class="o">.</span><span class="n">use_dill</span> <span class="o">=</span> <span class="n">use_dill</span> + <span class="bp">self</span><span class="o">.</span><span class="n">system_site_packages</span> <span class="o">=</span> <span class="n">system_site_packages</span> + <span class="c1"># check that dill is present if needed</span> + <span class="n">dill_in_requirements</span> <span class="o">=</span> <span class="nb">map</span><span class="p">(</span><span class="k">lambda</span> <span class="n">x</span><span class="p">:</span> <span class="n">x</span><span class="o">.</span><span class="n">lower</span><span class="p">()</span><span class="o">.</span><span class="n">startswith</span><span class="p">(</span><span class="s1">'dill'</span><span class="p">),</span> <span class="bp">self</span><span class="o">.</span><span class="n">requirements</span><span class="p">)</span> + <span class="k">if</span> <span class="p">(</span><span class="ow">not</span> <span class="n">system_site_packages</span><span class="p">)</span> <span class="ow">and</span> <span class="n">use_dill</span> <span class="ow">and</span> <span class="ow">not</span> <span class="nb">any</span><span class="p">(</span><span class="n">dill_in_requirements</span><span class="p">):</span> + <span class="k">raise</span> <span class="n">AirflowException</span><span class="p">(</span><span class="s1">'If using dill, dill must be in the environment '</span> <span class="o">+</span> + <span class="s1">'either via system_site_packages or requirements'</span><span class="p">)</span> + <span class="c1"># check that a function is passed, and that it is not a lambda</span> + <span class="k">if</span> <span class="p">(</span><span class="ow">not</span> <span class="nb">isinstance</span><span class="p">(</span><span class="bp">self</span><span class="o">.</span><span class="n">python_callable</span><span class="p">,</span> <span class="n">types</span><span class="o">.</span><span class="n">FunctionType</span><span class="p">)</span> + <span class="ow">or</span> <span class="bp">self</span><span class="o">.</span><span class="n">python_callable</span><span class="o">.</span><span class="vm">__name__</span> <span class="o">==</span> <span class="p">(</span><span class="k">lambda</span> <span class="n">x</span><span class="p">:</span> <span class="mi">0</span><span class="p">)</span><span class="o">.</span><span class="vm">__name__</span><span class="p">):</span> + <span class="k">raise</span> <span class="n">AirflowException</span><span class="p">(</span><span class="s1">'</span><span class="si">{}</span><span class="s1"> only supports functions for python_callable arg'</span><span class="p">,</span> + <span class="bp">self</span><span class="o">.</span><span class="vm">__class__</span><span class="o">.</span><span class="vm">__name__</span><span class="p">)</span> + <span class="c1"># check that args are passed iff python major version matches</span> + <span class="k">if</span> <span class="p">(</span><span class="n">python_version</span> <span class="ow">is</span> <span class="ow">not</span> <span class="kc">None</span> + <span class="ow">and</span> <span class="nb">str</span><span class="p">(</span><span class="n">python_version</span><span class="p">)[</span><span class="mi">0</span><span class="p">]</span> <span class="o">!=</span> <span class="nb">str</span><span class="p">(</span><span class="n">sys</span><span class="o">.</span><span class="n">version_info</span><span class="p">[</span><span class="mi">0</span><span class="p">])</span> + <span class="ow">and</span> <span class="bp">self</span><span class="o">.</span><span class="n">_pass_op_args</span><span class="p">()):</span> + <span class="k">raise</span> <span class="n">AirflowException</span><span class="p">(</span><span class="s2">"Passing op_args or op_kwargs is not supported across "</span> + <span class="s2">"different Python major versions "</span> + <span class="s2">"for PythonVirtualenvOperator. Please use string_args."</span><span class="p">)</span> + + <span class="k">def</span> <span class="nf">execute_callable</span><span class="p">(</span><span class="bp">self</span><span class="p">):</span> + <span class="k">with</span> <span class="n">TemporaryDirectory</span><span class="p">(</span><span class="n">prefix</span><span class="o">=</span><span class="s1">'venv'</span><span class="p">)</span> <span class="k">as</span> <span class="n">tmp_dir</span><span class="p">:</span> + <span class="c1"># generate filenames</span> + <span class="n">input_filename</span> <span class="o">=</span> <span class="n">os</span><span class="o">.</span><span class="n">path</span><span class="o">.</span><span class="n">join</span><span class="p">(</span><span class="n">tmp_dir</span><span class="p">,</span> <span class="s1">'script.in'</span><span class="p">)</span> + <span class="n">output_filename</span> <span class="o">=</span> <span class="n">os</span><span class="o">.</span><span class="n">path</span><span class="o">.</span><span class="n">join</span><span class="p">(</span><span class="n">tmp_dir</span><span class="p">,</span> <span class="s1">'script.out'</span><span class="p">)</span> + <span class="n">string_args_filename</span> <span class="o">=</span> <span class="n">os</span><span class="o">.</span><span class="n">path</span><span class="o">.</span><span class="n">join</span><span class="p">(</span><span class="n">tmp_dir</span><span class="p">,</span> <span class="s1">'string_args.txt'</span><span class="p">)</span> + <span class="n">script_filename</span> <span class="o">=</span> <span class="n">os</span><span class="o">.</span><span class="n">path</span><span class="o">.</span><span class="n">join</span><span class="p">(</span><span class="n">tmp_dir</span><span class="p">,</span> <span class="s1">'script.py'</span><span class="p">)</span> + + <span class="c1"># set up virtualenv</span> + <span class="bp">self</span><span class="o">.</span><span class="n">_execute_in_subprocess</span><span class="p">(</span><span class="bp">self</span><span class="o">.</span><span class="n">_generate_virtualenv_cmd</span><span class="p">(</span><span class="n">tmp_dir</span><span class="p">))</span> + <span class="n">cmd</span> <span class="o">=</span> <span class="bp">self</span><span class="o">.</span><span class="n">_generate_pip_install_cmd</span><span class="p">(</span><span class="n">tmp_dir</span><span class="p">)</span> + <span class="k">if</span> <span class="n">cmd</span><span class="p">:</span> + <span class="bp">self</span><span class="o">.</span><span class="n">_execute_in_subprocess</span><span class="p">(</span><span class="n">cmd</span><span class="p">)</span> + + <span class="bp">self</span><span class="o">.</span><span class="n">_write_args</span><span class="p">(</span><span class="n">input_filename</span><span class="p">)</span> + <span class="bp">self</span><span class="o">.</span><span class="n">_write_script</span><span class="p">(</span><span class="n">script_filename</span><span class="p">)</span> + <span class="bp">self</span><span class="o">.</span><span class="n">_write_string_args</span><span class="p">(</span><span class="n">string_args_filename</span><span class="p">)</span> + + <span class="c1"># execute command in virtualenv</span> + <span class="bp">self</span><span class="o">.</span><span class="n">_execute_in_subprocess</span><span class="p">(</span> + <span class="bp">self</span><span class="o">.</span><span class="n">_generate_python_cmd</span><span class="p">(</span><span class="n">tmp_dir</span><span class="p">,</span> + <span class="n">script_filename</span><span class="p">,</span> + <span class="n">input_filename</span><span class="p">,</span> + <span class="n">output_filename</span><span class="p">,</span> + <span class="n">string_args_filename</span><span class="p">))</span> + <span class="k">return</span> <span class="bp">self</span><span class="o">.</span><span class="n">_read_result</span><span class="p">(</span><span class="n">output_filename</span><span class="p">)</span> + + <span class="k">def</span> <span class="nf">_pass_op_args</span><span class="p">(</span><span class="bp">self</span><span class="p">):</span> + <span class="c1"># we should only pass op_args if any are given to us</span> + <span class="k">return</span> <span class="nb">len</span><span class="p">(</span><span class="bp">self</span><span class="o">.</span><span class="n">op_args</span><span class="p">)</span> <span class="o">+</span> <span class="nb">len</span><span class="p">(</span><span class="bp">self</span><span class="o">.</span><span class="n">op_kwargs</span><span class="p">)</span> <span class="o">></span> <span class="mi">0</span> + + <span class="k">def</span> <span class="nf">_execute_in_subprocess</span><span class="p">(</span><span class="bp">self</span><span class="p">,</span> <span class="n">cmd</span><span class="p">):</span> + <span class="k">try</span><span class="p">:</span> + <span class="bp">self</span><span class="o">.</span><span class="n">log</span><span class="o">.</span><span class="n">info</span><span class="p">(</span><span class="s2">"Executing cmd</span><span class="se">\n</span><span class="si">{}</span><span class="s2">"</span><span class="o">.</span><span class="n">format</span><span class="p">(</span><span class="n">cmd</span><span class="p">))</span> + <span class="n">output</span> <span class="o">=</span> <span class="n">subprocess</span><span class="o">.</span><span class="n">check_output</span><span class="p">(</span><span class="n">cmd</span><span class="p">,</span> <span class="n">stderr</span><span class="o">=</span><span class="n">subprocess</span><span class="o">.</span><span class="n">STDOUT</span><span class="p">)</span> + <span class="k">if</span> <span class="n">output</span><span class="p">:</span> + <span class="bp">self</span><span class="o">.</span><span class="n">log</span><span class="o">.</span><span class="n">info</span><span class="p">(</span><span class="s2">"Got output</span><span class="se">\n</span><span class="si">{}</span><span class="s2">"</span><span class="o">.</span><span class="n">format</span><span class="p">(</span><span class="n">output</span><span class="p">))</span> + <span class="k">except</span> <span class="n">subprocess</span><span class="o">.</span><span class="n">CalledProcessError</span> <span class="k">as</span> <span class="n">e</span><span class="p">:</span> + <span class="bp">self</span><span class="o">.</span><span class="n">log</span><span class="o">.</span><span class="n">info</span><span class="p">(</span><span class="s2">"Got error output</span><span class="se">\n</span><span class="si">{}</span><span class="s2">"</span><span class="o">.</span><span class="n">format</span><span class="p">(</span><span class="n">e</span><span class="o">.</span><span class="n">output</span><span class="p">))</span> + <span class="k">raise</span> + + <span class="k">def</span> <span class="nf">_write_string_args</span><span class="p">(</span><span class="bp">self</span><span class="p">,</span> <span class="n">filename</span><span class="p">):</span> + <span class="c1"># writes string_args to a file, which are read line by line</span> + <span class="k">with</span> <span class="nb">open</span><span class="p">(</span><span class="n">filename</span><span class="p">,</span> <span class="s1">'w'</span><span class="p">)</span> <span class="k">as</span> <span class="n">f</span><span class="p">:</span> + <span class="n">f</span><span class="o">.</span><span class="n">write</span><span class="p">(</span><span class="s1">'</span><span class="se">\n</span><span class="s1">'</span><span class="o">.</span><span class="n">join</span><span class="p">(</span><span class="nb">map</span><span class="p">(</span><span class="nb">str</span><span class="p">,</span> <span class="bp">self</span><span class="o">.</span><span class="n">string_args</span><span class="p">)))</span> + + <span class="k">def</span> <span class="nf">_write_args</span><span class="p">(</span><span class="bp">self</span><span class="p">,</span> <span class="n">input_filename</span><span class="p">):</span> + <span class="c1"># serialize args to file</span> + <span class="k">if</span> <span class="bp">self</span><span class="o">.</span><span class="n">_pass_op_args</span><span class="p">():</span> + <span class="k">with</span> <span class="nb">open</span><span class="p">(</span><span class="n">input_filename</span><span class="p">,</span> <span class="s1">'wb'</span><span class="p">)</span> <span class="k">as</span> <span class="n">f</span><span class="p">:</span> + <span class="n">arg_dict</span> <span class="o">=</span> <span class="p">({</span><span class="s1">'args'</span><span class="p">:</span> <span class="bp">self</span><span class="o">.</span><span class="n">op_args</span><span class="p">,</span> <span class="s1">'kwargs'</span><span class="p">:</span> <span class="bp">self</span><span class="o">.</span><span class="n">op_kwargs</span><span class="p">})</span> + <span class="k">if</span> <span class="bp">self</span><span class="o">.</span><span class="n">use_dill</span><span class="p">:</span> + <span class="n">dill</span><span class="o">.</span><span class="n">dump</span><span class="p">(</span><span class="n">arg_dict</span><span class="p">,</span> <span class="n">f</span><span class="p">)</span> + <span class="k">else</span><span class="p">:</span> + <span class="n">pickle</span><span class="o">.</span><span class="n">dump</span><span class="p">(</span><span class="n">arg_dict</span><span class="p">,</span> <span class="n">f</span><span class="p">)</span> + + <span class="k">def</span> <span class="nf">_read_result</span><span class="p">(</span><span class="bp">self</span><span class="p">,</span> <span class="n">output_filename</span><span class="p">):</span> + <span class="k">if</span> <span class="n">os</span><span class="o">.</span><span class="n">stat</span><span class="p">(</span><span class="n">output_filename</span><span class="p">)</span><span class="o">.</span><span class="n">st_size</span> <span class="o">==</span> <span class="mi">0</span><span class="p">:</span> + <span class="k">return</span> <span class="kc">None</span> + <span class="k">with</span> <span class="nb">open</span><span class="p">(</span><span class="n">output_filename</span><span class="p">,</span> <span class="s1">'rb'</span><span class="p">)</span> <span class="k">as</span> <span class="n">f</span><span class="p">:</span> + <span class="k">try</span><span class="p">:</span> + <span class="k">if</span> <span class="bp">self</span><span class="o">.</span><span class="n">use_dill</span><span class="p">:</span> + <span class="k">return</span> <span class="n">dill</span><span class="o">.</span><span class="n">load</span><span class="p">(</span><span class="n">f</span><span class="p">)</span> + <span class="k">else</span><span class="p">:</span> + <span class="k">return</span> <span class="n">pickle</span><span class="o">.</span><span class="n">load</span><span class="p">(</span><span class="n">f</span><span class="p">)</span> + <span class="k">except</span> <span class="ne">ValueError</span><span class="p">:</span> + <span class="bp">self</span><span class="o">.</span><span class="n">log</span><span class="o">.</span><span class="n">error</span><span class="p">(</span><span class="s2">"Error deserializing result. Note that result deserialization "</span> + <span class="s2">"is not supported across major Python versions."</span><span class="p">)</span> + <span class="k">raise</span> + + <span class="k">def</span> <span class="nf">_write_script</span><span class="p">(</span><span class="bp">self</span><span class="p">,</span> <span class="n">script_filename</span><span class="p">):</span> + <span class="k">with</span> <span class="nb">open</span><span class="p">(</span><span class="n">script_filename</span><span class="p">,</span> <span class="s1">'w'</span><span class="p">)</span> <span class="k">as</span> <span class="n">f</span><span class="p">:</span> + <span class="n">python_code</span> <span class="o">=</span> <span class="bp">self</span><span class="o">.</span><span class="n">_generate_python_code</span><span class="p">()</span> + <span class="bp">self</span><span class="o">.</span><span class="n">log</span><span class="o">.</span><span class="n">debug</span><span class="p">(</span><span class="s1">'Writing code to file</span><span class="se">\n</span><span class="si">{}</span><span class="s1">'</span><span class="o">.</span><span class="n">format</span><span class="p">(</span><span class="n">python_code</span><span class="p">))</span> + <span class="n">f</span><span class="o">.</span><span class="n">write</span><span class="p">(</span><span class="n">python_code</span><span class="p">)</span> + + <span class="k">def</span> <span class="nf">_generate_virtualenv_cmd</span><span class="p">(</span><span class="bp">self</span><span class="p">,</span> <span class="n">tmp_dir</span><span class="p">):</span> + <span class="n">cmd</span> <span class="o">=</span> <span class="p">[</span><span class="s1">'virtualenv'</span><span class="p">,</span> <span class="n">tmp_dir</span><span class="p">]</span> + <span class="k">if</span> <span class="bp">self</span><span class="o">.</span><span class="n">system_site_packages</span><span class="p">:</span> + <span class="n">cmd</span><span class="o">.</span><span class="n">append</span><span class="p">(</span><span class="s1">'--system-site-packages'</span><span class="p">)</span> + <span class="k">if</span> <span class="bp">self</span><span class="o">.</span><span class="n">python_version</span> <span class="ow">is</span> <span class="ow">not</span> <span class="kc">None</span><span class="p">:</span> + <span class="n">cmd</span><span class="o">.</span><span class="n">append</span><span class="p">(</span><span class="s1">'--python=python</span><span class="si">{}</span><span class="s1">'</span><span class="o">.</span><span class="n">format</span><span class="p">(</span><span class="bp">self</span><span class="o">.</span><span class="n">python_version</span><span class="p">))</span> + <span class="k">return</span> <span class="n">cmd</span> + + <span class="k">def</span> <span class="nf">_generate_pip_install_cmd</span><span class="p">(</span><span class="bp">self</span><span class="p">,</span> <span class="n">tmp_dir</span><span class="p">):</span> + <span class="k">if</span> <span class="nb">len</span><span class="p">(</span><span class="bp">self</span><span class="o">.</span><span class="n">requirements</span><span class="p">)</span> <span class="o">==</span> <span class="mi">0</span><span class="p">:</span> + <span class="k">return</span> <span class="p">[]</span> + <span class="k">else</span><span class="p">:</span> + <span class="c1"># direct path alleviates need to activate</span> + <span class="n">cmd</span> <span class="o">=</span> <span class="p">[</span><span class="s1">'</span><span class="si">{}</span><span class="s1">/bin/pip'</span><span class="o">.</span><span class="n">format</span><span class="p">(</span><span class="n">tmp_dir</span><span class="p">),</span> <span class="s1">'install'</span><span class="p">]</span> + <span class="k">return</span> <span class="n">cmd</span> <span class="o">+</span> <span class="bp">self</span><span class="o">.</span><span class="n">requirements</span> + + <span class="k">def</span> <span class="nf">_generate_python_cmd</span><span class="p">(</span><span class="bp">self</span><span class="p">,</span> <span class="n">tmp_dir</span><span class="p">,</span> <span class="n">script_filename</span><span class="p">,</span> <span class="n">input_filename</span><span class="p">,</span> <span class="n">output_filename</span><span class="p">,</span> <span class="n">string_args_filename</span><span class="p">):</span> + <span class="c1"># direct path alleviates need to activate</span> + <span class="k">return</span> <span class="p">[</span><span class="s1">'</span><span class="si">{}</span><span class="s1">/bin/python'</span><span class="o">.</span><span class="n">format</span><span class="p">(</span><span class="n">tmp_dir</span><span class="p">),</span> <span class="n">script_filename</span><span class="p">,</span> <span class="n">input_filename</span><span class="p">,</span> <span class="n">output_filename</span><span class="p">,</span> <span class="n">string_args_filename</span><span class="p">]</span> + + <span class="k">def</span> <span class="nf">_generate_python_code</span><span class="p">(</span><span class="bp">self</span><span class="p">):</span> + <span class="k">if</span> <span class="bp">self</span><span class="o">.</span><span class="n">use_dill</span><span class="p">:</span> + <span class="n">pickling_library</span> <span class="o">=</span> <span class="s1">'dill'</span> + <span class="k">else</span><span class="p">:</span> + <span class="n">pickling_library</span> <span class="o">=</span> <span class="s1">'pickle'</span> + <span class="n">fn</span> <span class="o">=</span> <span class="bp">self</span><span class="o">.</span><span class="n">python_callable</span> + <span class="c1"># dont try to read pickle if we didnt pass anything</span> + <span class="k">if</span> <span class="bp">self</span><span class="o">.</span><span class="n">_pass_op_args</span><span class="p">():</span> + <span class="n">load_args_line</span> <span class="o">=</span> <span class="s1">'with open(sys.argv[1], "rb") as f: arg_dict = </span><span class="si">{}</span><span class="s1">.load(f)'</span><span class="o">.</span><span class="n">format</span><span class="p">(</span><span class="n">pickling_library</span><span class="p">)</span> <span class="k">else</span><span class="p">:</span> - <span class="n">logging</span><span class="o">.</span><span class="n">info</span><span class="p">(</span><span class="s1">'Skipping downstream tasks...'</span><span class="p">)</span> - <span class="n">session</span> <span class="o">=</span> <span class="n">settings</span><span class="o">.</span><span class="n">Session</span><span class="p">()</span> - <span class="k">for</span> <span class="n">task</span> <span class="ow">in</span> <span class="n">context</span><span class="p">[</span><span class="s1">'task'</span><span class="p">]</span><span class="o">.</span><span class="n">downstream_list</span><span class="p">:</span> - <span class="n">ti</span> <span class="o">=</span> <span class="n">TaskInstance</span><span class="p">(</span> - <span class="n">task</span><span class="p">,</span> <span class="n">execution_date</span><span class="o">=</span><span class="n">context</span><span class="p">[</span><span class="s1">'ti'</span><span class="p">]</span><span class="o">.</span><span class="n">execution_date</span><span class="p">)</span> - <span class="n">ti</span><span class="o">.</span><span class="n">state</span> <span class="o">=</span> <span class="n">State</span><span class="o">.</span><span class="n">SKIPPED</span> - <span class="n">ti</span><span class="o">.</span><span class="n">start_date</span> <span class="o">=</span> <span class="n">datetime</span><span class="o">.</span><span class="n">now</span><span class="p">()</span> - <span class="n">ti</span><span class="o">.</span><span class="n">end_date</span> <span class="o">=</span> <span class="n">datetime</span><span class="o">.</span><span class="n">now</span><span class="p">()</span> - <span class="n">session</span><span class="o">.</span><span class="n">merge</span><span class="p">(</span><span class="n">ti</span><span class="p">)</span> - <span class="n">session</span><span class="o">.</span><span class="n">commit</span><span class="p">()</span> - <span class="n">session</span><span class="o">.</span><span class="n">close</span><span class="p">()</span> - <span class="n">logging</span><span class="o">.</span><span class="n">info</span><span class="p">(</span><span class="s2">"Done."</span><span class="p">)</span></div> + <span class="n">load_args_line</span> <span class="o">=</span> <span class="s1">'arg_dict = {"args": [], "kwargs": </span><span class="si">{}</span><span class="s1">}'</span> + + <span class="c1"># no indents in original code so we can accept any type of indents in the original function</span> + <span class="c1"># we deserialize args, call function, serialize result if necessary</span> + <span class="k">return</span> <span class="n">dedent</span><span class="p">(</span><span class="s2">"""</span><span class="se">\</span> +<span class="s2"> import </span><span class="si">{pickling_library}</span><span class="s2"></span> +<span class="s2"> import sys</span> +<span class="s2"> </span><span class="si">{load_args_code}</span><span class="s2"></span> +<span class="s2"> args = arg_dict["args"]</span> +<span class="s2"> kwargs = arg_dict["kwargs"]</span> +<span class="s2"> with open(sys.argv[3], 'r') as f: virtualenv_string_args = list(map(lambda x: x.strip(), list(f)))</span> +<span class="s2"> </span><span class="si">{python_callable_lines}</span><span class="s2"></span> +<span class="s2"> res = </span><span class="si">{python_callable_name}</span><span class="s2">(*args, **kwargs)</span> +<span class="s2"> with open(sys.argv[2], 'wb') as f: res is not None and </span><span class="si">{pickling_library}</span><span class="s2">.dump(res, f)</span> +<span class="s2"> """</span><span class="p">)</span><span class="o">.</span><span class="n">format</span><span class="p">(</span> + <span class="n">load_args_code</span><span class="o">=</span><span class="n">load_args_line</span><span class="p">,</span> + <span class="n">python_callable_lines</span><span class="o">=</span><span class="n">dedent</span><span class="p">(</span><span class="n">inspect</span><span class="o">.</span><span class="n">getsource</span><span class="p">(</span><span class="n">fn</span><span class="p">)),</span> + <span class="n">python_callable_name</span><span class="o">=</span><span class="n">fn</span><span class="o">.</span><span class="vm">__name__</span><span class="p">,</span> + <span class="n">pickling_library</span><span class="o">=</span><span class="n">pickling_library</span><span class="p">)</span> + + <span class="bp">self</span><span class="o">.</span><span class="n">log</span><span class="o">.</span><span class="n">info</span><span class="p">(</span><span class="s2">"Done."</span><span class="p">)</span> + </pre></div> </div>
http://git-wip-us.apache.org/repos/asf/incubator-airflow-site/blob/28a3eb60/_modules/qubole_operator.html ---------------------------------------------------------------------- diff --git a/_modules/qubole_operator.html b/_modules/qubole_operator.html index fb33047..c6fdd28 100644 --- a/_modules/qubole_operator.html +++ b/_modules/qubole_operator.html @@ -13,6 +13,8 @@ + + @@ -30,6 +32,9 @@ + <link rel="index" title="Index" + href="../genindex.html"/> + <link rel="search" title="Search" href="../search.html"/> <link rel="top" title="Airflow Documentation" href="../index.html"/> <link rel="up" title="Module code" href="index.html"/> @@ -40,6 +45,7 @@ <body class="wy-body-for-nav" role="document"> + <div class="wy-grid-for-nav"> @@ -76,7 +82,10 @@ - <ul> + + + + <ul> <li class="toctree-l1"><a class="reference internal" href="../project.html">Project</a></li> <li class="toctree-l1"><a class="reference internal" href="../license.html">License</a></li> <li class="toctree-l1"><a class="reference internal" href="../start.html">Quick Start</a></li> @@ -90,6 +99,8 @@ <li class="toctree-l1"><a class="reference internal" href="../scheduler.html">Scheduling & Triggers</a></li> <li class="toctree-l1"><a class="reference internal" href="../plugins.html">Plugins</a></li> <li class="toctree-l1"><a class="reference internal" href="../security.html">Security</a></li> +<li class="toctree-l1"><a class="reference internal" href="../api.html">Experimental Rest API</a></li> +<li class="toctree-l1"><a class="reference internal" href="../integration.html">Integration</a></li> <li class="toctree-l1"><a class="reference internal" href="../faq.html">FAQ</a></li> <li class="toctree-l1"><a class="reference internal" href="../code.html">API Reference</a></li> </ul> @@ -104,8 +115,10 @@ <nav class="wy-nav-top" role="navigation" aria-label="top navigation"> - <i data-toggle="wy-nav-top" class="fa fa-bars"></i> - <a href="../index.html">Airflow</a> + + <i data-toggle="wy-nav-top" class="fa fa-bars"></i> + <a href="../index.html">Airflow</a> + </nav> @@ -118,19 +131,36 @@ + + + + + + + + + + <div role="navigation" aria-label="breadcrumbs navigation"> + <ul class="wy-breadcrumbs"> - <li><a href="../index.html">Docs</a> »</li> - + + <li><a href="../index.html">Docs</a> »</li> + <li><a href="index.html">Module code</a> »</li> - - <li>qubole_operator</li> + + <li>qubole_operator</li> + + <li class="wy-breadcrumbs-aside"> - + </li> + </ul> + + <hr/> </div> <div role="main" class="document" itemscope="itemscope" itemtype="http://schema.org/Article"> @@ -151,9 +181,9 @@ <span class="c1"># See the License for the specific language governing permissions and</span> <span class="c1"># limitations under the License.</span> -<span class="kn">from</span> <span class="nn">airflow.models</span> <span class="kn">import</span> <span class="n">BaseOperator</span> -<span class="kn">from</span> <span class="nn">airflow.utils.decorators</span> <span class="kn">import</span> <span class="n">apply_defaults</span> -<span class="kn">from</span> <span class="nn">airflow.contrib.hooks.qubole_hook</span> <span class="kn">import</span> <span class="n">QuboleHook</span> +<span class="kn">from</span> <span class="nn">airflow.models</span> <span class="k">import</span> <span class="n">BaseOperator</span> +<span class="kn">from</span> <span class="nn">airflow.utils.decorators</span> <span class="k">import</span> <span class="n">apply_defaults</span> +<span class="kn">from</span> <span class="nn">airflow.contrib.hooks.qubole_hook</span> <span class="k">import</span> <span class="n">QuboleHook</span> <div class="viewcode-block" id="QuboleOperator"><a class="viewcode-back" href="../code.html#airflow.contrib.operators.QuboleOperator">[docs]</a><span class="k">class</span> <span class="nc">QuboleOperator</span><span class="p">(</span><span class="n">BaseOperator</span><span class="p">):</span> @@ -168,6 +198,7 @@ <span class="sd"> :tags: array of tags to be assigned with the command</span> <span class="sd"> :cluster_label: cluster label on which the command will be executed</span> <span class="sd"> :name: name to be given to command</span> +<span class="sd"> :notify: whether to send email on command completion or not (default is False)</span> <span class="sd"> **Arguments specific to command types**</span> @@ -181,20 +212,28 @@ <span class="sd"> :script_location: s3 location containing query statement</span> <span class="sd"> :macros: macro values which were used in query</span> <span class="sd"> hadoopcmd:</span> -<span class="sd"> :sub_commnad: must be one these ["jar", "s3distcp", "streaming"] followed by 1 or more args</span> +<span class="sd"> :sub_commnad: must be one these ["jar", "s3distcp", "streaming"] followed by</span> +<span class="sd"> 1 or more args</span> <span class="sd"> shellcmd:</span> <span class="sd"> :script: inline command with args</span> <span class="sd"> :script_location: s3 location containing query statement</span> -<span class="sd"> :files: list of files in s3 bucket as file1,file2 format. These files will be copied into the working directory where the qubole command is being executed.</span> -<span class="sd"> :archives: list of archives in s3 bucket as archive1,archive2 format. These will be unarchived intothe working directory where the qubole command is being executed</span> -<span class="sd"> :parameters: any extra args which need to be passed to script (only when script_location is supplied)</span> +<span class="sd"> :files: list of files in s3 bucket as file1,file2 format. These files will be</span> +<span class="sd"> copied into the working directory where the qubole command is being</span> +<span class="sd"> executed.</span> +<span class="sd"> :archives: list of archives in s3 bucket as archive1,archive2 format. These</span> +<span class="sd"> will be unarchived intothe working directory where the qubole command is</span> +<span class="sd"> being executed</span> +<span class="sd"> :parameters: any extra args which need to be passed to script (only when</span> +<span class="sd"> script_location is supplied)</span> <span class="sd"> pigcmd:</span> <span class="sd"> :script: inline query statement (latin_statements)</span> <span class="sd"> :script_location: s3 location containing pig query</span> -<span class="sd"> :parameters: any extra args which need to be passed to script (only when script_location is supplied</span> +<span class="sd"> :parameters: any extra args which need to be passed to script (only when</span> +<span class="sd"> script_location is supplied</span> <span class="sd"> sparkcmd:</span> <span class="sd"> :program: the complete Spark Program in Scala, SQL, Command, R, or Python</span> -<span class="sd"> :cmdline: spark-submit command line, all required information must be specify in cmdline itself.</span> +<span class="sd"> :cmdline: spark-submit command line, all required information must be specify</span> +<span class="sd"> in cmdline itself.</span> <span class="sd"> :sql: inline sql query</span> <span class="sd"> :script_location: s3 location containing query statement</span> <span class="sd"> :language: language of the program, Scala, SQL, Command, R, or Python</span> @@ -215,7 +254,7 @@ <span class="sd"> :db_update_mode: allowinsert or updateonly</span> <span class="sd"> :db_update_keys: columns used to determine the uniqueness of rows</span> <span class="sd"> :export_dir: HDFS/S3 location from which data will be exported.</span> -<span class="sd"> :fields_terminated_by: hex of the char used as column separator in the dataset.</span> +<span class="sd"> :fields_terminated_by: hex of the char used as column separator in the dataset</span> <span class="sd"> dbimportcmd:</span> <span class="sd"> :mode: 1 (simple), 2 (advance)</span> <span class="sd"> :hive_table: Name of the hive table</span> @@ -223,17 +262,32 @@ <span class="sd"> :db_table: name of the db table</span> <span class="sd"> :where_clause: where clause, if any</span> <span class="sd"> :parallelism: number of parallel db connections to use for extracting data</span> -<span class="sd"> :extract_query: SQL query to extract data from db. $CONDITIONS must be part of the where clause.</span> +<span class="sd"> :extract_query: SQL query to extract data from db. $CONDITIONS must be part</span> +<span class="sd"> of the where clause.</span> <span class="sd"> :boundary_query: Query to be used get range of row IDs to be extracted</span> <span class="sd"> :split_column: Column used as row ID to split data into ranges (mode 2)</span> -<span class="sd"> .. note:: Following fields are template-supported : ``query``, ``script_location``, ``sub_command``, ``script``, ``files``,</span> -<span class="sd"> ``archives``, ``program``, ``cmdline``, ``sql``, ``where_clause``, ``extract_query``, ``boundary_query``, ``macros``, ``tags``,</span> -<span class="sd"> ``name``, ``parameters``. You can also use ``.txt`` files for template driven use cases.</span> +<span class="sd"> .. note:: Following fields are template-supported : ``query``, ``script_location``,</span> +<span class="sd"> ``sub_command``, ``script``, ``files``, ``archives``, ``program``, ``cmdline``,</span> +<span class="sd"> ``sql``, ``where_clause``, ``extract_query``, ``boundary_query``, ``macros``,</span> +<span class="sd"> ``tags``, ``name``, ``parameters``, ``dbtap_id``, ``hive_table``, ``db_table``,</span> +<span class="sd"> ``split_column``, ``note_id``, ``db_update_keys``, ``export_dir``,</span> +<span class="sd"> ``partition_spec``, ``qubole_conn_id``, ``arguments``, ``user_program_arguments``.</span> +<span class="sd"> You can also use ``.txt`` files for template driven use cases.</span> + +<span class="sd"> .. note:: In QuboleOperator there is a default handler for task failures and retries,</span> +<span class="sd"> which generally kills the command running at QDS for the corresponding task</span> +<span class="sd"> instance. You can override this behavior by providing your own failure and retry</span> +<span class="sd"> handler in task definition.</span> <span class="sd"> """</span> - <span class="n">template_fields</span> <span class="o">=</span> <span class="p">(</span><span class="s1">'query'</span><span class="p">,</span> <span class="s1">'script_location'</span><span class="p">,</span> <span class="s1">'sub_command'</span><span class="p">,</span> <span class="s1">'script'</span><span class="p">,</span> <span class="s1">'files'</span><span class="p">,</span> <span class="s1">'archives'</span><span class="p">,</span> <span class="s1">'program'</span><span class="p">,</span> <span class="s1">'cmdline'</span><span class="p">,</span> - <span class="s1">'sql'</span><span class="p">,</span> <span class="s1">'where_clause'</span><span class="p">,</span> <span class="s1">'extract_query'</span><span class="p">,</span> <span class="s1">'boundary_query'</span><span class="p">,</span> <span class="s1">'macros'</span><span class="p">,</span> <span class="s1">'tags'</span><span class="p">,</span> <span class="s1">'name'</span><span class="p">,</span> <span class="s1">'parameters'</span><span class="p">)</span> + <span class="n">template_fields</span> <span class="o">=</span> <span class="p">(</span><span class="s1">'query'</span><span class="p">,</span> <span class="s1">'script_location'</span><span class="p">,</span> <span class="s1">'sub_command'</span><span class="p">,</span> <span class="s1">'script'</span><span class="p">,</span> <span class="s1">'files'</span><span class="p">,</span> + <span class="s1">'archives'</span><span class="p">,</span> <span class="s1">'program'</span><span class="p">,</span> <span class="s1">'cmdline'</span><span class="p">,</span> <span class="s1">'sql'</span><span class="p">,</span> <span class="s1">'where_clause'</span><span class="p">,</span> <span class="s1">'tags'</span><span class="p">,</span> + <span class="s1">'extract_query'</span><span class="p">,</span> <span class="s1">'boundary_query'</span><span class="p">,</span> <span class="s1">'macros'</span><span class="p">,</span> <span class="s1">'name'</span><span class="p">,</span> <span class="s1">'parameters'</span><span class="p">,</span> + <span class="s1">'dbtap_id'</span><span class="p">,</span> <span class="s1">'hive_table'</span><span class="p">,</span> <span class="s1">'db_table'</span><span class="p">,</span> <span class="s1">'split_column'</span><span class="p">,</span> <span class="s1">'note_id'</span><span class="p">,</span> + <span class="s1">'db_update_keys'</span><span class="p">,</span> <span class="s1">'export_dir'</span><span class="p">,</span> <span class="s1">'partition_spec'</span><span class="p">,</span> <span class="s1">'qubole_conn_id'</span><span class="p">,</span> + <span class="s1">'arguments'</span><span class="p">,</span> <span class="s1">'user_program_arguments'</span><span class="p">)</span> + <span class="n">template_ext</span> <span class="o">=</span> <span class="p">(</span><span class="s1">'.txt'</span><span class="p">,)</span> <span class="n">ui_color</span> <span class="o">=</span> <span class="s1">'#3064A1'</span> <span class="n">ui_fgcolor</span> <span class="o">=</span> <span class="s1">'#fff'</span> @@ -243,25 +297,32 @@ <span class="bp">self</span><span class="o">.</span><span class="n">args</span> <span class="o">=</span> <span class="n">args</span> <span class="bp">self</span><span class="o">.</span><span class="n">kwargs</span> <span class="o">=</span> <span class="n">kwargs</span> <span class="bp">self</span><span class="o">.</span><span class="n">kwargs</span><span class="p">[</span><span class="s1">'qubole_conn_id'</span><span class="p">]</span> <span class="o">=</span> <span class="n">qubole_conn_id</span> - <span class="bp">self</span><span class="o">.</span><span class="n">hook</span> <span class="o">=</span> <span class="n">QuboleHook</span><span class="p">(</span><span class="o">*</span><span class="bp">self</span><span class="o">.</span><span class="n">args</span><span class="p">,</span> <span class="o">**</span><span class="bp">self</span><span class="o">.</span><span class="n">kwargs</span><span class="p">)</span> - <span class="nb">super</span><span class="p">(</span><span class="n">QuboleOperator</span><span class="p">,</span> <span class="bp">self</span><span class="p">)</span><span class="o">.</span><span class="n">__init__</span><span class="p">(</span><span class="o">*</span><span class="n">args</span><span class="p">,</span> <span class="o">**</span><span class="n">kwargs</span><span class="p">)</span> + <span class="nb">super</span><span class="p">(</span><span class="n">QuboleOperator</span><span class="p">,</span> <span class="bp">self</span><span class="p">)</span><span class="o">.</span><span class="fm">__init__</span><span class="p">(</span><span class="o">*</span><span class="n">args</span><span class="p">,</span> <span class="o">**</span><span class="n">kwargs</span><span class="p">)</span> + + <span class="k">if</span> <span class="bp">self</span><span class="o">.</span><span class="n">on_failure_callback</span> <span class="ow">is</span> <span class="kc">None</span><span class="p">:</span> + <span class="bp">self</span><span class="o">.</span><span class="n">on_failure_callback</span> <span class="o">=</span> <span class="n">QuboleHook</span><span class="o">.</span><span class="n">handle_failure_retry</span> + + <span class="k">if</span> <span class="bp">self</span><span class="o">.</span><span class="n">on_retry_callback</span> <span class="ow">is</span> <span class="kc">None</span><span class="p">:</span> + <span class="bp">self</span><span class="o">.</span><span class="n">on_retry_callback</span> <span class="o">=</span> <span class="n">QuboleHook</span><span class="o">.</span><span class="n">handle_failure_retry</span> <span class="k">def</span> <span class="nf">execute</span><span class="p">(</span><span class="bp">self</span><span class="p">,</span> <span class="n">context</span><span class="p">):</span> - <span class="c1"># Reinitiating the hook, as some template fields might have changed</span> - <span class="bp">self</span><span class="o">.</span><span class="n">hook</span> <span class="o">=</span> <span class="n">QuboleHook</span><span class="p">(</span><span class="o">*</span><span class="bp">self</span><span class="o">.</span><span class="n">args</span><span class="p">,</span> <span class="o">**</span><span class="bp">self</span><span class="o">.</span><span class="n">kwargs</span><span class="p">)</span> - <span class="k">return</span> <span class="bp">self</span><span class="o">.</span><span class="n">hook</span><span class="o">.</span><span class="n">execute</span><span class="p">(</span><span class="n">context</span><span class="p">)</span> + <span class="k">return</span> <span class="bp">self</span><span class="o">.</span><span class="n">get_hook</span><span class="p">()</span><span class="o">.</span><span class="n">execute</span><span class="p">(</span><span class="n">context</span><span class="p">)</span> - <span class="k">def</span> <span class="nf">on_kill</span><span class="p">(</span><span class="bp">self</span><span class="p">,</span> <span class="n">ti</span><span class="p">):</span> - <span class="bp">self</span><span class="o">.</span><span class="n">hook</span><span class="o">.</span><span class="n">kill</span><span class="p">(</span><span class="n">ti</span><span class="p">)</span> + <span class="k">def</span> <span class="nf">on_kill</span><span class="p">(</span><span class="bp">self</span><span class="p">,</span> <span class="n">ti</span><span class="o">=</span><span class="kc">None</span><span class="p">):</span> + <span class="bp">self</span><span class="o">.</span><span class="n">get_hook</span><span class="p">()</span><span class="o">.</span><span class="n">kill</span><span class="p">(</span><span class="n">ti</span><span class="p">)</span> - <span class="k">def</span> <span class="nf">get_results</span><span class="p">(</span><span class="bp">self</span><span class="p">,</span> <span class="n">ti</span><span class="o">=</span><span class="bp">None</span><span class="p">,</span> <span class="n">fp</span><span class="o">=</span><span class="bp">None</span><span class="p">,</span> <span class="n">inline</span><span class="o">=</span><span class="bp">True</span><span class="p">,</span> <span class="n">delim</span><span class="o">=</span><span class="bp">None</span><span class="p">,</span> <span class="n">fetch</span><span class="o">=</span><span class="bp">True</span><span class="p">):</span> - <span class="k">return</span> <span class="bp">self</span><span class="o">.</span><span class="n">hook</span><span class="o">.</span><span class="n">get_results</span><span class="p">(</span><span class="n">ti</span><span class="p">,</span> <span class="n">fp</span><span class="p">,</span> <span class="n">inline</span><span class="p">,</span> <span class="n">delim</span><span class="p">,</span> <span class="n">fetch</span><span class="p">)</span> + <span class="k">def</span> <span class="nf">get_results</span><span class="p">(</span><span class="bp">self</span><span class="p">,</span> <span class="n">ti</span><span class="o">=</span><span class="kc">None</span><span class="p">,</span> <span class="n">fp</span><span class="o">=</span><span class="kc">None</span><span class="p">,</span> <span class="n">inline</span><span class="o">=</span><span class="kc">True</span><span class="p">,</span> <span class="n">delim</span><span class="o">=</span><span class="kc">None</span><span class="p">,</span> <span class="n">fetch</span><span class="o">=</span><span class="kc">True</span><span class="p">):</span> + <span class="k">return</span> <span class="bp">self</span><span class="o">.</span><span class="n">get_hook</span><span class="p">()</span><span class="o">.</span><span class="n">get_results</span><span class="p">(</span><span class="n">ti</span><span class="p">,</span> <span class="n">fp</span><span class="p">,</span> <span class="n">inline</span><span class="p">,</span> <span class="n">delim</span><span class="p">,</span> <span class="n">fetch</span><span class="p">)</span> <span class="k">def</span> <span class="nf">get_log</span><span class="p">(</span><span class="bp">self</span><span class="p">,</span> <span class="n">ti</span><span class="p">):</span> - <span class="k">return</span> <span class="bp">self</span><span class="o">.</span><span class="n">hook</span><span class="o">.</span><span class="n">get_log</span><span class="p">(</span><span class="n">ti</span><span class="p">)</span> + <span class="k">return</span> <span class="bp">self</span><span class="o">.</span><span class="n">get_hook</span><span class="p">()</span><span class="o">.</span><span class="n">get_log</span><span class="p">(</span><span class="n">ti</span><span class="p">)</span> <span class="k">def</span> <span class="nf">get_jobs_id</span><span class="p">(</span><span class="bp">self</span><span class="p">,</span> <span class="n">ti</span><span class="p">):</span> - <span class="k">return</span> <span class="bp">self</span><span class="o">.</span><span class="n">hook</span><span class="o">.</span><span class="n">get_jobs_id</span><span class="p">(</span><span class="n">ti</span><span class="p">)</span> + <span class="k">return</span> <span class="bp">self</span><span class="o">.</span><span class="n">get_hook</span><span class="p">()</span><span class="o">.</span><span class="n">get_jobs_id</span><span class="p">(</span><span class="n">ti</span><span class="p">)</span> + + <span class="k">def</span> <span class="nf">get_hook</span><span class="p">(</span><span class="bp">self</span><span class="p">):</span> + <span class="c1"># Reinitiating the hook, as some template fields might have changed</span> + <span class="k">return</span> <span class="n">QuboleHook</span><span class="p">(</span><span class="o">*</span><span class="bp">self</span><span class="o">.</span><span class="n">args</span><span class="p">,</span> <span class="o">**</span><span class="bp">self</span><span class="o">.</span><span class="n">kwargs</span><span class="p">)</span> <span class="k">def</span> <span class="nf">__getattribute__</span><span class="p">(</span><span class="bp">self</span><span class="p">,</span> <span class="n">name</span><span class="p">):</span> <span class="k">if</span> <span class="n">name</span> <span class="ow">in</span> <span class="n">QuboleOperator</span><span class="o">.</span><span class="n">template_fields</span><span class="p">:</span> @@ -270,19 +331,19 @@ <span class="k">else</span><span class="p">:</span> <span class="k">return</span> <span class="s1">''</span> <span class="k">else</span><span class="p">:</span> - <span class="k">return</span> <span class="nb">object</span><span class="o">.</span><span class="n">__getattribute__</span><span class="p">(</span><span class="bp">self</span><span class="p">,</span> <span class="n">name</span><span class="p">)</span> + <span class="k">return</span> <span class="nb">object</span><span class="o">.</span><span class="fm">__getattribute__</span><span class="p">(</span><span class="bp">self</span><span class="p">,</span> <span class="n">name</span><span class="p">)</span> <span class="k">def</span> <span class="nf">__setattr__</span><span class="p">(</span><span class="bp">self</span><span class="p">,</span> <span class="n">name</span><span class="p">,</span> <span class="n">value</span><span class="p">):</span> <span class="k">if</span> <span class="n">name</span> <span class="ow">in</span> <span class="n">QuboleOperator</span><span class="o">.</span><span class="n">template_fields</span><span class="p">:</span> <span class="bp">self</span><span class="o">.</span><span class="n">kwargs</span><span class="p">[</span><span class="n">name</span><span class="p">]</span> <span class="o">=</span> <span class="n">value</span> <span class="k">else</span><span class="p">:</span> - <span class="nb">object</span><span class="o">.</span><span class="n">__setattr__</span><span class="p">(</span><span class="bp">self</span><span class="p">,</span> <span class="n">name</span><span class="p">,</span> <span class="n">value</span><span class="p">)</span></div> - - - + <span class="nb">object</span><span class="o">.</span><span class="fm">__setattr__</span><span class="p">(</span><span class="bp">self</span><span class="p">,</span> <span class="n">name</span><span class="p">,</span> <span class="n">value</span><span class="p">)</span></div> </pre></div> </div> + <div class="articleComments"> + + </div> </div> <footer> @@ -315,7 +376,8 @@ VERSION:'', COLLAPSE_INDEX:false, FILE_SUFFIX:'.html', - HAS_SOURCE: true + HAS_SOURCE: true, + SOURCELINK_SUFFIX: '.txt' }; </script> <script type="text/javascript" src="../_static/jquery.js"></script>
