http://git-wip-us.apache.org/repos/asf/incubator-airflow-site/blob/28a3eb60/_modules/airflow/models.html ---------------------------------------------------------------------- diff --git a/_modules/airflow/models.html b/_modules/airflow/models.html index 0b043ea..485f9ae 100644 --- a/_modules/airflow/models.html +++ b/_modules/airflow/models.html @@ -13,6 +13,8 @@ + + @@ -80,7 +82,10 @@ - <ul> + + + + <ul> <li class="toctree-l1"><a class="reference internal" href="../../project.html">Project</a></li> <li class="toctree-l1"><a class="reference internal" href="../../license.html">License</a></li> <li class="toctree-l1"><a class="reference internal" href="../../start.html">Quick Start</a></li> @@ -175,6 +180,7 @@ <span class="c1"># WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.</span> <span class="c1"># See the License for the specific language governing permissions and</span> <span class="c1"># limitations under the License.</span> + <span class="kn">from</span> <span class="nn">__future__</span> <span class="k">import</span> <span class="n">absolute_import</span> <span class="kn">from</span> <span class="nn">__future__</span> <span class="k">import</span> <span class="n">division</span> <span class="kn">from</span> <span class="nn">__future__</span> <span class="k">import</span> <span class="n">print_function</span> @@ -193,7 +199,7 @@ <span class="kn">import</span> <span class="nn">getpass</span> <span class="kn">import</span> <span class="nn">imp</span> <span class="kn">import</span> <span class="nn">importlib</span> -<span class="kn">import</span> <span class="nn">inspect</span> +<span class="kn">import</span> <span class="nn">itertools</span> <span class="kn">import</span> <span class="nn">zipfile</span> <span class="kn">import</span> <span class="nn">jinja2</span> <span class="kn">import</span> <span class="nn">json</span> @@ -208,12 +214,11 @@ <span class="kn">import</span> <span class="nn">traceback</span> <span class="kn">import</span> <span class="nn">warnings</span> <span class="kn">import</span> <span class="nn">hashlib</span> - <span class="kn">from</span> <span class="nn">urllib.parse</span> <span class="k">import</span> <span class="n">urlparse</span> <span class="kn">from</span> <span class="nn">sqlalchemy</span> <span class="k">import</span> <span class="p">(</span> <span class="n">Column</span><span class="p">,</span> <span class="n">Integer</span><span class="p">,</span> <span class="n">String</span><span class="p">,</span> <span class="n">DateTime</span><span class="p">,</span> <span class="n">Text</span><span class="p">,</span> <span class="n">Boolean</span><span class="p">,</span> <span class="n">ForeignKey</span><span class="p">,</span> <span class="n">PickleType</span><span class="p">,</span> - <span class="n">Index</span><span class="p">,</span> <span class="n">Float</span><span class="p">)</span> + <span class="n">Index</span><span class="p">,</span> <span class="n">Float</span><span class="p">,</span> <span class="n">LargeBinary</span><span class="p">)</span> <span class="kn">from</span> <span class="nn">sqlalchemy</span> <span class="k">import</span> <span class="n">func</span><span class="p">,</span> <span class="n">or_</span><span class="p">,</span> <span class="n">and_</span> <span class="kn">from</span> <span class="nn">sqlalchemy.ext.declarative</span> <span class="k">import</span> <span class="n">declarative_base</span><span class="p">,</span> <span class="n">declared_attr</span> <span class="kn">from</span> <span class="nn">sqlalchemy.dialects.mysql</span> <span class="k">import</span> <span class="n">LONGTEXT</span> @@ -223,13 +228,15 @@ <span class="kn">import</span> <span class="nn">six</span> <span class="kn">from</span> <span class="nn">airflow</span> <span class="k">import</span> <span class="n">settings</span><span class="p">,</span> <span class="n">utils</span> -<span class="kn">from</span> <span class="nn">airflow.executors</span> <span class="k">import</span> <span class="n">DEFAULT_EXECUTOR</span><span class="p">,</span> <span class="n">LocalExecutor</span> +<span class="kn">from</span> <span class="nn">airflow.executors</span> <span class="k">import</span> <span class="n">GetDefaultExecutor</span><span class="p">,</span> <span class="n">LocalExecutor</span> <span class="kn">from</span> <span class="nn">airflow</span> <span class="k">import</span> <span class="n">configuration</span> <span class="kn">from</span> <span class="nn">airflow.exceptions</span> <span class="k">import</span> <span class="n">AirflowException</span><span class="p">,</span> <span class="n">AirflowSkipException</span><span class="p">,</span> <span class="n">AirflowTaskTimeout</span> <span class="kn">from</span> <span class="nn">airflow.dag.base_dag</span> <span class="k">import</span> <span class="n">BaseDag</span><span class="p">,</span> <span class="n">BaseDagBag</span> <span class="kn">from</span> <span class="nn">airflow.ti_deps.deps.not_in_retry_period_dep</span> <span class="k">import</span> <span class="n">NotInRetryPeriodDep</span> <span class="kn">from</span> <span class="nn">airflow.ti_deps.deps.prev_dagrun_dep</span> <span class="k">import</span> <span class="n">PrevDagrunDep</span> <span class="kn">from</span> <span class="nn">airflow.ti_deps.deps.trigger_rule_dep</span> <span class="k">import</span> <span class="n">TriggerRuleDep</span> +<span class="kn">from</span> <span class="nn">airflow.ti_deps.deps.task_concurrency_dep</span> <span class="k">import</span> <span class="n">TaskConcurrencyDep</span> + <span class="kn">from</span> <span class="nn">airflow.ti_deps.dep_context</span> <span class="k">import</span> <span class="n">DepContext</span><span class="p">,</span> <span class="n">QUEUE_DEPS</span><span class="p">,</span> <span class="n">RUN_DEPS</span> <span class="kn">from</span> <span class="nn">airflow.utils.dates</span> <span class="k">import</span> <span class="n">cron_presets</span><span class="p">,</span> <span class="n">date_range</span> <span class="k">as</span> <span class="n">utils_date_range</span> <span class="kn">from</span> <span class="nn">airflow.utils.db</span> <span class="k">import</span> <span class="n">provide_session</span> @@ -237,11 +244,11 @@ <span class="kn">from</span> <span class="nn">airflow.utils.email</span> <span class="k">import</span> <span class="n">send_email</span> <span class="kn">from</span> <span class="nn">airflow.utils.helpers</span> <span class="k">import</span> <span class="p">(</span> <span class="n">as_tuple</span><span class="p">,</span> <span class="n">is_container</span><span class="p">,</span> <span class="n">is_in</span><span class="p">,</span> <span class="n">validate_key</span><span class="p">,</span> <span class="n">pprinttable</span><span class="p">)</span> -<span class="kn">from</span> <span class="nn">airflow.utils.logging</span> <span class="k">import</span> <span class="n">LoggingMixin</span> <span class="kn">from</span> <span class="nn">airflow.utils.operator_resources</span> <span class="k">import</span> <span class="n">Resources</span> <span class="kn">from</span> <span class="nn">airflow.utils.state</span> <span class="k">import</span> <span class="n">State</span> <span class="kn">from</span> <span class="nn">airflow.utils.timeout</span> <span class="k">import</span> <span class="n">timeout</span> <span class="kn">from</span> <span class="nn">airflow.utils.trigger_rule</span> <span class="k">import</span> <span class="n">TriggerRule</span> +<span class="kn">from</span> <span class="nn">airflow.utils.log.logging_mixin</span> <span class="k">import</span> <span class="n">LoggingMixin</span> <span class="n">Base</span> <span class="o">=</span> <span class="n">declarative_base</span><span class="p">()</span> <span class="n">ID_LEN</span> <span class="o">=</span> <span class="mi">250</span> @@ -249,24 +256,36 @@ <span class="n">Stats</span> <span class="o">=</span> <span class="n">settings</span><span class="o">.</span><span class="n">Stats</span> -<span class="n">ENCRYPTION_ON</span> <span class="o">=</span> <span class="kc">False</span> -<span class="k">try</span><span class="p">:</span> - <span class="kn">from</span> <span class="nn">cryptography.fernet</span> <span class="k">import</span> <span class="n">Fernet</span> - <span class="n">FERNET</span> <span class="o">=</span> <span class="n">Fernet</span><span class="p">(</span><span class="n">configuration</span><span class="o">.</span><span class="n">get</span><span class="p">(</span><span class="s1">'core'</span><span class="p">,</span> <span class="s1">'FERNET_KEY'</span><span class="p">)</span><span class="o">.</span><span class="n">encode</span><span class="p">(</span><span class="s1">'utf-8'</span><span class="p">))</span> - <span class="n">ENCRYPTION_ON</span> <span class="o">=</span> <span class="kc">True</span> -<span class="k">except</span><span class="p">:</span> - <span class="k">pass</span> +<span class="k">def</span> <span class="nf">get_fernet</span><span class="p">():</span> + <span class="sd">"""</span> +<span class="sd"> Deferred load of Fernet key.</span> + +<span class="sd"> This function could fail either because Cryptography is not installed</span> +<span class="sd"> or because the Fernet key is invalid.</span> + +<span class="sd"> :return: Fernet object</span> +<span class="sd"> :raises: AirflowException if there's a problem trying to load Fernet</span> +<span class="sd"> """</span> + <span class="k">try</span><span class="p">:</span> + <span class="kn">from</span> <span class="nn">cryptography.fernet</span> <span class="k">import</span> <span class="n">Fernet</span> + <span class="k">except</span><span class="p">:</span> + <span class="k">raise</span> <span class="n">AirflowException</span><span class="p">(</span><span class="s1">'Failed to import Fernet, it may not be installed'</span><span class="p">)</span> + <span class="k">try</span><span class="p">:</span> + <span class="k">return</span> <span class="n">Fernet</span><span class="p">(</span><span class="n">configuration</span><span class="o">.</span><span class="n">get</span><span class="p">(</span><span class="s1">'core'</span><span class="p">,</span> <span class="s1">'FERNET_KEY'</span><span class="p">)</span><span class="o">.</span><span class="n">encode</span><span class="p">(</span><span class="s1">'utf-8'</span><span class="p">))</span> + <span class="k">except</span> <span class="ne">ValueError</span> <span class="k">as</span> <span class="n">ve</span><span class="p">:</span> + <span class="k">raise</span> <span class="n">AirflowException</span><span class="p">(</span><span class="s2">"Could not create Fernet object: </span><span class="si">{}</span><span class="s2">"</span><span class="o">.</span><span class="n">format</span><span class="p">(</span><span class="n">ve</span><span class="p">))</span> + <span class="k">if</span> <span class="s1">'mysql'</span> <span class="ow">in</span> <span class="n">settings</span><span class="o">.</span><span class="n">SQL_ALCHEMY_CONN</span><span class="p">:</span> <span class="n">LongText</span> <span class="o">=</span> <span class="n">LONGTEXT</span> <span class="k">else</span><span class="p">:</span> <span class="n">LongText</span> <span class="o">=</span> <span class="n">Text</span> -<span class="c1"># used by DAG context_managers</span> +<span class="c1"># Used by DAG context_managers</span> <span class="n">_CONTEXT_MANAGER_DAG</span> <span class="o">=</span> <span class="kc">None</span> -<span class="k">def</span> <span class="nf">clear_task_instances</span><span class="p">(</span><span class="n">tis</span><span class="p">,</span> <span class="n">session</span><span class="p">,</span> <span class="n">activate_dag_runs</span><span class="o">=</span><span class="kc">True</span><span class="p">):</span> +<span class="k">def</span> <span class="nf">clear_task_instances</span><span class="p">(</span><span class="n">tis</span><span class="p">,</span> <span class="n">session</span><span class="p">,</span> <span class="n">activate_dag_runs</span><span class="o">=</span><span class="kc">True</span><span class="p">,</span> <span class="n">dag</span><span class="o">=</span><span class="kc">None</span><span class="p">):</span> <span class="sd">"""</span> <span class="sd"> Clears a set of task instances, but makes sure the running ones</span> <span class="sd"> get killed.</span> @@ -277,26 +296,34 @@ <span class="k">if</span> <span class="n">ti</span><span class="o">.</span><span class="n">job_id</span><span class="p">:</span> <span class="n">ti</span><span class="o">.</span><span class="n">state</span> <span class="o">=</span> <span class="n">State</span><span class="o">.</span><span class="n">SHUTDOWN</span> <span class="n">job_ids</span><span class="o">.</span><span class="n">append</span><span class="p">(</span><span class="n">ti</span><span class="o">.</span><span class="n">job_id</span><span class="p">)</span> - <span class="c1"># todo: this creates an issue with the webui tests</span> - <span class="c1"># elif ti.state != State.REMOVED:</span> - <span class="c1"># ti.state = State.NONE</span> - <span class="c1"># session.merge(ti)</span> <span class="k">else</span><span class="p">:</span> - <span class="n">session</span><span class="o">.</span><span class="n">delete</span><span class="p">(</span><span class="n">ti</span><span class="p">)</span> + <span class="n">task_id</span> <span class="o">=</span> <span class="n">ti</span><span class="o">.</span><span class="n">task_id</span> + <span class="k">if</span> <span class="n">dag</span> <span class="ow">and</span> <span class="n">dag</span><span class="o">.</span><span class="n">has_task</span><span class="p">(</span><span class="n">task_id</span><span class="p">):</span> + <span class="n">task</span> <span class="o">=</span> <span class="n">dag</span><span class="o">.</span><span class="n">get_task</span><span class="p">(</span><span class="n">task_id</span><span class="p">)</span> + <span class="n">task_retries</span> <span class="o">=</span> <span class="n">task</span><span class="o">.</span><span class="n">retries</span> + <span class="n">ti</span><span class="o">.</span><span class="n">max_tries</span> <span class="o">=</span> <span class="n">ti</span><span class="o">.</span><span class="n">try_number</span> <span class="o">+</span> <span class="n">task_retries</span> <span class="o">-</span> <span class="mi">1</span> + <span class="k">else</span><span class="p">:</span> + <span class="c1"># Ignore errors when updating max_tries if dag is None or</span> + <span class="c1"># task not found in dag since database records could be</span> + <span class="c1"># outdated. We make max_tries the maximum value of its</span> + <span class="c1"># original max_tries or the current task try number.</span> + <span class="n">ti</span><span class="o">.</span><span class="n">max_tries</span> <span class="o">=</span> <span class="nb">max</span><span class="p">(</span><span class="n">ti</span><span class="o">.</span><span class="n">max_tries</span><span class="p">,</span> <span class="n">ti</span><span class="o">.</span><span class="n">try_number</span> <span class="o">-</span> <span class="mi">1</span><span class="p">)</span> + <span class="n">ti</span><span class="o">.</span><span class="n">state</span> <span class="o">=</span> <span class="n">State</span><span class="o">.</span><span class="n">NONE</span> + <span class="n">session</span><span class="o">.</span><span class="n">merge</span><span class="p">(</span><span class="n">ti</span><span class="p">)</span> + <span class="k">if</span> <span class="n">job_ids</span><span class="p">:</span> <span class="kn">from</span> <span class="nn">airflow.jobs</span> <span class="k">import</span> <span class="n">BaseJob</span> <span class="k">as</span> <span class="n">BJ</span> <span class="k">for</span> <span class="n">job</span> <span class="ow">in</span> <span class="n">session</span><span class="o">.</span><span class="n">query</span><span class="p">(</span><span class="n">BJ</span><span class="p">)</span><span class="o">.</span><span class="n">filter</span><span class="p">(</span><span class="n">BJ</span><span class="o">.</span><span class="n">id</span><span class="o">.</span><span class="n">in_</span><span class="p">(</span><span class="n">job_ids</span><span class="p">))</span><span class="o">.</span><span class="n">all</span><span class="p">():</span> <span class="n">job</span><span class="o">.</span><span class="n">state</span> <span class="o">=</span> <span class="n">State</span><span class="o">.</span><span class="n">SHUTDOWN</span> - <span class="k">if</span> <span class="n">activate_dag_runs</span><span class="p">:</span> - <span class="n">execution_dates</span> <span class="o">=</span> <span class="p">{</span><span class="n">ti</span><span class="o">.</span><span class="n">execution_date</span> <span class="k">for</span> <span class="n">ti</span> <span class="ow">in</span> <span class="n">tis</span><span class="p">}</span> - <span class="n">dag_ids</span> <span class="o">=</span> <span class="p">{</span><span class="n">ti</span><span class="o">.</span><span class="n">dag_id</span> <span class="k">for</span> <span class="n">ti</span> <span class="ow">in</span> <span class="n">tis</span><span class="p">}</span> + + <span class="k">if</span> <span class="n">activate_dag_runs</span> <span class="ow">and</span> <span class="n">tis</span><span class="p">:</span> <span class="n">drs</span> <span class="o">=</span> <span class="n">session</span><span class="o">.</span><span class="n">query</span><span class="p">(</span><span class="n">DagRun</span><span class="p">)</span><span class="o">.</span><span class="n">filter</span><span class="p">(</span> - <span class="n">DagRun</span><span class="o">.</span><span class="n">dag_id</span><span class="o">.</span><span class="n">in_</span><span class="p">(</span><span class="n">dag_ids</span><span class="p">),</span> - <span class="n">DagRun</span><span class="o">.</span><span class="n">execution_date</span><span class="o">.</span><span class="n">in_</span><span class="p">(</span><span class="n">execution_dates</span><span class="p">),</span> + <span class="n">DagRun</span><span class="o">.</span><span class="n">dag_id</span><span class="o">.</span><span class="n">in_</span><span class="p">({</span><span class="n">ti</span><span class="o">.</span><span class="n">dag_id</span> <span class="k">for</span> <span class="n">ti</span> <span class="ow">in</span> <span class="n">tis</span><span class="p">}),</span> + <span class="n">DagRun</span><span class="o">.</span><span class="n">execution_date</span><span class="o">.</span><span class="n">in_</span><span class="p">({</span><span class="n">ti</span><span class="o">.</span><span class="n">execution_date</span> <span class="k">for</span> <span class="n">ti</span> <span class="ow">in</span> <span class="n">tis</span><span class="p">}),</span> <span class="p">)</span><span class="o">.</span><span class="n">all</span><span class="p">()</span> <span class="k">for</span> <span class="n">dr</span> <span class="ow">in</span> <span class="n">drs</span><span class="p">:</span> <span class="n">dr</span><span class="o">.</span><span class="n">state</span> <span class="o">=</span> <span class="n">State</span><span class="o">.</span><span class="n">RUNNING</span> - <span class="n">dr</span><span class="o">.</span><span class="n">start_date</span> <span class="o">=</span> <span class="n">datetime</span><span class="o">.</span><span class="n">now</span><span class="p">()</span> + <span class="n">dr</span><span class="o">.</span><span class="n">start_date</span> <span class="o">=</span> <span class="n">datetime</span><span class="o">.</span><span class="n">utcnow</span><span class="p">()</span> <div class="viewcode-block" id="DagBag"><a class="viewcode-back" href="../../code.html#airflow.models.DagBag">[docs]</a><span class="k">class</span> <span class="nc">DagBag</span><span class="p">(</span><span class="n">BaseDagBag</span><span class="p">,</span> <span class="n">LoggingMixin</span><span class="p">):</span> @@ -316,19 +343,19 @@ <span class="sd"> :param include_examples: whether to include the examples that ship</span> <span class="sd"> with airflow or not</span> <span class="sd"> :type include_examples: bool</span> -<span class="sd"> :param sync_to_db: whether to sync the properties of the DAGs to</span> -<span class="sd"> the metadata DB while finding them, typically should be done</span> -<span class="sd"> by the scheduler job only</span> -<span class="sd"> :type sync_to_db: bool</span> <span class="sd"> """</span> + <span class="k">def</span> <span class="nf">__init__</span><span class="p">(</span> <span class="bp">self</span><span class="p">,</span> <span class="n">dag_folder</span><span class="o">=</span><span class="kc">None</span><span class="p">,</span> - <span class="n">executor</span><span class="o">=</span><span class="n">DEFAULT_EXECUTOR</span><span class="p">,</span> + <span class="n">executor</span><span class="o">=</span><span class="kc">None</span><span class="p">,</span> <span class="n">include_examples</span><span class="o">=</span><span class="n">configuration</span><span class="o">.</span><span class="n">getboolean</span><span class="p">(</span><span class="s1">'core'</span><span class="p">,</span> <span class="s1">'LOAD_EXAMPLES'</span><span class="p">)):</span> + <span class="c1"># do not use default arg in signature, to fix import cycle on plugin load</span> + <span class="k">if</span> <span class="n">executor</span> <span class="ow">is</span> <span class="kc">None</span><span class="p">:</span> + <span class="n">executor</span> <span class="o">=</span> <span class="n">GetDefaultExecutor</span><span class="p">()</span> <span class="n">dag_folder</span> <span class="o">=</span> <span class="n">dag_folder</span> <span class="ow">or</span> <span class="n">settings</span><span class="o">.</span><span class="n">DAGS_FOLDER</span> - <span class="bp">self</span><span class="o">.</span><span class="n">logger</span><span class="o">.</span><span class="n">info</span><span class="p">(</span><span class="s2">"Filling up the DagBag from </span><span class="si">{}</span><span class="s2">"</span><span class="o">.</span><span class="n">format</span><span class="p">(</span><span class="n">dag_folder</span><span class="p">))</span> + <span class="bp">self</span><span class="o">.</span><span class="n">log</span><span class="o">.</span><span class="n">info</span><span class="p">(</span><span class="s2">"Filling up the DagBag from </span><span class="si">%s</span><span class="s2">"</span><span class="p">,</span> <span class="n">dag_folder</span><span class="p">)</span> <span class="bp">self</span><span class="o">.</span><span class="n">dag_folder</span> <span class="o">=</span> <span class="n">dag_folder</span> <span class="bp">self</span><span class="o">.</span><span class="n">dags</span> <span class="o">=</span> <span class="p">{}</span> <span class="c1"># the file's last modified timestamp when we last read it</span> @@ -338,7 +365,7 @@ <span class="k">if</span> <span class="n">include_examples</span><span class="p">:</span> <span class="n">example_dag_folder</span> <span class="o">=</span> <span class="n">os</span><span class="o">.</span><span class="n">path</span><span class="o">.</span><span class="n">join</span><span class="p">(</span> - <span class="n">os</span><span class="o">.</span><span class="n">path</span><span class="o">.</span><span class="n">dirname</span><span class="p">(</span><span class="n">__file__</span><span class="p">),</span> + <span class="n">os</span><span class="o">.</span><span class="n">path</span><span class="o">.</span><span class="n">dirname</span><span class="p">(</span><span class="vm">__file__</span><span class="p">),</span> <span class="s1">'example_dags'</span><span class="p">)</span> <span class="bp">self</span><span class="o">.</span><span class="n">collect_dags</span><span class="p">(</span><span class="n">example_dag_folder</span><span class="p">)</span> <span class="bp">self</span><span class="o">.</span><span class="n">collect_dags</span><span class="p">(</span><span class="n">dag_folder</span><span class="p">)</span> @@ -401,7 +428,7 @@ <span class="k">return</span> <span class="n">found_dags</span> <span class="k">except</span> <span class="ne">Exception</span> <span class="k">as</span> <span class="n">e</span><span class="p">:</span> - <span class="n">logging</span><span class="o">.</span><span class="n">exception</span><span class="p">(</span><span class="n">e</span><span class="p">)</span> + <span class="bp">self</span><span class="o">.</span><span class="n">log</span><span class="o">.</span><span class="n">exception</span><span class="p">(</span><span class="n">e</span><span class="p">)</span> <span class="k">return</span> <span class="n">found_dags</span> <span class="n">mods</span> <span class="o">=</span> <span class="p">[]</span> @@ -409,11 +436,11 @@ <span class="k">if</span> <span class="n">safe_mode</span> <span class="ow">and</span> <span class="n">os</span><span class="o">.</span><span class="n">path</span><span class="o">.</span><span class="n">isfile</span><span class="p">(</span><span class="n">filepath</span><span class="p">):</span> <span class="k">with</span> <span class="nb">open</span><span class="p">(</span><span class="n">filepath</span><span class="p">,</span> <span class="s1">'rb'</span><span class="p">)</span> <span class="k">as</span> <span class="n">f</span><span class="p">:</span> <span class="n">content</span> <span class="o">=</span> <span class="n">f</span><span class="o">.</span><span class="n">read</span><span class="p">()</span> - <span class="k">if</span> <span class="ow">not</span> <span class="nb">all</span><span class="p">([</span><span class="n">s</span> <span class="ow">in</span> <span class="n">content</span> <span class="k">for</span> <span class="n">s</span> <span class="ow">in</span> <span class="p">(</span><span class="n">b</span><span class="s1">'DAG'</span><span class="p">,</span> <span class="n">b</span><span class="s1">'airflow'</span><span class="p">)]):</span> + <span class="k">if</span> <span class="ow">not</span> <span class="nb">all</span><span class="p">([</span><span class="n">s</span> <span class="ow">in</span> <span class="n">content</span> <span class="k">for</span> <span class="n">s</span> <span class="ow">in</span> <span class="p">(</span><span class="sa">b</span><span class="s1">'DAG'</span><span class="p">,</span> <span class="sa">b</span><span class="s1">'airflow'</span><span class="p">)]):</span> <span class="bp">self</span><span class="o">.</span><span class="n">file_last_changed</span><span class="p">[</span><span class="n">filepath</span><span class="p">]</span> <span class="o">=</span> <span class="n">file_last_changed_on_disk</span> <span class="k">return</span> <span class="n">found_dags</span> - <span class="bp">self</span><span class="o">.</span><span class="n">logger</span><span class="o">.</span><span class="n">debug</span><span class="p">(</span><span class="s2">"Importing </span><span class="si">{}</span><span class="s2">"</span><span class="o">.</span><span class="n">format</span><span class="p">(</span><span class="n">filepath</span><span class="p">))</span> + <span class="bp">self</span><span class="o">.</span><span class="n">log</span><span class="o">.</span><span class="n">debug</span><span class="p">(</span><span class="s2">"Importing </span><span class="si">%s</span><span class="s2">"</span><span class="p">,</span> <span class="n">filepath</span><span class="p">)</span> <span class="n">org_mod_name</span><span class="p">,</span> <span class="n">_</span> <span class="o">=</span> <span class="n">os</span><span class="o">.</span><span class="n">path</span><span class="o">.</span><span class="n">splitext</span><span class="p">(</span><span class="n">os</span><span class="o">.</span><span class="n">path</span><span class="o">.</span><span class="n">split</span><span class="p">(</span><span class="n">filepath</span><span class="p">)[</span><span class="o">-</span><span class="mi">1</span><span class="p">])</span> <span class="n">mod_name</span> <span class="o">=</span> <span class="p">(</span><span class="s1">'unusual_prefix_'</span> <span class="o">+</span> <span class="n">hashlib</span><span class="o">.</span><span class="n">sha1</span><span class="p">(</span><span class="n">filepath</span><span class="o">.</span><span class="n">encode</span><span class="p">(</span><span class="s1">'utf-8'</span><span class="p">))</span><span class="o">.</span><span class="n">hexdigest</span><span class="p">()</span> <span class="o">+</span> @@ -427,7 +454,7 @@ <span class="n">m</span> <span class="o">=</span> <span class="n">imp</span><span class="o">.</span><span class="n">load_source</span><span class="p">(</span><span class="n">mod_name</span><span class="p">,</span> <span class="n">filepath</span><span class="p">)</span> <span class="n">mods</span><span class="o">.</span><span class="n">append</span><span class="p">(</span><span class="n">m</span><span class="p">)</span> <span class="k">except</span> <span class="ne">Exception</span> <span class="k">as</span> <span class="n">e</span><span class="p">:</span> - <span class="bp">self</span><span class="o">.</span><span class="n">logger</span><span class="o">.</span><span class="n">exception</span><span class="p">(</span><span class="s2">"Failed to import: "</span> <span class="o">+</span> <span class="n">filepath</span><span class="p">)</span> + <span class="bp">self</span><span class="o">.</span><span class="n">log</span><span class="o">.</span><span class="n">exception</span><span class="p">(</span><span class="s2">"Failed to import: </span><span class="si">%s</span><span class="s2">"</span><span class="p">,</span> <span class="n">filepath</span><span class="p">)</span> <span class="bp">self</span><span class="o">.</span><span class="n">import_errors</span><span class="p">[</span><span class="n">filepath</span><span class="p">]</span> <span class="o">=</span> <span class="nb">str</span><span class="p">(</span><span class="n">e</span><span class="p">)</span> <span class="bp">self</span><span class="o">.</span><span class="n">file_last_changed</span><span class="p">[</span><span class="n">filepath</span><span class="p">]</span> <span class="o">=</span> <span class="n">file_last_changed_on_disk</span> @@ -438,15 +465,12 @@ <span class="n">mod_name</span><span class="p">,</span> <span class="n">ext</span> <span class="o">=</span> <span class="n">os</span><span class="o">.</span><span class="n">path</span><span class="o">.</span><span class="n">splitext</span><span class="p">(</span><span class="n">mod</span><span class="o">.</span><span class="n">filename</span><span class="p">)</span> <span class="k">if</span> <span class="ow">not</span> <span class="n">head</span> <span class="ow">and</span> <span class="p">(</span><span class="n">ext</span> <span class="o">==</span> <span class="s1">'.py'</span> <span class="ow">or</span> <span class="n">ext</span> <span class="o">==</span> <span class="s1">'.pyc'</span><span class="p">):</span> <span class="k">if</span> <span class="n">mod_name</span> <span class="o">==</span> <span class="s1">'__init__'</span><span class="p">:</span> - <span class="bp">self</span><span class="o">.</span><span class="n">logger</span><span class="o">.</span><span class="n">warning</span><span class="p">(</span><span class="s2">"Found __init__.</span><span class="si">{0}</span><span class="s2"> at root of </span><span class="si">{1}</span><span class="s2">"</span><span class="o">.</span> - <span class="nb">format</span><span class="p">(</span><span class="n">ext</span><span class="p">,</span> <span class="n">filepath</span><span class="p">))</span> - + <span class="bp">self</span><span class="o">.</span><span class="n">log</span><span class="o">.</span><span class="n">warning</span><span class="p">(</span><span class="s2">"Found __init__.</span><span class="si">%s</span><span class="s2"> at root of </span><span class="si">%s</span><span class="s2">"</span><span class="p">,</span> <span class="n">ext</span><span class="p">,</span> <span class="n">filepath</span><span class="p">)</span> <span class="k">if</span> <span class="n">safe_mode</span><span class="p">:</span> <span class="k">with</span> <span class="n">zip_file</span><span class="o">.</span><span class="n">open</span><span class="p">(</span><span class="n">mod</span><span class="o">.</span><span class="n">filename</span><span class="p">)</span> <span class="k">as</span> <span class="n">zf</span><span class="p">:</span> - <span class="bp">self</span><span class="o">.</span><span class="n">logger</span><span class="o">.</span><span class="n">debug</span><span class="p">(</span><span class="s2">"Reading </span><span class="si">{}</span><span class="s2"> from </span><span class="si">{}</span><span class="s2">"</span><span class="o">.</span> - <span class="nb">format</span><span class="p">(</span><span class="n">mod</span><span class="o">.</span><span class="n">filename</span><span class="p">,</span> <span class="n">filepath</span><span class="p">))</span> + <span class="bp">self</span><span class="o">.</span><span class="n">log</span><span class="o">.</span><span class="n">debug</span><span class="p">(</span><span class="s2">"Reading </span><span class="si">%s</span><span class="s2"> from </span><span class="si">%s</span><span class="s2">"</span><span class="p">,</span> <span class="n">mod</span><span class="o">.</span><span class="n">filename</span><span class="p">,</span> <span class="n">filepath</span><span class="p">)</span> <span class="n">content</span> <span class="o">=</span> <span class="n">zf</span><span class="o">.</span><span class="n">read</span><span class="p">()</span> - <span class="k">if</span> <span class="ow">not</span> <span class="nb">all</span><span class="p">([</span><span class="n">s</span> <span class="ow">in</span> <span class="n">content</span> <span class="k">for</span> <span class="n">s</span> <span class="ow">in</span> <span class="p">(</span><span class="n">b</span><span class="s1">'DAG'</span><span class="p">,</span> <span class="n">b</span><span class="s1">'airflow'</span><span class="p">)]):</span> + <span class="k">if</span> <span class="ow">not</span> <span class="nb">all</span><span class="p">([</span><span class="n">s</span> <span class="ow">in</span> <span class="n">content</span> <span class="k">for</span> <span class="n">s</span> <span class="ow">in</span> <span class="p">(</span><span class="sa">b</span><span class="s1">'DAG'</span><span class="p">,</span> <span class="sa">b</span><span class="s1">'airflow'</span><span class="p">)]):</span> <span class="bp">self</span><span class="o">.</span><span class="n">file_last_changed</span><span class="p">[</span><span class="n">filepath</span><span class="p">]</span> <span class="o">=</span> <span class="p">(</span> <span class="n">file_last_changed_on_disk</span><span class="p">)</span> <span class="c1"># todo: create ignore list</span> @@ -460,12 +484,12 @@ <span class="n">m</span> <span class="o">=</span> <span class="n">importlib</span><span class="o">.</span><span class="n">import_module</span><span class="p">(</span><span class="n">mod_name</span><span class="p">)</span> <span class="n">mods</span><span class="o">.</span><span class="n">append</span><span class="p">(</span><span class="n">m</span><span class="p">)</span> <span class="k">except</span> <span class="ne">Exception</span> <span class="k">as</span> <span class="n">e</span><span class="p">:</span> - <span class="bp">self</span><span class="o">.</span><span class="n">logger</span><span class="o">.</span><span class="n">exception</span><span class="p">(</span><span class="s2">"Failed to import: "</span> <span class="o">+</span> <span class="n">filepath</span><span class="p">)</span> + <span class="bp">self</span><span class="o">.</span><span class="n">log</span><span class="o">.</span><span class="n">exception</span><span class="p">(</span><span class="s2">"Failed to import: </span><span class="si">%s</span><span class="s2">"</span><span class="p">,</span> <span class="n">filepath</span><span class="p">)</span> <span class="bp">self</span><span class="o">.</span><span class="n">import_errors</span><span class="p">[</span><span class="n">filepath</span><span class="p">]</span> <span class="o">=</span> <span class="nb">str</span><span class="p">(</span><span class="n">e</span><span class="p">)</span> <span class="bp">self</span><span class="o">.</span><span class="n">file_last_changed</span><span class="p">[</span><span class="n">filepath</span><span class="p">]</span> <span class="o">=</span> <span class="n">file_last_changed_on_disk</span> <span class="k">for</span> <span class="n">m</span> <span class="ow">in</span> <span class="n">mods</span><span class="p">:</span> - <span class="k">for</span> <span class="n">dag</span> <span class="ow">in</span> <span class="nb">list</span><span class="p">(</span><span class="n">m</span><span class="o">.</span><span class="n">__dict__</span><span class="o">.</span><span class="n">values</span><span class="p">()):</span> + <span class="k">for</span> <span class="n">dag</span> <span class="ow">in</span> <span class="nb">list</span><span class="p">(</span><span class="n">m</span><span class="o">.</span><span class="vm">__dict__</span><span class="o">.</span><span class="n">values</span><span class="p">()):</span> <span class="k">if</span> <span class="nb">isinstance</span><span class="p">(</span><span class="n">dag</span><span class="p">,</span> <span class="n">DAG</span><span class="p">):</span> <span class="k">if</span> <span class="ow">not</span> <span class="n">dag</span><span class="o">.</span><span class="n">full_filepath</span><span class="p">:</span> <span class="n">dag</span><span class="o">.</span><span class="n">full_filepath</span> <span class="o">=</span> <span class="n">filepath</span> @@ -477,19 +501,17 @@ <span class="bp">self</span><span class="o">.</span><span class="n">file_last_changed</span><span class="p">[</span><span class="n">filepath</span><span class="p">]</span> <span class="o">=</span> <span class="n">file_last_changed_on_disk</span> <span class="k">return</span> <span class="n">found_dags</span></div> - <span class="nd">@provide_session</span> -<div class="viewcode-block" id="DagBag.kill_zombies"><a class="viewcode-back" href="../../code.html#airflow.models.DagBag.kill_zombies">[docs]</a> <span class="k">def</span> <span class="nf">kill_zombies</span><span class="p">(</span><span class="bp">self</span><span class="p">,</span> <span class="n">session</span><span class="o">=</span><span class="kc">None</span><span class="p">):</span> +<div class="viewcode-block" id="DagBag.kill_zombies"><a class="viewcode-back" href="../../code.html#airflow.models.DagBag.kill_zombies">[docs]</a> <span class="nd">@provide_session</span> + <span class="k">def</span> <span class="nf">kill_zombies</span><span class="p">(</span><span class="bp">self</span><span class="p">,</span> <span class="n">session</span><span class="o">=</span><span class="kc">None</span><span class="p">):</span> <span class="sd">"""</span> <span class="sd"> Fails tasks that haven't had a heartbeat in too long</span> <span class="sd"> """</span> <span class="kn">from</span> <span class="nn">airflow.jobs</span> <span class="k">import</span> <span class="n">LocalTaskJob</span> <span class="k">as</span> <span class="n">LJ</span> - <span class="bp">self</span><span class="o">.</span><span class="n">logger</span><span class="o">.</span><span class="n">info</span><span class="p">(</span><span class="s2">"Finding 'running' jobs without a recent heartbeat"</span><span class="p">)</span> + <span class="bp">self</span><span class="o">.</span><span class="n">log</span><span class="o">.</span><span class="n">info</span><span class="p">(</span><span class="s2">"Finding 'running' jobs without a recent heartbeat"</span><span class="p">)</span> <span class="n">TI</span> <span class="o">=</span> <span class="n">TaskInstance</span> - <span class="n">secs</span> <span class="o">=</span> <span class="p">(</span> - <span class="n">configuration</span><span class="o">.</span><span class="n">getint</span><span class="p">(</span><span class="s1">'scheduler'</span><span class="p">,</span> <span class="s1">'scheduler_zombie_task_threshold'</span><span class="p">))</span> - <span class="n">limit_dttm</span> <span class="o">=</span> <span class="n">datetime</span><span class="o">.</span><span class="n">now</span><span class="p">()</span> <span class="o">-</span> <span class="n">timedelta</span><span class="p">(</span><span class="n">seconds</span><span class="o">=</span><span class="n">secs</span><span class="p">)</span> - <span class="bp">self</span><span class="o">.</span><span class="n">logger</span><span class="o">.</span><span class="n">info</span><span class="p">(</span> - <span class="s2">"Failing jobs without heartbeat after </span><span class="si">{}</span><span class="s2">"</span><span class="o">.</span><span class="n">format</span><span class="p">(</span><span class="n">limit_dttm</span><span class="p">))</span> + <span class="n">secs</span> <span class="o">=</span> <span class="n">configuration</span><span class="o">.</span><span class="n">getint</span><span class="p">(</span><span class="s1">'scheduler'</span><span class="p">,</span> <span class="s1">'scheduler_zombie_task_threshold'</span><span class="p">)</span> + <span class="n">limit_dttm</span> <span class="o">=</span> <span class="n">datetime</span><span class="o">.</span><span class="n">utcnow</span><span class="p">()</span> <span class="o">-</span> <span class="n">timedelta</span><span class="p">(</span><span class="n">seconds</span><span class="o">=</span><span class="n">secs</span><span class="p">)</span> + <span class="bp">self</span><span class="o">.</span><span class="n">log</span><span class="o">.</span><span class="n">info</span><span class="p">(</span><span class="s2">"Failing jobs without heartbeat after </span><span class="si">%s</span><span class="s2">"</span><span class="p">,</span> <span class="n">limit_dttm</span><span class="p">)</span> <span class="n">tis</span> <span class="o">=</span> <span class="p">(</span> <span class="n">session</span><span class="o">.</span><span class="n">query</span><span class="p">(</span><span class="n">TI</span><span class="p">)</span> @@ -509,9 +531,8 @@ <span class="k">if</span> <span class="n">ti</span><span class="o">.</span><span class="n">task_id</span> <span class="ow">in</span> <span class="n">dag</span><span class="o">.</span><span class="n">task_ids</span><span class="p">:</span> <span class="n">task</span> <span class="o">=</span> <span class="n">dag</span><span class="o">.</span><span class="n">get_task</span><span class="p">(</span><span class="n">ti</span><span class="o">.</span><span class="n">task_id</span><span class="p">)</span> <span class="n">ti</span><span class="o">.</span><span class="n">task</span> <span class="o">=</span> <span class="n">task</span> - <span class="n">ti</span><span class="o">.</span><span class="n">handle_failure</span><span class="p">(</span><span class="s2">"</span><span class="si">{}</span><span class="s2"> killed as zombie"</span><span class="o">.</span><span class="n">format</span><span class="p">(</span><span class="n">ti</span><span class="p">))</span> - <span class="bp">self</span><span class="o">.</span><span class="n">logger</span><span class="o">.</span><span class="n">info</span><span class="p">(</span> - <span class="s1">'Marked zombie job </span><span class="si">{}</span><span class="s1"> as failed'</span><span class="o">.</span><span class="n">format</span><span class="p">(</span><span class="n">ti</span><span class="p">))</span> + <span class="n">ti</span><span class="o">.</span><span class="n">handle_failure</span><span class="p">(</span><span class="s2">"</span><span class="si">{}</span><span class="s2"> killed as zombie"</span><span class="o">.</span><span class="n">format</span><span class="p">(</span><span class="nb">str</span><span class="p">(</span><span class="n">ti</span><span class="p">)))</span> + <span class="bp">self</span><span class="o">.</span><span class="n">log</span><span class="o">.</span><span class="n">info</span><span class="p">(</span><span class="s1">'Marked zombie job </span><span class="si">%s</span><span class="s1"> as failed'</span><span class="p">,</span> <span class="n">ti</span><span class="p">)</span> <span class="n">Stats</span><span class="o">.</span><span class="n">incr</span><span class="p">(</span><span class="s1">'zombies_killed'</span><span class="p">)</span> <span class="n">session</span><span class="o">.</span><span class="n">commit</span><span class="p">()</span></div> @@ -521,7 +542,7 @@ <span class="sd"> """</span> <span class="bp">self</span><span class="o">.</span><span class="n">dags</span><span class="p">[</span><span class="n">dag</span><span class="o">.</span><span class="n">dag_id</span><span class="p">]</span> <span class="o">=</span> <span class="n">dag</span> <span class="n">dag</span><span class="o">.</span><span class="n">resolve_template_files</span><span class="p">()</span> - <span class="n">dag</span><span class="o">.</span><span class="n">last_loaded</span> <span class="o">=</span> <span class="n">datetime</span><span class="o">.</span><span class="n">now</span><span class="p">()</span> + <span class="n">dag</span><span class="o">.</span><span class="n">last_loaded</span> <span class="o">=</span> <span class="n">datetime</span><span class="o">.</span><span class="n">utcnow</span><span class="p">()</span> <span class="k">for</span> <span class="n">task</span> <span class="ow">in</span> <span class="n">dag</span><span class="o">.</span><span class="n">tasks</span><span class="p">:</span> <span class="n">settings</span><span class="o">.</span><span class="n">policy</span><span class="p">(</span><span class="n">task</span><span class="p">)</span> @@ -531,7 +552,7 @@ <span class="n">subdag</span><span class="o">.</span><span class="n">parent_dag</span> <span class="o">=</span> <span class="n">dag</span> <span class="n">subdag</span><span class="o">.</span><span class="n">is_subdag</span> <span class="o">=</span> <span class="kc">True</span> <span class="bp">self</span><span class="o">.</span><span class="n">bag_dag</span><span class="p">(</span><span class="n">subdag</span><span class="p">,</span> <span class="n">parent_dag</span><span class="o">=</span><span class="n">dag</span><span class="p">,</span> <span class="n">root_dag</span><span class="o">=</span><span class="n">root_dag</span><span class="p">)</span> - <span class="bp">self</span><span class="o">.</span><span class="n">logger</span><span class="o">.</span><span class="n">debug</span><span class="p">(</span><span class="s1">'Loaded DAG </span><span class="si">{dag}</span><span class="s1">'</span><span class="o">.</span><span class="n">format</span><span class="p">(</span><span class="o">**</span><span class="nb">locals</span><span class="p">()))</span></div> + <span class="bp">self</span><span class="o">.</span><span class="n">log</span><span class="o">.</span><span class="n">debug</span><span class="p">(</span><span class="s1">'Loaded DAG </span><span class="si">{dag}</span><span class="s1">'</span><span class="o">.</span><span class="n">format</span><span class="p">(</span><span class="o">**</span><span class="nb">locals</span><span class="p">()))</span></div> <div class="viewcode-block" id="DagBag.collect_dags"><a class="viewcode-back" href="../../code.html#airflow.models.DagBag.collect_dags">[docs]</a> <span class="k">def</span> <span class="nf">collect_dags</span><span class="p">(</span> <span class="bp">self</span><span class="p">,</span> @@ -546,7 +567,7 @@ <span class="sd"> ignoring files that match any of the regex patterns specified</span> <span class="sd"> in the file.</span> <span class="sd"> """</span> - <span class="n">start_dttm</span> <span class="o">=</span> <span class="n">datetime</span><span class="o">.</span><span class="n">now</span><span class="p">()</span> + <span class="n">start_dttm</span> <span class="o">=</span> <span class="n">datetime</span><span class="o">.</span><span class="n">utcnow</span><span class="p">()</span> <span class="n">dag_folder</span> <span class="o">=</span> <span class="n">dag_folder</span> <span class="ow">or</span> <span class="bp">self</span><span class="o">.</span><span class="n">dag_folder</span> <span class="c1"># Used to store stats around DagBag processing</span> @@ -574,11 +595,11 @@ <span class="k">continue</span> <span class="k">if</span> <span class="ow">not</span> <span class="nb">any</span><span class="p">(</span> <span class="p">[</span><span class="n">re</span><span class="o">.</span><span class="n">findall</span><span class="p">(</span><span class="n">p</span><span class="p">,</span> <span class="n">filepath</span><span class="p">)</span> <span class="k">for</span> <span class="n">p</span> <span class="ow">in</span> <span class="n">patterns</span><span class="p">]):</span> - <span class="n">ts</span> <span class="o">=</span> <span class="n">datetime</span><span class="o">.</span><span class="n">now</span><span class="p">()</span> + <span class="n">ts</span> <span class="o">=</span> <span class="n">datetime</span><span class="o">.</span><span class="n">utcnow</span><span class="p">()</span> <span class="n">found_dags</span> <span class="o">=</span> <span class="bp">self</span><span class="o">.</span><span class="n">process_file</span><span class="p">(</span> <span class="n">filepath</span><span class="p">,</span> <span class="n">only_if_updated</span><span class="o">=</span><span class="n">only_if_updated</span><span class="p">)</span> - <span class="n">td</span> <span class="o">=</span> <span class="n">datetime</span><span class="o">.</span><span class="n">now</span><span class="p">()</span> <span class="o">-</span> <span class="n">ts</span> + <span class="n">td</span> <span class="o">=</span> <span class="n">datetime</span><span class="o">.</span><span class="n">utcnow</span><span class="p">()</span> <span class="o">-</span> <span class="n">ts</span> <span class="n">td</span> <span class="o">=</span> <span class="n">td</span><span class="o">.</span><span class="n">total_seconds</span><span class="p">()</span> <span class="o">+</span> <span class="p">(</span> <span class="nb">float</span><span class="p">(</span><span class="n">td</span><span class="o">.</span><span class="n">microseconds</span><span class="p">)</span> <span class="o">/</span> <span class="mi">1000000</span><span class="p">)</span> <span class="n">stats</span><span class="o">.</span><span class="n">append</span><span class="p">(</span><span class="n">FileLoadStat</span><span class="p">(</span> @@ -589,9 +610,9 @@ <span class="nb">str</span><span class="p">([</span><span class="n">dag</span><span class="o">.</span><span class="n">dag_id</span> <span class="k">for</span> <span class="n">dag</span> <span class="ow">in</span> <span class="n">found_dags</span><span class="p">]),</span> <span class="p">))</span> <span class="k">except</span> <span class="ne">Exception</span> <span class="k">as</span> <span class="n">e</span><span class="p">:</span> - <span class="n">logging</span><span class="o">.</span><span class="n">warning</span><span class="p">(</span><span class="n">e</span><span class="p">)</span> + <span class="bp">self</span><span class="o">.</span><span class="n">log</span><span class="o">.</span><span class="n">warning</span><span class="p">(</span><span class="n">e</span><span class="p">)</span> <span class="n">Stats</span><span class="o">.</span><span class="n">gauge</span><span class="p">(</span> - <span class="s1">'collect_dags'</span><span class="p">,</span> <span class="p">(</span><span class="n">datetime</span><span class="o">.</span><span class="n">now</span><span class="p">()</span> <span class="o">-</span> <span class="n">start_dttm</span><span class="p">)</span><span class="o">.</span><span class="n">total_seconds</span><span class="p">(),</span> <span class="mi">1</span><span class="p">)</span> + <span class="s1">'collect_dags'</span><span class="p">,</span> <span class="p">(</span><span class="n">datetime</span><span class="o">.</span><span class="n">utcnow</span><span class="p">()</span> <span class="o">-</span> <span class="n">start_dttm</span><span class="p">)</span><span class="o">.</span><span class="n">total_seconds</span><span class="p">(),</span> <span class="mi">1</span><span class="p">)</span> <span class="n">Stats</span><span class="o">.</span><span class="n">gauge</span><span class="p">(</span> <span class="s1">'dagbag_size'</span><span class="p">,</span> <span class="nb">len</span><span class="p">(</span><span class="bp">self</span><span class="o">.</span><span class="n">dags</span><span class="p">),</span> <span class="mi">1</span><span class="p">)</span> <span class="n">Stats</span><span class="o">.</span><span class="n">gauge</span><span class="p">(</span> @@ -632,7 +653,7 @@ <span class="k">def</span> <span class="nf">paused_dags</span><span class="p">(</span><span class="bp">self</span><span class="p">):</span> <span class="n">session</span> <span class="o">=</span> <span class="n">settings</span><span class="o">.</span><span class="n">Session</span><span class="p">()</span> <span class="n">dag_ids</span> <span class="o">=</span> <span class="p">[</span><span class="n">dp</span><span class="o">.</span><span class="n">dag_id</span> <span class="k">for</span> <span class="n">dp</span> <span class="ow">in</span> <span class="n">session</span><span class="o">.</span><span class="n">query</span><span class="p">(</span><span class="n">DagModel</span><span class="p">)</span><span class="o">.</span><span class="n">filter</span><span class="p">(</span> - <span class="n">DagModel</span><span class="o">.</span><span class="n">is_paused</span><span class="o">.</span><span class="n">is_</span><span class="p">(</span><span class="kc">True</span><span class="p">))]</span> + <span class="n">DagModel</span><span class="o">.</span><span class="n">is_paused</span><span class="o">.</span><span class="fm">__eq__</span><span class="p">(</span><span class="kc">True</span><span class="p">))]</span> <span class="n">session</span><span class="o">.</span><span class="n">commit</span><span class="p">()</span> <span class="n">session</span><span class="o">.</span><span class="n">close</span><span class="p">()</span> <span class="k">return</span> <span class="n">dag_ids</span></div> @@ -656,7 +677,7 @@ <span class="k">return</span> <span class="bp">self</span><span class="o">.</span><span class="n">superuser</span> -<div class="viewcode-block" id="Connection"><a class="viewcode-back" href="../../code.html#airflow.models.Connection">[docs]</a><span class="k">class</span> <span class="nc">Connection</span><span class="p">(</span><span class="n">Base</span><span class="p">):</span> +<div class="viewcode-block" id="Connection"><a class="viewcode-back" href="../../code.html#airflow.models.Connection">[docs]</a><span class="k">class</span> <span class="nc">Connection</span><span class="p">(</span><span class="n">Base</span><span class="p">,</span> <span class="n">LoggingMixin</span><span class="p">):</span> <span class="sd">"""</span> <span class="sd"> Placeholder to store information about different database instances</span> <span class="sd"> connection information. The idea here is that scripts use references to</span> @@ -678,6 +699,7 @@ <span class="n">_extra</span> <span class="o">=</span> <span class="n">Column</span><span class="p">(</span><span class="s1">'extra'</span><span class="p">,</span> <span class="n">String</span><span class="p">(</span><span class="mi">5000</span><span class="p">))</span> <span class="n">_types</span> <span class="o">=</span> <span class="p">[</span> + <span class="p">(</span><span class="s1">'docker'</span><span class="p">,</span> <span class="s1">'Docker Registry'</span><span class="p">,),</span> <span class="p">(</span><span class="s1">'fs'</span><span class="p">,</span> <span class="s1">'File (path)'</span><span class="p">),</span> <span class="p">(</span><span class="s1">'ftp'</span><span class="p">,</span> <span class="s1">'FTP'</span><span class="p">,),</span> <span class="p">(</span><span class="s1">'google_cloud_platform'</span><span class="p">,</span> <span class="s1">'Google Cloud Platform'</span><span class="p">),</span> @@ -700,6 +722,11 @@ <span class="p">(</span><span class="s1">'mssql'</span><span class="p">,</span> <span class="s1">'Microsoft SQL Server'</span><span class="p">),</span> <span class="p">(</span><span class="s1">'mesos_framework-id'</span><span class="p">,</span> <span class="s1">'Mesos Framework ID'</span><span class="p">),</span> <span class="p">(</span><span class="s1">'jira'</span><span class="p">,</span> <span class="s1">'JIRA'</span><span class="p">,),</span> + <span class="p">(</span><span class="s1">'redis'</span><span class="p">,</span> <span class="s1">'Redis'</span><span class="p">,),</span> + <span class="p">(</span><span class="s1">'wasb'</span><span class="p">,</span> <span class="s1">'Azure Blob Storage'</span><span class="p">),</span> + <span class="p">(</span><span class="s1">'databricks'</span><span class="p">,</span> <span class="s1">'Databricks'</span><span class="p">,),</span> + <span class="p">(</span><span class="s1">'aws'</span><span class="p">,</span> <span class="s1">'Amazon Web Services'</span><span class="p">,),</span> + <span class="p">(</span><span class="s1">'emr'</span><span class="p">,</span> <span class="s1">'Elastic MapReduce'</span><span class="p">,),</span> <span class="p">]</span> <span class="k">def</span> <span class="nf">__init__</span><span class="p">(</span> @@ -736,51 +763,61 @@ <span class="k">def</span> <span class="nf">get_password</span><span class="p">(</span><span class="bp">self</span><span class="p">):</span> <span class="k">if</span> <span class="bp">self</span><span class="o">.</span><span class="n">_password</span> <span class="ow">and</span> <span class="bp">self</span><span class="o">.</span><span class="n">is_encrypted</span><span class="p">:</span> - <span class="k">if</span> <span class="ow">not</span> <span class="n">ENCRYPTION_ON</span><span class="p">:</span> + <span class="k">try</span><span class="p">:</span> + <span class="n">fernet</span> <span class="o">=</span> <span class="n">get_fernet</span><span class="p">()</span> + <span class="k">except</span><span class="p">:</span> <span class="k">raise</span> <span class="n">AirflowException</span><span class="p">(</span> <span class="s2">"Can't decrypt encrypted password for login=</span><span class="si">{}</span><span class="s2">, </span><span class="se">\</span> <span class="s2"> FERNET_KEY configuration is missing"</span><span class="o">.</span><span class="n">format</span><span class="p">(</span><span class="bp">self</span><span class="o">.</span><span class="n">login</span><span class="p">))</span> - <span class="k">return</span> <span class="n">FERNET</span><span class="o">.</span><span class="n">decrypt</span><span class="p">(</span><span class="nb">bytes</span><span class="p">(</span><span class="bp">self</span><span class="o">.</span><span class="n">_password</span><span class="p">,</span> <span class="s1">'utf-8'</span><span class="p">))</span><span class="o">.</span><span class="n">decode</span><span class="p">()</span> + <span class="k">return</span> <span class="n">fernet</span><span class="o">.</span><span class="n">decrypt</span><span class="p">(</span><span class="nb">bytes</span><span class="p">(</span><span class="bp">self</span><span class="o">.</span><span class="n">_password</span><span class="p">,</span> <span class="s1">'utf-8'</span><span class="p">))</span><span class="o">.</span><span class="n">decode</span><span class="p">()</span> <span class="k">else</span><span class="p">:</span> <span class="k">return</span> <span class="bp">self</span><span class="o">.</span><span class="n">_password</span> <span class="k">def</span> <span class="nf">set_password</span><span class="p">(</span><span class="bp">self</span><span class="p">,</span> <span class="n">value</span><span class="p">):</span> <span class="k">if</span> <span class="n">value</span><span class="p">:</span> <span class="k">try</span><span class="p">:</span> - <span class="bp">self</span><span class="o">.</span><span class="n">_password</span> <span class="o">=</span> <span class="n">FERNET</span><span class="o">.</span><span class="n">encrypt</span><span class="p">(</span><span class="nb">bytes</span><span class="p">(</span><span class="n">value</span><span class="p">,</span> <span class="s1">'utf-8'</span><span class="p">))</span><span class="o">.</span><span class="n">decode</span><span class="p">()</span> + <span class="n">fernet</span> <span class="o">=</span> <span class="n">get_fernet</span><span class="p">()</span> + <span class="bp">self</span><span class="o">.</span><span class="n">_password</span> <span class="o">=</span> <span class="n">fernet</span><span class="o">.</span><span class="n">encrypt</span><span class="p">(</span><span class="nb">bytes</span><span class="p">(</span><span class="n">value</span><span class="p">,</span> <span class="s1">'utf-8'</span><span class="p">))</span><span class="o">.</span><span class="n">decode</span><span class="p">()</span> <span class="bp">self</span><span class="o">.</span><span class="n">is_encrypted</span> <span class="o">=</span> <span class="kc">True</span> - <span class="k">except</span> <span class="ne">NameError</span><span class="p">:</span> + <span class="k">except</span> <span class="n">AirflowException</span><span class="p">:</span> + <span class="bp">self</span><span class="o">.</span><span class="n">log</span><span class="o">.</span><span class="n">exception</span><span class="p">(</span><span class="s2">"Failed to load fernet while encrypting value, "</span> + <span class="s2">"using non-encrypted value."</span><span class="p">)</span> <span class="bp">self</span><span class="o">.</span><span class="n">_password</span> <span class="o">=</span> <span class="n">value</span> <span class="bp">self</span><span class="o">.</span><span class="n">is_encrypted</span> <span class="o">=</span> <span class="kc">False</span> <span class="nd">@declared_attr</span> - <span class="k">def</span> <span class="nf">password</span><span class="p">(</span><span class="n">cls</span><span class="p">):</span> + <span class="k">def</span> <span class="nf">password</span><span class="p">(</span><span class="bp">cls</span><span class="p">):</span> <span class="k">return</span> <span class="n">synonym</span><span class="p">(</span><span class="s1">'_password'</span><span class="p">,</span> - <span class="n">descriptor</span><span class="o">=</span><span class="nb">property</span><span class="p">(</span><span class="n">cls</span><span class="o">.</span><span class="n">get_password</span><span class="p">,</span> <span class="n">cls</span><span class="o">.</span><span class="n">set_password</span><span class="p">))</span> + <span class="n">descriptor</span><span class="o">=</span><span class="nb">property</span><span class="p">(</span><span class="bp">cls</span><span class="o">.</span><span class="n">get_password</span><span class="p">,</span> <span class="bp">cls</span><span class="o">.</span><span class="n">set_password</span><span class="p">))</span> <span class="k">def</span> <span class="nf">get_extra</span><span class="p">(</span><span class="bp">self</span><span class="p">):</span> <span class="k">if</span> <span class="bp">self</span><span class="o">.</span><span class="n">_extra</span> <span class="ow">and</span> <span class="bp">self</span><span class="o">.</span><span class="n">is_extra_encrypted</span><span class="p">:</span> - <span class="k">if</span> <span class="ow">not</span> <span class="n">ENCRYPTION_ON</span><span class="p">:</span> + <span class="k">try</span><span class="p">:</span> + <span class="n">fernet</span> <span class="o">=</span> <span class="n">get_fernet</span><span class="p">()</span> + <span class="k">except</span><span class="p">:</span> <span class="k">raise</span> <span class="n">AirflowException</span><span class="p">(</span> <span class="s2">"Can't decrypt `extra` params for login=</span><span class="si">{}</span><span class="s2">,</span><span class="se">\</span> <span class="s2"> FERNET_KEY configuration is missing"</span><span class="o">.</span><span class="n">format</span><span class="p">(</span><span class="bp">self</span><span class="o">.</span><span class="n">login</span><span class="p">))</span> - <span class="k">return</span> <span class="n">FERNET</span><span class="o">.</span><span class="n">decrypt</span><span class="p">(</span><span class="nb">bytes</span><span class="p">(</span><span class="bp">self</span><span class="o">.</span><span class="n">_extra</span><span class="p">,</span> <span class="s1">'utf-8'</span><span class="p">))</span><span class="o">.</span><span class="n">decode</span><span class="p">()</span> + <span class="k">return</span> <span class="n">fernet</span><span class="o">.</span><span class="n">decrypt</span><span class="p">(</span><span class="nb">bytes</span><span class="p">(</span><span class="bp">self</span><span class="o">.</span><span class="n">_extra</span><span class="p">,</span> <span class="s1">'utf-8'</span><span class="p">))</span><span class="o">.</span><span class="n">decode</span><span class="p">()</span> <span class="k">else</span><span class="p">:</span> <span class="k">return</span> <span class="bp">self</span><span class="o">.</span><span class="n">_extra</span> <span class="k">def</span> <span class="nf">set_extra</span><span class="p">(</span><span class="bp">self</span><span class="p">,</span> <span class="n">value</span><span class="p">):</span> <span class="k">if</span> <span class="n">value</span><span class="p">:</span> <span class="k">try</span><span class="p">:</span> - <span class="bp">self</span><span class="o">.</span><span class="n">_extra</span> <span class="o">=</span> <span class="n">FERNET</span><span class="o">.</span><span class="n">encrypt</span><span class="p">(</span><span class="nb">bytes</span><span class="p">(</span><span class="n">value</span><span class="p">,</span> <span class="s1">'utf-8'</span><span class="p">))</span><span class="o">.</span><span class="n">decode</span><span class="p">()</span> + <span class="n">fernet</span> <span class="o">=</span> <span class="n">get_fernet</span><span class="p">()</span> + <span class="bp">self</span><span class="o">.</span><span class="n">_extra</span> <span class="o">=</span> <span class="n">fernet</span><span class="o">.</span><span class="n">encrypt</span><span class="p">(</span><span class="nb">bytes</span><span class="p">(</span><span class="n">value</span><span class="p">,</span> <span class="s1">'utf-8'</span><span class="p">))</span><span class="o">.</span><span class="n">decode</span><span class="p">()</span> <span class="bp">self</span><span class="o">.</span><span class="n">is_extra_encrypted</span> <span class="o">=</span> <span class="kc">True</span> - <span class="k">except</span> <span class="ne">NameError</span><span class="p">:</span> + <span class="k">except</span> <span class="n">AirflowException</span><span class="p">:</span> + <span class="bp">self</span><span class="o">.</span><span class="n">log</span><span class="o">.</span><span class="n">exception</span><span class="p">(</span><span class="s2">"Failed to load fernet while encrypting value, "</span> + <span class="s2">"using non-encrypted value."</span><span class="p">)</span> <span class="bp">self</span><span class="o">.</span><span class="n">_extra</span> <span class="o">=</span> <span class="n">value</span> <span class="bp">self</span><span class="o">.</span><span class="n">is_extra_encrypted</span> <span class="o">=</span> <span class="kc">False</span> <span class="nd">@declared_attr</span> - <span class="k">def</span> <span class="nf">extra</span><span class="p">(</span><span class="n">cls</span><span class="p">):</span> + <span class="k">def</span> <span class="nf">extra</span><span class="p">(</span><span class="bp">cls</span><span class="p">):</span> <span class="k">return</span> <span class="n">synonym</span><span class="p">(</span><span class="s1">'_extra'</span><span class="p">,</span> - <span class="n">descriptor</span><span class="o">=</span><span class="nb">property</span><span class="p">(</span><span class="n">cls</span><span class="o">.</span><span class="n">get_extra</span><span class="p">,</span> <span class="n">cls</span><span class="o">.</span><span class="n">set_extra</span><span class="p">))</span> + <span class="n">descriptor</span><span class="o">=</span><span class="nb">property</span><span class="p">(</span><span class="bp">cls</span><span class="o">.</span><span class="n">get_extra</span><span class="p">,</span> <span class="bp">cls</span><span class="o">.</span><span class="n">set_extra</span><span class="p">))</span> <span class="k">def</span> <span class="nf">get_hook</span><span class="p">(</span><span class="bp">self</span><span class="p">):</span> <span class="k">try</span><span class="p">:</span> @@ -823,6 +860,15 @@ <span class="k">elif</span> <span class="bp">self</span><span class="o">.</span><span class="n">conn_type</span> <span class="o">==</span> <span class="s1">'jira'</span><span class="p">:</span> <span class="kn">from</span> <span class="nn">airflow.contrib.hooks.jira_hook</span> <span class="k">import</span> <span class="n">JiraHook</span> <span class="k">return</span> <span class="n">JiraHook</span><span class="p">(</span><span class="n">jira_conn_id</span><span class="o">=</span><span class="bp">self</span><span class="o">.</span><span class="n">conn_id</span><span class="p">)</span> + <span class="k">elif</span> <span class="bp">self</span><span class="o">.</span><span class="n">conn_type</span> <span class="o">==</span> <span class="s1">'redis'</span><span class="p">:</span> + <span class="kn">from</span> <span class="nn">airflow.contrib.hooks.redis_hook</span> <span class="k">import</span> <span class="n">RedisHook</span> + <span class="k">return</span> <span class="n">RedisHook</span><span class="p">(</span><span class="n">redis_conn_id</span><span class="o">=</span><span class="bp">self</span><span class="o">.</span><span class="n">conn_id</span><span class="p">)</span> + <span class="k">elif</span> <span class="bp">self</span><span class="o">.</span><span class="n">conn_type</span> <span class="o">==</span> <span class="s1">'wasb'</span><span class="p">:</span> + <span class="kn">from</span> <span class="nn">airflow.contrib.hooks.wasb_hook</span> <span class="k">import</span> <span class="n">WasbHook</span> + <span class="k">return</span> <span class="n">WasbHook</span><span class="p">(</span><span class="n">wasb_conn_id</span><span class="o">=</span><span class="bp">self</span><span class="o">.</span><span class="n">conn_id</span><span class="p">)</span> + <span class="k">elif</span> <span class="bp">self</span><span class="o">.</span><span class="n">conn_type</span> <span class="o">==</span> <span class="s1">'docker'</span><span class="p">:</span> + <span class="kn">from</span> <span class="nn">airflow.hooks.docker_hook</span> <span class="k">import</span> <span class="n">DockerHook</span> + <span class="k">return</span> <span class="n">DockerHook</span><span class="p">(</span><span class="n">docker_conn_id</span><span class="o">=</span><span class="bp">self</span><span class="o">.</span><span class="n">conn_id</span><span class="p">)</span> <span class="k">except</span><span class="p">:</span> <span class="k">pass</span> @@ -837,8 +883,8 @@ <span class="k">try</span><span class="p">:</span> <span class="n">obj</span> <span class="o">=</span> <span class="n">json</span><span class="o">.</span><span class="n">loads</span><span class="p">(</span><span class="bp">self</span><span class="o">.</span><span class="n">extra</span><span class="p">)</span> <span class="k">except</span> <span class="ne">Exception</span> <span class="k">as</span> <span class="n">e</span><span class="p">:</span> - <span class="n">logging</span><span class="o">.</span><span class="n">exception</span><span class="p">(</span><span class="n">e</span><span class="p">)</span> - <span class="n">logging</span><span class="o">.</span><span class="n">error</span><span class="p">(</span><span class="s2">"Failed parsing the json for conn_id </span><span class="si">%s</span><span class="s2">"</span><span class="p">,</span> <span class="bp">self</span><span class="o">.</span><span class="n">conn_id</span><span class="p">)</span> + <span class="bp">self</span><span class="o">.</span><span class="n">log</span><span class="o">.</span><span class="n">exception</span><span class="p">(</span><span class="n">e</span><span class="p">)</span> + <span class="bp">self</span><span class="o">.</span><span class="n">log</span><span class="o">.</span><span class="n">error</span><span class="p">(</span><span class="s2">"Failed parsing the json for conn_id </span><span class="si">%s</span><span class="s2">"</span><span class="p">,</span> <span class="bp">self</span><span class="o">.</span><span class="n">conn_id</span><span class="p">)</span> <span class="k">return</span> <span class="n">obj</span></div> @@ -869,13 +915,13 @@ <span class="bp">self</span><span class="o">.</span><span class="n">pickle</span> <span class="o">=</span> <span class="n">dag</span> -<div class="viewcode-block" id="TaskInstance"><a class="viewcode-back" href="../../code.html#airflow.models.TaskInstance">[docs]</a><span class="k">class</span> <span class="nc">TaskInstance</span><span class="p">(</span><span class="n">Base</span><span class="p">):</span> +<div class="viewcode-block" id="TaskInstance"><a class="viewcode-back" href="../../code.html#airflow.models.TaskInstance">[docs]</a><span class="k">class</span> <span class="nc">TaskInstance</span><span class="p">(</span><span class="n">Base</span><span class="p">,</span> <span class="n">LoggingMixin</span><span class="p">):</span> <span class="sd">"""</span> <span class="sd"> Task instances store the state of a task instance. This table is the</span> <span class="sd"> authority and single source of truth around what tasks have run and the</span> <span class="sd"> state they are in.</span> -<span class="sd"> The SqlAchemy model doesn't have a SqlAlchemy foreign key to the task or</span> +<span class="sd"> The SqlAlchemy model doesn't have a SqlAlchemy foreign key to the task or</span> <span class="sd"> dag model deliberately to have more control over transactions.</span> <span class="sd"> Database transactions on this table should insure double triggers and</span> @@ -892,7 +938,8 @@ <span class="n">end_date</span> <span class="o">=</span> <span class="n">Column</span><span class="p">(</span><span class="n">DateTime</span><span class="p">)</span> <span class="n">duration</span> <span class="o">=</span> <span class="n">Column</span><span class="p">(</span><span class="n">Float</span><span class="p">)</span> <span class="n">state</span> <span class="o">=</span> <span class="n">Column</span><span class="p">(</span><span class="n">String</span><span class="p">(</span><span class="mi">20</span><span class="p">))</span> - <span class="n">try_number</span> <span class="o">=</span> <span class="n">Column</span><span class="p">(</span><span class="n">Integer</span><span class="p">,</span> <span class="n">default</span><span class="o">=</span><span class="mi">0</span><span class="p">)</span> + <span class="n">_try_number</span> <span class="o">=</span> <span class="n">Column</span><span class="p">(</span><span class="s1">'try_number'</span><span class="p">,</span> <span class="n">Integer</span><span class="p">,</span> <span class="n">default</span><span class="o">=</span><span class="mi">0</span><span class="p">)</span> + <span class="n">max_tries</span> <span class="o">=</span> <span class="n">Column</span><span class="p">(</span><span class="n">Integer</span><span class="p">)</span> <span class="n">hostname</span> <span class="o">=</span> <span class="n">Column</span><span class="p">(</span><span class="n">String</span><span class="p">(</span><span class="mi">1000</span><span class="p">))</span> <span class="n">unixname</span> <span class="o">=</span> <span class="n">Column</span><span class="p">(</span><span class="n">String</span><span class="p">(</span><span class="mi">1000</span><span class="p">))</span> <span class="n">job_id</span> <span class="o">=</span> <span class="n">Column</span><span class="p">(</span><span class="n">Integer</span><span class="p">)</span> @@ -919,18 +966,42 @@ <span class="bp">self</span><span class="o">.</span><span class="n">pool</span> <span class="o">=</span> <span class="n">task</span><span class="o">.</span><span class="n">pool</span> <span class="bp">self</span><span class="o">.</span><span class="n">priority_weight</span> <span class="o">=</span> <span class="n">task</span><span class="o">.</span><span class="n">priority_weight_total</span> <span class="bp">self</span><span class="o">.</span><span class="n">try_number</span> <span class="o">=</span> <span class="mi">0</span> + <span class="bp">self</span><span class="o">.</span><span class="n">max_tries</span> <span class="o">=</span> <span class="bp">self</span><span class="o">.</span><span class="n">task</span><span class="o">.</span><span class="n">retries</span> <span class="bp">self</span><span class="o">.</span><span class="n">unixname</span> <span class="o">=</span> <span class="n">getpass</span><span class="o">.</span><span class="n">getuser</span><span class="p">()</span> <span class="bp">self</span><span class="o">.</span><span class="n">run_as_user</span> <span class="o">=</span> <span class="n">task</span><span class="o">.</span><span class="n">run_as_user</span> <span class="k">if</span> <span class="n">state</span><span class="p">:</span> <span class="bp">self</span><span class="o">.</span><span class="n">state</span> <span class="o">=</span> <span class="n">state</span> <span class="bp">self</span><span class="o">.</span><span class="n">hostname</span> <span class="o">=</span> <span class="s1">''</span> <span class="bp">self</span><span class="o">.</span><span class="n">init_on_load</span><span class="p">()</span> + <span class="bp">self</span><span class="o">.</span><span class="n">_log</span> <span class="o">=</span> <span class="n">logging</span><span class="o">.</span><span class="n">getLogger</span><span class="p">(</span><span class="s2">"airflow.task"</span><span class="p">)</span> - <span class="nd">@reconstructor</span> -<div class="viewcode-block" id="TaskInstance.init_on_load"><a class="viewcode-back" href="../../code.html#airflow.models.TaskInstance.init_on_load">[docs]</a> <span class="k">def</span> <span class="nf">init_on_load</span><span class="p">(</span><span class="bp">self</span><span class="p">):</span> +<div class="viewcode-block" id="TaskInstance.init_on_load"><a class="viewcode-back" href="../../code.html#airflow.models.TaskInstance.init_on_load">[docs]</a> <span class="nd">@reconstructor</span> + <span class="k">def</span> <span class="nf">init_on_load</span><span class="p">(</span><span class="bp">self</span><span class="p">):</span> <span class="sd">""" Initialize the attributes that aren't stored in the DB. """</span> <span class="bp">self</span><span class="o">.</span><span class="n">test_mode</span> <span class="o">=</span> <span class="kc">False</span> <span class="c1"># can be changed when calling 'run'</span></div> + <span class="nd">@property</span> + <span class="k">def</span> <span class="nf">try_number</span><span class="p">(</span><span class="bp">self</span><span class="p">):</span> + <span class="sd">"""</span> +<span class="sd"> Return the try number that this task number will be when it is acutally</span> +<span class="sd"> run.</span> + +<span class="sd"> If the TI is currently running, this will match the column in the</span> +<span class="sd"> databse, in all othercases this will be incremenetd</span> +<span class="sd"> """</span> + <span class="c1"># This is designed so that task logs end up in the right file.</span> + <span class="k">if</span> <span class="bp">self</span><span class="o">.</span><span class="n">state</span> <span class="o">==</span> <span class="n">State</span><span class="o">.</span><span class="n">RUNNING</span><span class="p">:</span> + <span class="k">return</span> <span class="bp">self</span><span class="o">.</span><span class="n">_try_number</span> + <span class="k">return</span> <span class="bp">self</span><span class="o">.</span><span class="n">_try_number</span> <span class="o">+</span> <span class="mi">1</span> + + <span class="nd">@try_number</span><span class="o">.</span><span class="n">setter</span> + <span class="k">def</span> <span class="nf">try_number</span><span class="p">(</span><span class="bp">self</span><span class="p">,</span> <span class="n">value</span><span class="p">):</span> + <span class="bp">self</span><span class="o">.</span><span class="n">_try_number</span> <span class="o">=</span> <span class="n">value</span> + + <span class="nd">@property</span> + <span class="k">def</span> <span class="nf">next_try_number</span><span class="p">(</span><span class="bp">self</span><span class="p">):</span> + <span class="k">return</span> <span class="bp">self</span><span class="o">.</span><span class="n">_try_number</span> <span class="o">+</span> <span class="mi">1</span> + <div class="viewcode-block" id="TaskInstance.command"><a class="viewcode-back" href="../../code.html#airflow.models.TaskInstance.command">[docs]</a> <span class="k">def</span> <span class="nf">command</span><span class="p">(</span> <span class="bp">self</span><span class="p">,</span> <span class="n">mark_success</span><span class="o">=</span><span class="kc">False</span><span class="p">,</span> @@ -1007,8 +1078,8 @@ <span class="n">pool</span><span class="o">=</span><span class="n">pool</span><span class="p">,</span> <span class="n">cfg_path</span><span class="o">=</span><span class="n">cfg_path</span><span class="p">)</span></div> - <span class="nd">@staticmethod</span> -<div class="viewcode-block" id="TaskInstance.generate_command"><a class="viewcode-back" href="../../code.html#airflow.models.TaskInstance.generate_command">[docs]</a> <span class="k">def</span> <span class="nf">generate_command</span><span class="p">(</span><span class="n">dag_id</span><span class="p">,</span> +<div class="viewcode-block" id="TaskInstance.generate_command"><a class="viewcode-back" href="../../code.html#airflow.models.TaskInstance.generate_command">[docs]</a> <span class="nd">@staticmethod</span> + <span class="k">def</span> <span class="nf">generate_command</span><span class="p">(</span><span class="n">
<TRUNCATED>
