This is an automated email from the ASF dual-hosted git repository.

ephraimanierobi pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git


The following commit(s) were added to refs/heads/main by this push:
     new 3fef6a4783 Soft delete datasets that are no longer referenced in DAG 
schedules or task outlets (#27828)
3fef6a4783 is described below

commit 3fef6a47834b89b99523db6d97d6aa530657a008
Author: blag <[email protected]>
AuthorDate: Fri Nov 25 09:22:51 2022 -0800

    Soft delete datasets that are no longer referenced in DAG schedules or task 
outlets (#27828)
    
    * Soft delete datasets that are no longer referenced anywhere
    
    * Move the orphaning logic into the scheduler and adjust config option name 
accordingly
    
    * Rename config option scheduler.cleanup_interval -> 
scheduler.parsing_cleanup_interval
    
    * Include default column value in migration
    
    * deprecate old interval config; move code to scheduler_job
    
    * First pass at a test
    
    * Fix migration
    
    * Apply suggestions from code review
    
    Co-authored-by: Jed Cunningham 
<[email protected]>
    
    * Don't batch migrations if we don't need to
    
    * Revert "Don't batch migrations if we don't need to" - gotta batch 
migrations for SQLite
    
    This reverts commit 652f7452d3b418c991d409a2b0fc041443048545.
    
    * Tweak migrations
    
    * Use sqlalchemy.sql.True_() to support all DB backends
    
    * Various cleanups
    
    * Add test for un-orphaning datasets once they are referenced again
    
    * Use sqlalchemy.sql.expression.true()
    
    Co-authored-by: Ephraim Anierobi <[email protected]>
    
    * Fix orphaning datasets on MSSQL
    
    * Comment the un-orphan process and use sqla.sql.expression.false()
    
    * Add newsfragment about renamed config option
    
    * add mssql_drop_default flag
    
    * Use server_default in the ORM as well
    
    * Defensively clear datasets before and after DAG tests
    
    * Reconcile migration with ORM model
    
    * Remove now erroneous comment
    
    * Change to use server_default='0'
    
    * Update airflow/configuration.py
    
    Co-authored-by: Jed Cunningham <[email protected]>
    Co-authored-by: Jed Cunningham 
<[email protected]>
    Co-authored-by: Ephraim Anierobi <[email protected]>
---
 airflow/config_templates/config.yml                |   7 +-
 airflow/config_templates/default_airflow.cfg       |   5 +-
 airflow/configuration.py                           |   1 +
 airflow/dag_processing/manager.py                  |   4 +-
 airflow/jobs/scheduler_job.py                      |  47 ++-
 .../0122_2_5_0_add_is_orphaned_to_datasetmodel.py  |  57 +++
 airflow/models/dag.py                              |   4 +
 airflow/models/dataset.py                          |   2 +
 airflow/www/views.py                               |   4 +-
 docs/apache-airflow/img/airflow_erd.sha256         |   2 +-
 docs/apache-airflow/img/airflow_erd.svg            | 406 +++++++++++----------
 docs/apache-airflow/migrations-ref.rst             |   4 +-
 newsfragments/27828.significant.rst                |   3 +
 tests/jobs/test_scheduler_job.py                   |  40 ++
 tests/models/test_dag.py                           |  51 +++
 15 files changed, 422 insertions(+), 215 deletions(-)

diff --git a/airflow/config_templates/config.yml 
b/airflow/config_templates/config.yml
index eef9fa0381..8248040c1c 100644
--- a/airflow/config_templates/config.yml
+++ b/airflow/config_templates/config.yml
@@ -1946,11 +1946,12 @@
       type: string
       example: ~
       default: "30"
-    - name: deactivate_stale_dags_interval
+    - name: parsing_cleanup_interval
       description: |
         How often (in seconds) to check for stale DAGs (DAGs which are no 
longer present in
-        the expected files) which should be deactivated.
-      version_added: 2.2.5
+        the expected files) which should be deactivated, as well as datasets 
that are no longer
+        referenced and should be marked as orphaned.
+      version_added: 2.5.0
       type: integer
       example: ~
       default: "60"
diff --git a/airflow/config_templates/default_airflow.cfg 
b/airflow/config_templates/default_airflow.cfg
index d1f7069cbb..9532bada04 100644
--- a/airflow/config_templates/default_airflow.cfg
+++ b/airflow/config_templates/default_airflow.cfg
@@ -985,8 +985,9 @@ scheduler_idle_sleep_time = 1
 min_file_process_interval = 30
 
 # How often (in seconds) to check for stale DAGs (DAGs which are no longer 
present in
-# the expected files) which should be deactivated.
-deactivate_stale_dags_interval = 60
+# the expected files) which should be deactivated, as well as datasets that 
are no longer
+# referenced and should be marked as orphaned.
+parsing_cleanup_interval = 60
 
 # How often (in seconds) to scan the DAGs directory for new files. Default to 
5 minutes.
 dag_dir_list_interval = 300
diff --git a/airflow/configuration.py b/airflow/configuration.py
index 32aa151b4d..fdd11758f3 100644
--- a/airflow/configuration.py
+++ b/airflow/configuration.py
@@ -255,6 +255,7 @@ class AirflowConfigParser(ConfigParser):
                 "worker_pods_pending_timeout_batch_size",
             )
         },
+        ("scheduler", "parsing_cleanup_interval"): ("scheduler", 
"deactivate_stale_dags_interval", "2.5.0"),
     }
 
     # A mapping of old default values that we want to change and warn the user
diff --git a/airflow/dag_processing/manager.py 
b/airflow/dag_processing/manager.py
index 23b98a921d..d1450b7ed9 100644
--- a/airflow/dag_processing/manager.py
+++ b/airflow/dag_processing/manager.py
@@ -434,7 +434,7 @@ class DagFileProcessorManager(LoggingMixin):
         # Last time we cleaned up DAGs which are no longer in files
         self.last_deactivate_stale_dags_time = 
timezone.make_aware(datetime.fromtimestamp(0))
         # How often to check for DAGs which are no longer in files
-        self.deactivate_stale_dags_interval = conf.getint("scheduler", 
"deactivate_stale_dags_interval")
+        self.parsing_cleanup_interval = conf.getint("scheduler", 
"parsing_cleanup_interval")
         # How long to wait before timing out a process to parse a DAG file
         self._processor_timeout = processor_timeout
         # How often to scan the DAGs directory for new files. Default to 5 
minutes.
@@ -497,7 +497,7 @@ class DagFileProcessorManager(LoggingMixin):
         """
         now = timezone.utcnow()
         elapsed_time_since_refresh = (now - 
self.last_deactivate_stale_dags_time).total_seconds()
-        if elapsed_time_since_refresh > self.deactivate_stale_dags_interval:
+        if elapsed_time_since_refresh > self.parsing_cleanup_interval:
             last_parsed = {
                 fp: self.get_last_finish_time(fp) for fp in self.file_paths if 
self.get_last_finish_time(fp)
             }
diff --git a/airflow/jobs/scheduler_job.py b/airflow/jobs/scheduler_job.py
index 7e3e4d6496..623bb948dc 100644
--- a/airflow/jobs/scheduler_job.py
+++ b/airflow/jobs/scheduler_job.py
@@ -30,7 +30,7 @@ from datetime import datetime, timedelta
 from pathlib import Path
 from typing import TYPE_CHECKING, Collection, DefaultDict, Iterator
 
-from sqlalchemy import func, not_, or_, text
+from sqlalchemy import and_, func, not_, or_, text
 from sqlalchemy.exc import OperationalError
 from sqlalchemy.orm import load_only, selectinload
 from sqlalchemy.orm.session import Session, make_transient
@@ -46,7 +46,13 @@ from airflow.jobs.base_job import BaseJob
 from airflow.models.dag import DAG, DagModel
 from airflow.models.dagbag import DagBag
 from airflow.models.dagrun import DagRun
-from airflow.models.dataset import DagScheduleDatasetReference, 
DatasetDagRunQueue, DatasetEvent
+from airflow.models.dataset import (
+    DagScheduleDatasetReference,
+    DatasetDagRunQueue,
+    DatasetEvent,
+    DatasetModel,
+    TaskOutletDatasetReference,
+)
 from airflow.models.serialized_dag import SerializedDagModel
 from airflow.models.taskinstance import SimpleTaskInstance, TaskInstance, 
TaskInstanceKey
 from airflow.stats import Stats
@@ -854,9 +860,14 @@ class SchedulerJob(BaseJob):
         )
         timers.call_regular_interval(60.0, 
self._update_dag_run_state_for_paused_dags)
 
+        timers.call_regular_interval(
+            conf.getfloat("scheduler", "parsing_cleanup_interval"),
+            self._orphan_unreferenced_datasets,
+        )
+
         if self._standalone_dag_processor:
             timers.call_regular_interval(
-                conf.getfloat("scheduler", "deactivate_stale_dags_interval", 
fallback=60.0),
+                conf.getfloat("scheduler", "parsing_cleanup_interval"),
                 self._cleanup_stale_dags,
             )
 
@@ -1574,3 +1585,33 @@ class SchedulerJob(BaseJob):
             dag.is_active = False
             SerializedDagModel.remove_dag(dag_id=dag.dag_id, session=session)
         session.flush()
+
+    @provide_session
+    def _orphan_unreferenced_datasets(self, session: Session = NEW_SESSION) -> 
None:
+        """
+        Detects datasets that are no longer referenced in any DAG schedule 
parameters or task outlets and
+        sets the dataset is_orphaned flag to True
+        """
+        orphaned_dataset_query = (
+            session.query(DatasetModel)
+            .join(
+                DagScheduleDatasetReference,
+                isouter=True,
+            )
+            .join(
+                TaskOutletDatasetReference,
+                isouter=True,
+            )
+            # MSSQL doesn't like it when we select a column that we haven't 
grouped by. All other DBs let us
+            # group by id and select all columns.
+            .group_by(DatasetModel if session.get_bind().dialect.name == 
"mssql" else DatasetModel.id)
+            .having(
+                and_(
+                    func.count(DagScheduleDatasetReference.dag_id) == 0,
+                    func.count(TaskOutletDatasetReference.dag_id) == 0,
+                )
+            )
+        )
+        for dataset in orphaned_dataset_query:
+            self.log.info("Orphaning unreferenced dataset '%s'", dataset.uri)
+            dataset.is_orphaned = expression.true()
diff --git 
a/airflow/migrations/versions/0122_2_5_0_add_is_orphaned_to_datasetmodel.py 
b/airflow/migrations/versions/0122_2_5_0_add_is_orphaned_to_datasetmodel.py
new file mode 100644
index 0000000000..f4355402c8
--- /dev/null
+++ b/airflow/migrations/versions/0122_2_5_0_add_is_orphaned_to_datasetmodel.py
@@ -0,0 +1,57 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+"""Add is_orphaned to DatasetModel
+
+Revision ID: 290244fb8b83
+Revises: 1986afd32c1b
+Create Date: 2022-11-22 00:12:53.432961
+
+"""
+
+from __future__ import annotations
+
+import sqlalchemy as sa
+from alembic import op
+
+# revision identifiers, used by Alembic.
+revision = "290244fb8b83"
+down_revision = "1986afd32c1b"
+branch_labels = None
+depends_on = None
+airflow_version = "2.5.0"
+
+
+def upgrade():
+    """Add is_orphaned to DatasetModel"""
+    with op.batch_alter_table("dataset") as batch_op:
+        batch_op.add_column(
+            sa.Column(
+                "is_orphaned",
+                sa.Boolean,
+                default=False,
+                nullable=False,
+                server_default="0",
+            )
+        )
+
+
+def downgrade():
+    """Remove is_orphaned from DatasetModel"""
+    with op.batch_alter_table("dataset") as batch_op:
+        batch_op.drop_column("is_orphaned", mssql_drop_default=True)
diff --git a/airflow/models/dag.py b/airflow/models/dag.py
index bd81980fc2..c761dcadfa 100644
--- a/airflow/models/dag.py
+++ b/airflow/models/dag.py
@@ -2825,6 +2825,10 @@ class DAG(LoggingMixin):
         for dataset in all_datasets:
             stored_dataset = 
session.query(DatasetModel).filter(DatasetModel.uri == dataset.uri).first()
             if stored_dataset:
+                # Some datasets may have been previously unreferenced, and 
therefore orphaned by the
+                # scheduler. But if we're here, then we have found that 
dataset again in our DAGs, which
+                # means that it is no longer an orphan, so set is_orphaned to 
False.
+                stored_dataset.is_orphaned = expression.false()
                 stored_datasets[stored_dataset.uri] = stored_dataset
             else:
                 session.add(dataset)
diff --git a/airflow/models/dataset.py b/airflow/models/dataset.py
index e52d5c756f..4cd370386c 100644
--- a/airflow/models/dataset.py
+++ b/airflow/models/dataset.py
@@ -21,6 +21,7 @@ from urllib.parse import urlsplit
 
 import sqlalchemy_jsonfield
 from sqlalchemy import (
+    Boolean,
     Column,
     ForeignKey,
     ForeignKeyConstraint,
@@ -64,6 +65,7 @@ class DatasetModel(Base):
     extra = Column(sqlalchemy_jsonfield.JSONField(json=json), nullable=False, 
default={})
     created_at = Column(UtcDateTime, default=timezone.utcnow, nullable=False)
     updated_at = Column(UtcDateTime, default=timezone.utcnow, 
onupdate=timezone.utcnow, nullable=False)
+    is_orphaned = Column(Boolean, default=False, nullable=False, 
server_default="0")
 
     consuming_dags = relationship("DagScheduleDatasetReference", 
back_populates="dataset")
     producing_tasks = relationship("TaskOutletDatasetReference", 
back_populates="dataset")
diff --git a/airflow/www/views.py b/airflow/www/views.py
index f3f84bb851..39f69d4410 100644
--- a/airflow/www/views.py
+++ b/airflow/www/views.py
@@ -3565,7 +3565,7 @@ class Airflow(AirflowBaseView):
                     ),
                     isouter=True,
                 )
-                .filter(DagScheduleDatasetReference.dag_id == dag_id)
+                .filter(DagScheduleDatasetReference.dag_id == dag_id, 
~DatasetModel.is_orphaned)
                 .group_by(DatasetModel.id, DatasetModel.uri)
                 .order_by(DatasetModel.uri)
                 .all()
@@ -3688,7 +3688,7 @@ class Airflow(AirflowBaseView):
             if has_event_filters:
                 count_query = count_query.join(DatasetEvent, 
DatasetEvent.dataset_id == DatasetModel.id)
 
-            filters = []
+            filters = [~DatasetModel.is_orphaned]
             if uri_pattern:
                 filters.append(DatasetModel.uri.ilike(f"%{uri_pattern}%"))
             if updated_after:
diff --git a/docs/apache-airflow/img/airflow_erd.sha256 
b/docs/apache-airflow/img/airflow_erd.sha256
index d2240af1eb..99324d07fb 100644
--- a/docs/apache-airflow/img/airflow_erd.sha256
+++ b/docs/apache-airflow/img/airflow_erd.sha256
@@ -1 +1 @@
-f529521071a6c9ae8bbd58d63cf1195fc1ec964308e7684569d0a36d26534def
\ No newline at end of file
+5bee32cf7239656360cfe7978e7f01d6ee3b9914d3f3fd89ed4ff747254dddf8
\ No newline at end of file
diff --git a/docs/apache-airflow/img/airflow_erd.svg 
b/docs/apache-airflow/img/airflow_erd.svg
index a348a579c4..61ed186741 100644
--- a/docs/apache-airflow/img/airflow_erd.svg
+++ b/docs/apache-airflow/img/airflow_erd.svg
@@ -4,11 +4,11 @@
 <!-- Generated by graphviz version 2.43.0 (0)
  -->
 <!-- Title: %3 Pages: 1 -->
-<svg width="1530pt" height="5940pt"
- viewBox="0.00 0.00 1530.00 5939.50" xmlns="http://www.w3.org/2000/svg" 
xmlns:xlink="http://www.w3.org/1999/xlink">
-<g id="graph0" class="graph" transform="scale(1 1) rotate(0) translate(4 
5935.5)">
+<svg width="1530pt" height="5965pt"
+ viewBox="0.00 0.00 1530.00 5964.50" xmlns="http://www.w3.org/2000/svg" 
xmlns:xlink="http://www.w3.org/1999/xlink">
+<g id="graph0" class="graph" transform="scale(1 1) rotate(0) translate(4 
5960.5)">
 <title>%3</title>
-<polygon fill="white" stroke="transparent" points="-4,4 -4,-5935.5 
1526,-5935.5 1526,4 -4,4"/>
+<polygon fill="white" stroke="transparent" points="-4,4 -4,-5960.5 
1526,-5960.5 1526,4 -4,4"/>
 <!-- ab_permission -->
 <g id="node1" class="node">
 <title>ab_permission</title>
@@ -650,43 +650,43 @@
 <!-- dag_code -->
 <g id="node21" class="node">
 <title>dag_code</title>
-<polygon fill="none" stroke="black" points="55.5,-4036 55.5,-4064 327.5,-4064 
327.5,-4036 55.5,-4036"/>
-<text text-anchor="start" x="149" y="-4047.2" 
font-family="Helvetica,sans-Serif" font-weight="bold" 
font-size="16.00">dag_code</text>
+<polygon fill="none" stroke="black" points="55.5,-4061 55.5,-4089 327.5,-4089 
327.5,-4061 55.5,-4061"/>
+<text text-anchor="start" x="149" y="-4072.2" 
font-family="Helvetica,sans-Serif" font-weight="bold" 
font-size="16.00">dag_code</text>
+<polygon fill="none" stroke="black" points="55.5,-4036 55.5,-4061 327.5,-4061 
327.5,-4036 55.5,-4036"/>
+<text text-anchor="start" x="60.5" y="-4045.8" 
font-family="Helvetica,sans-Serif" text-decoration="underline" 
font-size="14.00">fileloc_hash</text>
+<text text-anchor="start" x="142.5" y="-4045.8" 
font-family="Helvetica,sans-Serif" font-size="14.00"> [BIGINT]</text>
+<text text-anchor="start" x="206.5" y="-4045.8" 
font-family="Helvetica,sans-Serif" font-size="14.00"> NOT NULL</text>
 <polygon fill="none" stroke="black" points="55.5,-4011 55.5,-4036 327.5,-4036 
327.5,-4011 55.5,-4011"/>
-<text text-anchor="start" x="60.5" y="-4020.8" 
font-family="Helvetica,sans-Serif" text-decoration="underline" 
font-size="14.00">fileloc_hash</text>
-<text text-anchor="start" x="142.5" y="-4020.8" 
font-family="Helvetica,sans-Serif" font-size="14.00"> [BIGINT]</text>
-<text text-anchor="start" x="206.5" y="-4020.8" 
font-family="Helvetica,sans-Serif" font-size="14.00"> NOT NULL</text>
+<text text-anchor="start" x="60.5" y="-4020.8" 
font-family="Helvetica,sans-Serif" font-size="14.00">fileloc</text>
+<text text-anchor="start" x="101.5" y="-4020.8" 
font-family="Helvetica,sans-Serif" font-size="14.00"> [VARCHAR(2000)]</text>
+<text text-anchor="start" x="231.5" y="-4020.8" 
font-family="Helvetica,sans-Serif" font-size="14.00"> NOT NULL</text>
 <polygon fill="none" stroke="black" points="55.5,-3986 55.5,-4011 327.5,-4011 
327.5,-3986 55.5,-3986"/>
-<text text-anchor="start" x="60.5" y="-3995.8" 
font-family="Helvetica,sans-Serif" font-size="14.00">fileloc</text>
-<text text-anchor="start" x="101.5" y="-3995.8" 
font-family="Helvetica,sans-Serif" font-size="14.00"> [VARCHAR(2000)]</text>
-<text text-anchor="start" x="231.5" y="-3995.8" 
font-family="Helvetica,sans-Serif" font-size="14.00"> NOT NULL</text>
+<text text-anchor="start" x="60.5" y="-3995.8" 
font-family="Helvetica,sans-Serif" font-size="14.00">last_updated</text>
+<text text-anchor="start" x="150.5" y="-3995.8" 
font-family="Helvetica,sans-Serif" font-size="14.00"> [TIMESTAMP]</text>
+<text text-anchor="start" x="246.5" y="-3995.8" 
font-family="Helvetica,sans-Serif" font-size="14.00"> NOT NULL</text>
 <polygon fill="none" stroke="black" points="55.5,-3961 55.5,-3986 327.5,-3986 
327.5,-3961 55.5,-3961"/>
-<text text-anchor="start" x="60.5" y="-3970.8" 
font-family="Helvetica,sans-Serif" font-size="14.00">last_updated</text>
-<text text-anchor="start" x="150.5" y="-3970.8" 
font-family="Helvetica,sans-Serif" font-size="14.00"> [TIMESTAMP]</text>
-<text text-anchor="start" x="246.5" y="-3970.8" 
font-family="Helvetica,sans-Serif" font-size="14.00"> NOT NULL</text>
-<polygon fill="none" stroke="black" points="55.5,-3936 55.5,-3961 327.5,-3961 
327.5,-3936 55.5,-3936"/>
-<text text-anchor="start" x="60.5" y="-3945.8" 
font-family="Helvetica,sans-Serif" font-size="14.00">source_code</text>
-<text text-anchor="start" x="146.5" y="-3945.8" 
font-family="Helvetica,sans-Serif" font-size="14.00"> [TEXT]</text>
-<text text-anchor="start" x="196.5" y="-3945.8" 
font-family="Helvetica,sans-Serif" font-size="14.00"> NOT NULL</text>
+<text text-anchor="start" x="60.5" y="-3970.8" 
font-family="Helvetica,sans-Serif" font-size="14.00">source_code</text>
+<text text-anchor="start" x="146.5" y="-3970.8" 
font-family="Helvetica,sans-Serif" font-size="14.00"> [TEXT]</text>
+<text text-anchor="start" x="196.5" y="-3970.8" 
font-family="Helvetica,sans-Serif" font-size="14.00"> NOT NULL</text>
 </g>
 <!-- dag_pickle -->
 <g id="node22" class="node">
 <title>dag_pickle</title>
-<polygon fill="none" stroke="black" points="92.5,-4190 92.5,-4218 290.5,-4218 
290.5,-4190 92.5,-4190"/>
-<text text-anchor="start" x="144.5" y="-4201.2" 
font-family="Helvetica,sans-Serif" font-weight="bold" 
font-size="16.00">dag_pickle</text>
+<polygon fill="none" stroke="black" points="92.5,-4215 92.5,-4243 290.5,-4243 
290.5,-4215 92.5,-4215"/>
+<text text-anchor="start" x="144.5" y="-4226.2" 
font-family="Helvetica,sans-Serif" font-weight="bold" 
font-size="16.00">dag_pickle</text>
+<polygon fill="none" stroke="black" points="92.5,-4190 92.5,-4215 290.5,-4215 
290.5,-4190 92.5,-4190"/>
+<text text-anchor="start" x="97.5" y="-4199.8" 
font-family="Helvetica,sans-Serif" text-decoration="underline" 
font-size="14.00">id</text>
+<text text-anchor="start" x="110.5" y="-4199.8" 
font-family="Helvetica,sans-Serif" font-size="14.00"> [INTEGER]</text>
+<text text-anchor="start" x="187.5" y="-4199.8" 
font-family="Helvetica,sans-Serif" font-size="14.00"> NOT NULL</text>
 <polygon fill="none" stroke="black" points="92.5,-4165 92.5,-4190 290.5,-4190 
290.5,-4165 92.5,-4165"/>
-<text text-anchor="start" x="97.5" y="-4174.8" 
font-family="Helvetica,sans-Serif" text-decoration="underline" 
font-size="14.00">id</text>
-<text text-anchor="start" x="110.5" y="-4174.8" 
font-family="Helvetica,sans-Serif" font-size="14.00"> [INTEGER]</text>
-<text text-anchor="start" x="187.5" y="-4174.8" 
font-family="Helvetica,sans-Serif" font-size="14.00"> NOT NULL</text>
+<text text-anchor="start" x="97.5" y="-4174.8" 
font-family="Helvetica,sans-Serif" font-size="14.00">created_dttm</text>
+<text text-anchor="start" x="189.5" y="-4174.8" 
font-family="Helvetica,sans-Serif" font-size="14.00"> [TIMESTAMP]</text>
 <polygon fill="none" stroke="black" points="92.5,-4140 92.5,-4165 290.5,-4165 
290.5,-4140 92.5,-4140"/>
-<text text-anchor="start" x="97.5" y="-4149.8" 
font-family="Helvetica,sans-Serif" font-size="14.00">created_dttm</text>
-<text text-anchor="start" x="189.5" y="-4149.8" 
font-family="Helvetica,sans-Serif" font-size="14.00"> [TIMESTAMP]</text>
+<text text-anchor="start" x="97.5" y="-4149.8" 
font-family="Helvetica,sans-Serif" font-size="14.00">pickle</text>
+<text text-anchor="start" x="138.5" y="-4149.8" 
font-family="Helvetica,sans-Serif" font-size="14.00"> [BLOB]</text>
 <polygon fill="none" stroke="black" points="92.5,-4115 92.5,-4140 290.5,-4140 
290.5,-4115 92.5,-4115"/>
-<text text-anchor="start" x="97.5" y="-4124.8" 
font-family="Helvetica,sans-Serif" font-size="14.00">pickle</text>
-<text text-anchor="start" x="138.5" y="-4124.8" 
font-family="Helvetica,sans-Serif" font-size="14.00"> [BLOB]</text>
-<polygon fill="none" stroke="black" points="92.5,-4090 92.5,-4115 290.5,-4115 
290.5,-4090 92.5,-4090"/>
-<text text-anchor="start" x="97.5" y="-4099.8" 
font-family="Helvetica,sans-Serif" font-size="14.00">pickle_hash</text>
-<text text-anchor="start" x="179.5" y="-4099.8" 
font-family="Helvetica,sans-Serif" font-size="14.00"> [BIGINT]</text>
+<text text-anchor="start" x="97.5" y="-4124.8" 
font-family="Helvetica,sans-Serif" font-size="14.00">pickle_hash</text>
+<text text-anchor="start" x="179.5" y="-4124.8" 
font-family="Helvetica,sans-Serif" font-size="14.00"> [BIGINT]</text>
 </g>
 <!-- dag_run -->
 <g id="node23" class="node">
@@ -1281,49 +1281,53 @@
 <!-- dataset -->
 <g id="node28" class="node">
 <title>dataset</title>
-<polygon fill="none" stroke="black" points="61.5,-3881 61.5,-3909 322.5,-3909 
322.5,-3881 61.5,-3881"/>
-<text text-anchor="start" x="158" y="-3892.2" 
font-family="Helvetica,sans-Serif" font-weight="bold" 
font-size="16.00">dataset</text>
-<polygon fill="none" stroke="black" points="61.5,-3856 61.5,-3881 322.5,-3881 
322.5,-3856 61.5,-3856"/>
-<text text-anchor="start" x="66.5" y="-3865.8" 
font-family="Helvetica,sans-Serif" text-decoration="underline" 
font-size="14.00">id</text>
-<text text-anchor="start" x="79.5" y="-3865.8" 
font-family="Helvetica,sans-Serif" font-size="14.00"> [INTEGER]</text>
-<text text-anchor="start" x="156.5" y="-3865.8" 
font-family="Helvetica,sans-Serif" font-size="14.00"> NOT NULL</text>
-<polygon fill="none" stroke="black" points="61.5,-3831 61.5,-3856 322.5,-3856 
322.5,-3831 61.5,-3831"/>
-<text text-anchor="start" x="66.5" y="-3840.8" 
font-family="Helvetica,sans-Serif" font-size="14.00">created_at</text>
-<text text-anchor="start" x="139.5" y="-3840.8" 
font-family="Helvetica,sans-Serif" font-size="14.00"> [TIMESTAMP]</text>
-<text text-anchor="start" x="235.5" y="-3840.8" 
font-family="Helvetica,sans-Serif" font-size="14.00"> NOT NULL</text>
-<polygon fill="none" stroke="black" points="61.5,-3806 61.5,-3831 322.5,-3831 
322.5,-3806 61.5,-3806"/>
-<text text-anchor="start" x="66.5" y="-3815.8" 
font-family="Helvetica,sans-Serif" font-size="14.00">extra</text>
-<text text-anchor="start" x="103.5" y="-3815.8" 
font-family="Helvetica,sans-Serif" font-size="14.00"> [JSON]</text>
-<text text-anchor="start" x="154.5" y="-3815.8" 
font-family="Helvetica,sans-Serif" font-size="14.00"> NOT NULL</text>
-<polygon fill="none" stroke="black" points="61.5,-3781 61.5,-3806 322.5,-3806 
322.5,-3781 61.5,-3781"/>
-<text text-anchor="start" x="66.5" y="-3790.8" 
font-family="Helvetica,sans-Serif" font-size="14.00">updated_at</text>
-<text text-anchor="start" x="145.5" y="-3790.8" 
font-family="Helvetica,sans-Serif" font-size="14.00"> [TIMESTAMP]</text>
-<text text-anchor="start" x="241.5" y="-3790.8" 
font-family="Helvetica,sans-Serif" font-size="14.00"> NOT NULL</text>
-<polygon fill="none" stroke="black" points="61.5,-3756 61.5,-3781 322.5,-3781 
322.5,-3756 61.5,-3756"/>
-<text text-anchor="start" x="66.5" y="-3765.8" 
font-family="Helvetica,sans-Serif" font-size="14.00">uri</text>
-<text text-anchor="start" x="85.5" y="-3765.8" 
font-family="Helvetica,sans-Serif" font-size="14.00"> [VARCHAR(3000)]</text>
-<text text-anchor="start" x="215.5" y="-3765.8" 
font-family="Helvetica,sans-Serif" font-size="14.00"> NOT NULL</text>
+<polygon fill="none" stroke="black" points="61.5,-3907 61.5,-3935 322.5,-3935 
322.5,-3907 61.5,-3907"/>
+<text text-anchor="start" x="158" y="-3918.2" 
font-family="Helvetica,sans-Serif" font-weight="bold" 
font-size="16.00">dataset</text>
+<polygon fill="none" stroke="black" points="61.5,-3882 61.5,-3907 322.5,-3907 
322.5,-3882 61.5,-3882"/>
+<text text-anchor="start" x="66.5" y="-3891.8" 
font-family="Helvetica,sans-Serif" text-decoration="underline" 
font-size="14.00">id</text>
+<text text-anchor="start" x="79.5" y="-3891.8" 
font-family="Helvetica,sans-Serif" font-size="14.00"> [INTEGER]</text>
+<text text-anchor="start" x="156.5" y="-3891.8" 
font-family="Helvetica,sans-Serif" font-size="14.00"> NOT NULL</text>
+<polygon fill="none" stroke="black" points="61.5,-3857 61.5,-3882 322.5,-3882 
322.5,-3857 61.5,-3857"/>
+<text text-anchor="start" x="66.5" y="-3866.8" 
font-family="Helvetica,sans-Serif" font-size="14.00">created_at</text>
+<text text-anchor="start" x="139.5" y="-3866.8" 
font-family="Helvetica,sans-Serif" font-size="14.00"> [TIMESTAMP]</text>
+<text text-anchor="start" x="235.5" y="-3866.8" 
font-family="Helvetica,sans-Serif" font-size="14.00"> NOT NULL</text>
+<polygon fill="none" stroke="black" points="61.5,-3832 61.5,-3857 322.5,-3857 
322.5,-3832 61.5,-3832"/>
+<text text-anchor="start" x="66.5" y="-3841.8" 
font-family="Helvetica,sans-Serif" font-size="14.00">extra</text>
+<text text-anchor="start" x="103.5" y="-3841.8" 
font-family="Helvetica,sans-Serif" font-size="14.00"> [JSON]</text>
+<text text-anchor="start" x="154.5" y="-3841.8" 
font-family="Helvetica,sans-Serif" font-size="14.00"> NOT NULL</text>
+<polygon fill="none" stroke="black" points="61.5,-3807 61.5,-3832 322.5,-3832 
322.5,-3807 61.5,-3807"/>
+<text text-anchor="start" x="66.5" y="-3816.8" 
font-family="Helvetica,sans-Serif" font-size="14.00">is_orphaned</text>
+<text text-anchor="start" x="151.5" y="-3816.8" 
font-family="Helvetica,sans-Serif" font-size="14.00"> [BOOLEAN]</text>
+<text text-anchor="start" x="235.5" y="-3816.8" 
font-family="Helvetica,sans-Serif" font-size="14.00"> NOT NULL</text>
+<polygon fill="none" stroke="black" points="61.5,-3782 61.5,-3807 322.5,-3807 
322.5,-3782 61.5,-3782"/>
+<text text-anchor="start" x="66.5" y="-3791.8" 
font-family="Helvetica,sans-Serif" font-size="14.00">updated_at</text>
+<text text-anchor="start" x="145.5" y="-3791.8" 
font-family="Helvetica,sans-Serif" font-size="14.00"> [TIMESTAMP]</text>
+<text text-anchor="start" x="241.5" y="-3791.8" 
font-family="Helvetica,sans-Serif" font-size="14.00"> NOT NULL</text>
+<polygon fill="none" stroke="black" points="61.5,-3757 61.5,-3782 322.5,-3782 
322.5,-3757 61.5,-3757"/>
+<text text-anchor="start" x="66.5" y="-3766.8" 
font-family="Helvetica,sans-Serif" font-size="14.00">uri</text>
+<text text-anchor="start" x="85.5" y="-3766.8" 
font-family="Helvetica,sans-Serif" font-size="14.00"> [VARCHAR(3000)]</text>
+<text text-anchor="start" x="215.5" y="-3766.8" 
font-family="Helvetica,sans-Serif" font-size="14.00"> NOT NULL</text>
 </g>
 <!-- dataset&#45;&#45;dag_schedule_dataset_reference -->
 <g id="edge24" class="edge">
 <title>dataset&#45;&#45;dag_schedule_dataset_reference</title>
-<path fill="none" stroke="#7f7f7f" stroke-dasharray="5,2" 
d="M330.22,-3793.79C370.33,-3782.37 414.46,-3769.81 455.73,-3758.06"/>
-<text text-anchor="start" x="424.73" y="-3746.86" font-family="Times,serif" 
font-size="14.00">0..N</text>
-<text text-anchor="start" x="330.22" y="-3782.59" font-family="Times,serif" 
font-size="14.00">{0,1}</text>
+<path fill="none" stroke="#7f7f7f" stroke-dasharray="5,2" 
d="M330.22,-3802.54C370.33,-3789.89 414.46,-3775.96 455.73,-3762.94"/>
+<text text-anchor="start" x="424.73" y="-3751.74" font-family="Times,serif" 
font-size="14.00">0..N</text>
+<text text-anchor="start" x="330.22" y="-3791.34" font-family="Times,serif" 
font-size="14.00">{0,1}</text>
 </g>
 <!-- dataset&#45;&#45;dataset_dag_run_queue -->
 <g id="edge25" class="edge">
 <title>dataset&#45;&#45;dataset_dag_run_queue</title>
-<path fill="none" stroke="#7f7f7f" stroke-dasharray="5,2" 
d="M330.22,-3840.19C370.57,-3842.29 414.99,-3844.61 456.47,-3846.78"/>
-<text text-anchor="start" x="425.47" y="-3835.58" font-family="Times,serif" 
font-size="14.00">0..N</text>
-<text text-anchor="start" x="330.22" y="-3828.99" font-family="Times,serif" 
font-size="14.00">{0,1}</text>
+<path fill="none" stroke="#7f7f7f" stroke-dasharray="5,2" 
d="M330.22,-3848.94C370.57,-3849.8 414.99,-3850.75 456.47,-3851.64"/>
+<text text-anchor="start" x="425.47" y="-3840.44" font-family="Times,serif" 
font-size="14.00">0..N</text>
+<text text-anchor="start" x="330.22" y="-3837.74" font-family="Times,serif" 
font-size="14.00">{0,1}</text>
 </g>
 <!-- dataset&#45;&#45;task_outlet_dataset_reference -->
 <g id="edge26" class="edge">
 <title>dataset&#45;&#45;task_outlet_dataset_reference</title>
-<path fill="none" stroke="#7f7f7f" stroke-dasharray="5,2" 
d="M330.19,-3779.77C348.95,-3769.42 367.18,-3757.51 383,-3744 427.06,-3706.38 
413.49,-3675.37 456,-3636 459.47,-3632.78 463.08,-3629.63 466.8,-3626.55"/>
+<path fill="none" stroke="#7f7f7f" stroke-dasharray="5,2" 
d="M330.09,-3782.88C348.87,-3771.33 367.12,-3758.32 383,-3744 426.02,-3705.19 
413.49,-3675.37 456,-3636 459.47,-3632.78 463.08,-3629.63 466.8,-3626.55"/>
 <text text-anchor="start" x="435.8" y="-3615.35" font-family="Times,serif" 
font-size="14.00">0..N</text>
-<text text-anchor="start" x="330.19" y="-3768.57" font-family="Times,serif" 
font-size="14.00">{0,1}</text>
+<text text-anchor="start" x="330.09" y="-3771.68" font-family="Times,serif" 
font-size="14.00">{0,1}</text>
 </g>
 <!-- dataset_event -->
 <g id="node29" class="node">
@@ -1369,92 +1373,92 @@
 <!-- import_error -->
 <g id="node30" class="node">
 <title>import_error</title>
-<polygon fill="none" stroke="black" points="91.5,-4344 91.5,-4372 292.5,-4372 
292.5,-4344 91.5,-4344"/>
-<text text-anchor="start" x="134.5" y="-4355.2" 
font-family="Helvetica,sans-Serif" font-weight="bold" 
font-size="16.00">import_error</text>
+<polygon fill="none" stroke="black" points="91.5,-4369 91.5,-4397 292.5,-4397 
292.5,-4369 91.5,-4369"/>
+<text text-anchor="start" x="134.5" y="-4380.2" 
font-family="Helvetica,sans-Serif" font-weight="bold" 
font-size="16.00">import_error</text>
+<polygon fill="none" stroke="black" points="91.5,-4344 91.5,-4369 292.5,-4369 
292.5,-4344 91.5,-4344"/>
+<text text-anchor="start" x="96.5" y="-4353.8" 
font-family="Helvetica,sans-Serif" text-decoration="underline" 
font-size="14.00">id</text>
+<text text-anchor="start" x="109.5" y="-4353.8" 
font-family="Helvetica,sans-Serif" font-size="14.00"> [INTEGER]</text>
+<text text-anchor="start" x="186.5" y="-4353.8" 
font-family="Helvetica,sans-Serif" font-size="14.00"> NOT NULL</text>
 <polygon fill="none" stroke="black" points="91.5,-4319 91.5,-4344 292.5,-4344 
292.5,-4319 91.5,-4319"/>
-<text text-anchor="start" x="96.5" y="-4328.8" 
font-family="Helvetica,sans-Serif" text-decoration="underline" 
font-size="14.00">id</text>
-<text text-anchor="start" x="109.5" y="-4328.8" 
font-family="Helvetica,sans-Serif" font-size="14.00"> [INTEGER]</text>
-<text text-anchor="start" x="186.5" y="-4328.8" 
font-family="Helvetica,sans-Serif" font-size="14.00"> NOT NULL</text>
+<text text-anchor="start" x="96.5" y="-4328.8" 
font-family="Helvetica,sans-Serif" font-size="14.00">filename</text>
+<text text-anchor="start" x="157.5" y="-4328.8" 
font-family="Helvetica,sans-Serif" font-size="14.00"> [VARCHAR(1024)]</text>
 <polygon fill="none" stroke="black" points="91.5,-4294 91.5,-4319 292.5,-4319 
292.5,-4294 91.5,-4294"/>
-<text text-anchor="start" x="96.5" y="-4303.8" 
font-family="Helvetica,sans-Serif" font-size="14.00">filename</text>
-<text text-anchor="start" x="157.5" y="-4303.8" 
font-family="Helvetica,sans-Serif" font-size="14.00"> [VARCHAR(1024)]</text>
+<text text-anchor="start" x="96.5" y="-4303.8" 
font-family="Helvetica,sans-Serif" font-size="14.00">stacktrace</text>
+<text text-anchor="start" x="169.5" y="-4303.8" 
font-family="Helvetica,sans-Serif" font-size="14.00"> [TEXT]</text>
 <polygon fill="none" stroke="black" points="91.5,-4269 91.5,-4294 292.5,-4294 
292.5,-4269 91.5,-4269"/>
-<text text-anchor="start" x="96.5" y="-4278.8" 
font-family="Helvetica,sans-Serif" font-size="14.00">stacktrace</text>
-<text text-anchor="start" x="169.5" y="-4278.8" 
font-family="Helvetica,sans-Serif" font-size="14.00"> [TEXT]</text>
-<polygon fill="none" stroke="black" points="91.5,-4244 91.5,-4269 292.5,-4269 
292.5,-4244 91.5,-4244"/>
-<text text-anchor="start" x="96.5" y="-4253.8" 
font-family="Helvetica,sans-Serif" font-size="14.00">timestamp</text>
-<text text-anchor="start" x="171.5" y="-4253.8" 
font-family="Helvetica,sans-Serif" font-size="14.00"> [TIMESTAMP]</text>
+<text text-anchor="start" x="96.5" y="-4278.8" 
font-family="Helvetica,sans-Serif" font-size="14.00">timestamp</text>
+<text text-anchor="start" x="171.5" y="-4278.8" 
font-family="Helvetica,sans-Serif" font-size="14.00"> [TIMESTAMP]</text>
 </g>
 <!-- job -->
 <g id="node31" class="node">
 <title>job</title>
-<polygon fill="none" stroke="black" points="75.5,-4648 75.5,-4676 308.5,-4676 
308.5,-4648 75.5,-4648"/>
-<text text-anchor="start" x="178" y="-4659.2" 
font-family="Helvetica,sans-Serif" font-weight="bold" 
font-size="16.00">job</text>
+<polygon fill="none" stroke="black" points="75.5,-4673 75.5,-4701 308.5,-4701 
308.5,-4673 75.5,-4673"/>
+<text text-anchor="start" x="178" y="-4684.2" 
font-family="Helvetica,sans-Serif" font-weight="bold" 
font-size="16.00">job</text>
+<polygon fill="none" stroke="black" points="75.5,-4648 75.5,-4673 308.5,-4673 
308.5,-4648 75.5,-4648"/>
+<text text-anchor="start" x="80.5" y="-4657.8" 
font-family="Helvetica,sans-Serif" text-decoration="underline" 
font-size="14.00">id</text>
+<text text-anchor="start" x="93.5" y="-4657.8" 
font-family="Helvetica,sans-Serif" font-size="14.00"> [INTEGER]</text>
+<text text-anchor="start" x="170.5" y="-4657.8" 
font-family="Helvetica,sans-Serif" font-size="14.00"> NOT NULL</text>
 <polygon fill="none" stroke="black" points="75.5,-4623 75.5,-4648 308.5,-4648 
308.5,-4623 75.5,-4623"/>
-<text text-anchor="start" x="80.5" y="-4632.8" 
font-family="Helvetica,sans-Serif" text-decoration="underline" 
font-size="14.00">id</text>
-<text text-anchor="start" x="93.5" y="-4632.8" 
font-family="Helvetica,sans-Serif" font-size="14.00"> [INTEGER]</text>
-<text text-anchor="start" x="170.5" y="-4632.8" 
font-family="Helvetica,sans-Serif" font-size="14.00"> NOT NULL</text>
+<text text-anchor="start" x="80.5" y="-4632.8" 
font-family="Helvetica,sans-Serif" font-size="14.00">dag_id</text>
+<text text-anchor="start" x="126.5" y="-4632.8" 
font-family="Helvetica,sans-Serif" font-size="14.00"> [VARCHAR(250)]</text>
 <polygon fill="none" stroke="black" points="75.5,-4598 75.5,-4623 308.5,-4623 
308.5,-4598 75.5,-4598"/>
-<text text-anchor="start" x="80.5" y="-4607.8" 
font-family="Helvetica,sans-Serif" font-size="14.00">dag_id</text>
-<text text-anchor="start" x="126.5" y="-4607.8" 
font-family="Helvetica,sans-Serif" font-size="14.00"> [VARCHAR(250)]</text>
+<text text-anchor="start" x="80.5" y="-4607.8" 
font-family="Helvetica,sans-Serif" font-size="14.00">end_date</text>
+<text text-anchor="start" x="144.5" y="-4607.8" 
font-family="Helvetica,sans-Serif" font-size="14.00"> [TIMESTAMP]</text>
 <polygon fill="none" stroke="black" points="75.5,-4573 75.5,-4598 308.5,-4598 
308.5,-4573 75.5,-4573"/>
-<text text-anchor="start" x="80.5" y="-4582.8" 
font-family="Helvetica,sans-Serif" font-size="14.00">end_date</text>
-<text text-anchor="start" x="144.5" y="-4582.8" 
font-family="Helvetica,sans-Serif" font-size="14.00"> [TIMESTAMP]</text>
+<text text-anchor="start" x="80.5" y="-4582.8" 
font-family="Helvetica,sans-Serif" font-size="14.00">executor_class</text>
+<text text-anchor="start" x="182.5" y="-4582.8" 
font-family="Helvetica,sans-Serif" font-size="14.00"> [VARCHAR(500)]</text>
 <polygon fill="none" stroke="black" points="75.5,-4548 75.5,-4573 308.5,-4573 
308.5,-4548 75.5,-4548"/>
-<text text-anchor="start" x="80.5" y="-4557.8" 
font-family="Helvetica,sans-Serif" font-size="14.00">executor_class</text>
-<text text-anchor="start" x="182.5" y="-4557.8" 
font-family="Helvetica,sans-Serif" font-size="14.00"> [VARCHAR(500)]</text>
+<text text-anchor="start" x="80.5" y="-4557.8" 
font-family="Helvetica,sans-Serif" font-size="14.00">hostname</text>
+<text text-anchor="start" x="150.5" y="-4557.8" 
font-family="Helvetica,sans-Serif" font-size="14.00"> [VARCHAR(500)]</text>
 <polygon fill="none" stroke="black" points="75.5,-4523 75.5,-4548 308.5,-4548 
308.5,-4523 75.5,-4523"/>
-<text text-anchor="start" x="80.5" y="-4532.8" 
font-family="Helvetica,sans-Serif" font-size="14.00">hostname</text>
-<text text-anchor="start" x="150.5" y="-4532.8" 
font-family="Helvetica,sans-Serif" font-size="14.00"> [VARCHAR(500)]</text>
+<text text-anchor="start" x="80.5" y="-4532.8" 
font-family="Helvetica,sans-Serif" font-size="14.00">job_type</text>
+<text text-anchor="start" x="139.5" y="-4532.8" 
font-family="Helvetica,sans-Serif" font-size="14.00"> [VARCHAR(30)]</text>
 <polygon fill="none" stroke="black" points="75.5,-4498 75.5,-4523 308.5,-4523 
308.5,-4498 75.5,-4498"/>
-<text text-anchor="start" x="80.5" y="-4507.8" 
font-family="Helvetica,sans-Serif" font-size="14.00">job_type</text>
-<text text-anchor="start" x="139.5" y="-4507.8" 
font-family="Helvetica,sans-Serif" font-size="14.00"> [VARCHAR(30)]</text>
+<text text-anchor="start" x="80.5" y="-4507.8" 
font-family="Helvetica,sans-Serif" font-size="14.00">latest_heartbeat</text>
+<text text-anchor="start" x="193.5" y="-4507.8" 
font-family="Helvetica,sans-Serif" font-size="14.00"> [TIMESTAMP]</text>
 <polygon fill="none" stroke="black" points="75.5,-4473 75.5,-4498 308.5,-4498 
308.5,-4473 75.5,-4473"/>
-<text text-anchor="start" x="80.5" y="-4482.8" 
font-family="Helvetica,sans-Serif" font-size="14.00">latest_heartbeat</text>
-<text text-anchor="start" x="193.5" y="-4482.8" 
font-family="Helvetica,sans-Serif" font-size="14.00"> [TIMESTAMP]</text>
+<text text-anchor="start" x="80.5" y="-4482.8" 
font-family="Helvetica,sans-Serif" font-size="14.00">start_date</text>
+<text text-anchor="start" x="150.5" y="-4482.8" 
font-family="Helvetica,sans-Serif" font-size="14.00"> [TIMESTAMP]</text>
 <polygon fill="none" stroke="black" points="75.5,-4448 75.5,-4473 308.5,-4473 
308.5,-4448 75.5,-4448"/>
-<text text-anchor="start" x="80.5" y="-4457.8" 
font-family="Helvetica,sans-Serif" font-size="14.00">start_date</text>
-<text text-anchor="start" x="150.5" y="-4457.8" 
font-family="Helvetica,sans-Serif" font-size="14.00"> [TIMESTAMP]</text>
+<text text-anchor="start" x="80.5" y="-4457.8" 
font-family="Helvetica,sans-Serif" font-size="14.00">state</text>
+<text text-anchor="start" x="115.5" y="-4457.8" 
font-family="Helvetica,sans-Serif" font-size="14.00"> [VARCHAR(20)]</text>
 <polygon fill="none" stroke="black" points="75.5,-4423 75.5,-4448 308.5,-4448 
308.5,-4423 75.5,-4423"/>
-<text text-anchor="start" x="80.5" y="-4432.8" 
font-family="Helvetica,sans-Serif" font-size="14.00">state</text>
-<text text-anchor="start" x="115.5" y="-4432.8" 
font-family="Helvetica,sans-Serif" font-size="14.00"> [VARCHAR(20)]</text>
-<polygon fill="none" stroke="black" points="75.5,-4398 75.5,-4423 308.5,-4423 
308.5,-4398 75.5,-4398"/>
-<text text-anchor="start" x="80.5" y="-4407.8" 
font-family="Helvetica,sans-Serif" font-size="14.00">unixname</text>
-<text text-anchor="start" x="150.5" y="-4407.8" 
font-family="Helvetica,sans-Serif" font-size="14.00"> [VARCHAR(1000)]</text>
+<text text-anchor="start" x="80.5" y="-4432.8" 
font-family="Helvetica,sans-Serif" font-size="14.00">unixname</text>
+<text text-anchor="start" x="150.5" y="-4432.8" 
font-family="Helvetica,sans-Serif" font-size="14.00"> [VARCHAR(1000)]</text>
 </g>
 <!-- log -->
 <g id="node32" class="node">
 <title>log</title>
-<polygon fill="none" stroke="black" points="86.5,-4927 86.5,-4955 297.5,-4955 
297.5,-4927 86.5,-4927"/>
-<text text-anchor="start" x="178" y="-4938.2" 
font-family="Helvetica,sans-Serif" font-weight="bold" 
font-size="16.00">log</text>
+<polygon fill="none" stroke="black" points="86.5,-4952 86.5,-4980 297.5,-4980 
297.5,-4952 86.5,-4952"/>
+<text text-anchor="start" x="178" y="-4963.2" 
font-family="Helvetica,sans-Serif" font-weight="bold" 
font-size="16.00">log</text>
+<polygon fill="none" stroke="black" points="86.5,-4927 86.5,-4952 297.5,-4952 
297.5,-4927 86.5,-4927"/>
+<text text-anchor="start" x="91.5" y="-4936.8" 
font-family="Helvetica,sans-Serif" text-decoration="underline" 
font-size="14.00">id</text>
+<text text-anchor="start" x="104.5" y="-4936.8" 
font-family="Helvetica,sans-Serif" font-size="14.00"> [INTEGER]</text>
+<text text-anchor="start" x="181.5" y="-4936.8" 
font-family="Helvetica,sans-Serif" font-size="14.00"> NOT NULL</text>
 <polygon fill="none" stroke="black" points="86.5,-4902 86.5,-4927 297.5,-4927 
297.5,-4902 86.5,-4902"/>
-<text text-anchor="start" x="91.5" y="-4911.8" 
font-family="Helvetica,sans-Serif" text-decoration="underline" 
font-size="14.00">id</text>
-<text text-anchor="start" x="104.5" y="-4911.8" 
font-family="Helvetica,sans-Serif" font-size="14.00"> [INTEGER]</text>
-<text text-anchor="start" x="181.5" y="-4911.8" 
font-family="Helvetica,sans-Serif" font-size="14.00"> NOT NULL</text>
+<text text-anchor="start" x="91.5" y="-4911.8" 
font-family="Helvetica,sans-Serif" font-size="14.00">dag_id</text>
+<text text-anchor="start" x="137.5" y="-4911.8" 
font-family="Helvetica,sans-Serif" font-size="14.00"> [VARCHAR(250)]</text>
 <polygon fill="none" stroke="black" points="86.5,-4877 86.5,-4902 297.5,-4902 
297.5,-4877 86.5,-4877"/>
-<text text-anchor="start" x="91.5" y="-4886.8" 
font-family="Helvetica,sans-Serif" font-size="14.00">dag_id</text>
-<text text-anchor="start" x="137.5" y="-4886.8" 
font-family="Helvetica,sans-Serif" font-size="14.00"> [VARCHAR(250)]</text>
+<text text-anchor="start" x="91.5" y="-4886.8" 
font-family="Helvetica,sans-Serif" font-size="14.00">dttm</text>
+<text text-anchor="start" x="125.5" y="-4886.8" 
font-family="Helvetica,sans-Serif" font-size="14.00"> [TIMESTAMP]</text>
 <polygon fill="none" stroke="black" points="86.5,-4852 86.5,-4877 297.5,-4877 
297.5,-4852 86.5,-4852"/>
-<text text-anchor="start" x="91.5" y="-4861.8" 
font-family="Helvetica,sans-Serif" font-size="14.00">dttm</text>
-<text text-anchor="start" x="125.5" y="-4861.8" 
font-family="Helvetica,sans-Serif" font-size="14.00"> [TIMESTAMP]</text>
+<text text-anchor="start" x="91.5" y="-4861.8" 
font-family="Helvetica,sans-Serif" font-size="14.00">event</text>
+<text text-anchor="start" x="131.5" y="-4861.8" 
font-family="Helvetica,sans-Serif" font-size="14.00"> [VARCHAR(30)]</text>
 <polygon fill="none" stroke="black" points="86.5,-4827 86.5,-4852 297.5,-4852 
297.5,-4827 86.5,-4827"/>
-<text text-anchor="start" x="91.5" y="-4836.8" 
font-family="Helvetica,sans-Serif" font-size="14.00">event</text>
-<text text-anchor="start" x="131.5" y="-4836.8" 
font-family="Helvetica,sans-Serif" font-size="14.00"> [VARCHAR(30)]</text>
+<text text-anchor="start" x="91.5" y="-4836.8" 
font-family="Helvetica,sans-Serif" font-size="14.00">execution_date</text>
+<text text-anchor="start" x="196.5" y="-4836.8" 
font-family="Helvetica,sans-Serif" font-size="14.00"> [TIMESTAMP]</text>
 <polygon fill="none" stroke="black" points="86.5,-4802 86.5,-4827 297.5,-4827 
297.5,-4802 86.5,-4802"/>
-<text text-anchor="start" x="91.5" y="-4811.8" 
font-family="Helvetica,sans-Serif" font-size="14.00">execution_date</text>
-<text text-anchor="start" x="196.5" y="-4811.8" 
font-family="Helvetica,sans-Serif" font-size="14.00"> [TIMESTAMP]</text>
+<text text-anchor="start" x="91.5" y="-4811.8" 
font-family="Helvetica,sans-Serif" font-size="14.00">extra</text>
+<text text-anchor="start" x="128.5" y="-4811.8" 
font-family="Helvetica,sans-Serif" font-size="14.00"> [TEXT]</text>
 <polygon fill="none" stroke="black" points="86.5,-4777 86.5,-4802 297.5,-4802 
297.5,-4777 86.5,-4777"/>
-<text text-anchor="start" x="91.5" y="-4786.8" 
font-family="Helvetica,sans-Serif" font-size="14.00">extra</text>
-<text text-anchor="start" x="128.5" y="-4786.8" 
font-family="Helvetica,sans-Serif" font-size="14.00"> [TEXT]</text>
+<text text-anchor="start" x="91.5" y="-4786.8" 
font-family="Helvetica,sans-Serif" font-size="14.00">map_index</text>
+<text text-anchor="start" x="167.5" y="-4786.8" 
font-family="Helvetica,sans-Serif" font-size="14.00"> [INTEGER]</text>
 <polygon fill="none" stroke="black" points="86.5,-4752 86.5,-4777 297.5,-4777 
297.5,-4752 86.5,-4752"/>
-<text text-anchor="start" x="91.5" y="-4761.8" 
font-family="Helvetica,sans-Serif" font-size="14.00">map_index</text>
-<text text-anchor="start" x="167.5" y="-4761.8" 
font-family="Helvetica,sans-Serif" font-size="14.00"> [INTEGER]</text>
+<text text-anchor="start" x="91.5" y="-4761.8" 
font-family="Helvetica,sans-Serif" font-size="14.00">owner</text>
+<text text-anchor="start" x="134.5" y="-4761.8" 
font-family="Helvetica,sans-Serif" font-size="14.00"> [VARCHAR(500)]</text>
 <polygon fill="none" stroke="black" points="86.5,-4727 86.5,-4752 297.5,-4752 
297.5,-4727 86.5,-4727"/>
-<text text-anchor="start" x="91.5" y="-4736.8" 
font-family="Helvetica,sans-Serif" font-size="14.00">owner</text>
-<text text-anchor="start" x="134.5" y="-4736.8" 
font-family="Helvetica,sans-Serif" font-size="14.00"> [VARCHAR(500)]</text>
-<polygon fill="none" stroke="black" points="86.5,-4702 86.5,-4727 297.5,-4727 
297.5,-4702 86.5,-4702"/>
-<text text-anchor="start" x="91.5" y="-4711.8" 
font-family="Helvetica,sans-Serif" font-size="14.00">task_id</text>
-<text text-anchor="start" x="140.5" y="-4711.8" 
font-family="Helvetica,sans-Serif" font-size="14.00"> [VARCHAR(250)]</text>
+<text text-anchor="start" x="91.5" y="-4736.8" 
font-family="Helvetica,sans-Serif" font-size="14.00">task_id</text>
+<text text-anchor="start" x="140.5" y="-4736.8" 
font-family="Helvetica,sans-Serif" font-size="14.00"> [VARCHAR(250)]</text>
 </g>
 <!-- trigger -->
 <g id="node37" class="node">
@@ -1491,127 +1495,127 @@
 <!-- serialized_dag -->
 <g id="node38" class="node">
 <title>serialized_dag</title>
-<polygon fill="none" stroke="black" points="55.5,-5182 55.5,-5210 327.5,-5210 
327.5,-5182 55.5,-5182"/>
-<text text-anchor="start" x="128" y="-5193.2" 
font-family="Helvetica,sans-Serif" font-weight="bold" 
font-size="16.00">serialized_dag</text>
+<polygon fill="none" stroke="black" points="55.5,-5207 55.5,-5235 327.5,-5235 
327.5,-5207 55.5,-5207"/>
+<text text-anchor="start" x="128" y="-5218.2" 
font-family="Helvetica,sans-Serif" font-weight="bold" 
font-size="16.00">serialized_dag</text>
+<polygon fill="none" stroke="black" points="55.5,-5182 55.5,-5207 327.5,-5207 
327.5,-5182 55.5,-5182"/>
+<text text-anchor="start" x="60.5" y="-5191.8" 
font-family="Helvetica,sans-Serif" text-decoration="underline" 
font-size="14.00">dag_id</text>
+<text text-anchor="start" x="106.5" y="-5191.8" 
font-family="Helvetica,sans-Serif" font-size="14.00"> [VARCHAR(250)]</text>
+<text text-anchor="start" x="227.5" y="-5191.8" 
font-family="Helvetica,sans-Serif" font-size="14.00"> NOT NULL</text>
 <polygon fill="none" stroke="black" points="55.5,-5157 55.5,-5182 327.5,-5182 
327.5,-5157 55.5,-5157"/>
-<text text-anchor="start" x="60.5" y="-5166.8" 
font-family="Helvetica,sans-Serif" text-decoration="underline" 
font-size="14.00">dag_id</text>
-<text text-anchor="start" x="106.5" y="-5166.8" 
font-family="Helvetica,sans-Serif" font-size="14.00"> [VARCHAR(250)]</text>
-<text text-anchor="start" x="227.5" y="-5166.8" 
font-family="Helvetica,sans-Serif" font-size="14.00"> NOT NULL</text>
+<text text-anchor="start" x="60.5" y="-5166.8" 
font-family="Helvetica,sans-Serif" font-size="14.00">dag_hash</text>
+<text text-anchor="start" x="127.5" y="-5166.8" 
font-family="Helvetica,sans-Serif" font-size="14.00"> [VARCHAR(32)]</text>
+<text text-anchor="start" x="239.5" y="-5166.8" 
font-family="Helvetica,sans-Serif" font-size="14.00"> NOT NULL</text>
 <polygon fill="none" stroke="black" points="55.5,-5132 55.5,-5157 327.5,-5157 
327.5,-5132 55.5,-5132"/>
-<text text-anchor="start" x="60.5" y="-5141.8" 
font-family="Helvetica,sans-Serif" font-size="14.00">dag_hash</text>
-<text text-anchor="start" x="127.5" y="-5141.8" 
font-family="Helvetica,sans-Serif" font-size="14.00"> [VARCHAR(32)]</text>
-<text text-anchor="start" x="239.5" y="-5141.8" 
font-family="Helvetica,sans-Serif" font-size="14.00"> NOT NULL</text>
+<text text-anchor="start" x="60.5" y="-5141.8" 
font-family="Helvetica,sans-Serif" font-size="14.00">data</text>
+<text text-anchor="start" x="91.5" y="-5141.8" 
font-family="Helvetica,sans-Serif" font-size="14.00"> [JSON]</text>
 <polygon fill="none" stroke="black" points="55.5,-5107 55.5,-5132 327.5,-5132 
327.5,-5107 55.5,-5107"/>
-<text text-anchor="start" x="60.5" y="-5116.8" 
font-family="Helvetica,sans-Serif" font-size="14.00">data</text>
-<text text-anchor="start" x="91.5" y="-5116.8" 
font-family="Helvetica,sans-Serif" font-size="14.00"> [JSON]</text>
+<text text-anchor="start" x="60.5" y="-5116.8" 
font-family="Helvetica,sans-Serif" font-size="14.00">data_compressed</text>
+<text text-anchor="start" x="182.5" y="-5116.8" 
font-family="Helvetica,sans-Serif" font-size="14.00"> [BLOB]</text>
 <polygon fill="none" stroke="black" points="55.5,-5082 55.5,-5107 327.5,-5107 
327.5,-5082 55.5,-5082"/>
-<text text-anchor="start" x="60.5" y="-5091.8" 
font-family="Helvetica,sans-Serif" font-size="14.00">data_compressed</text>
-<text text-anchor="start" x="182.5" y="-5091.8" 
font-family="Helvetica,sans-Serif" font-size="14.00"> [BLOB]</text>
+<text text-anchor="start" x="60.5" y="-5091.8" 
font-family="Helvetica,sans-Serif" font-size="14.00">fileloc</text>
+<text text-anchor="start" x="101.5" y="-5091.8" 
font-family="Helvetica,sans-Serif" font-size="14.00"> [VARCHAR(2000)]</text>
+<text text-anchor="start" x="231.5" y="-5091.8" 
font-family="Helvetica,sans-Serif" font-size="14.00"> NOT NULL</text>
 <polygon fill="none" stroke="black" points="55.5,-5057 55.5,-5082 327.5,-5082 
327.5,-5057 55.5,-5057"/>
-<text text-anchor="start" x="60.5" y="-5066.8" 
font-family="Helvetica,sans-Serif" font-size="14.00">fileloc</text>
-<text text-anchor="start" x="101.5" y="-5066.8" 
font-family="Helvetica,sans-Serif" font-size="14.00"> [VARCHAR(2000)]</text>
-<text text-anchor="start" x="231.5" y="-5066.8" 
font-family="Helvetica,sans-Serif" font-size="14.00"> NOT NULL</text>
+<text text-anchor="start" x="60.5" y="-5066.8" 
font-family="Helvetica,sans-Serif" font-size="14.00">fileloc_hash</text>
+<text text-anchor="start" x="142.5" y="-5066.8" 
font-family="Helvetica,sans-Serif" font-size="14.00"> [BIGINT]</text>
+<text text-anchor="start" x="206.5" y="-5066.8" 
font-family="Helvetica,sans-Serif" font-size="14.00"> NOT NULL</text>
 <polygon fill="none" stroke="black" points="55.5,-5032 55.5,-5057 327.5,-5057 
327.5,-5032 55.5,-5032"/>
-<text text-anchor="start" x="60.5" y="-5041.8" 
font-family="Helvetica,sans-Serif" font-size="14.00">fileloc_hash</text>
-<text text-anchor="start" x="142.5" y="-5041.8" 
font-family="Helvetica,sans-Serif" font-size="14.00"> [BIGINT]</text>
-<text text-anchor="start" x="206.5" y="-5041.8" 
font-family="Helvetica,sans-Serif" font-size="14.00"> NOT NULL</text>
+<text text-anchor="start" x="60.5" y="-5041.8" 
font-family="Helvetica,sans-Serif" font-size="14.00">last_updated</text>
+<text text-anchor="start" x="150.5" y="-5041.8" 
font-family="Helvetica,sans-Serif" font-size="14.00"> [TIMESTAMP]</text>
+<text text-anchor="start" x="246.5" y="-5041.8" 
font-family="Helvetica,sans-Serif" font-size="14.00"> NOT NULL</text>
 <polygon fill="none" stroke="black" points="55.5,-5007 55.5,-5032 327.5,-5032 
327.5,-5007 55.5,-5007"/>
-<text text-anchor="start" x="60.5" y="-5016.8" 
font-family="Helvetica,sans-Serif" font-size="14.00">last_updated</text>
-<text text-anchor="start" x="150.5" y="-5016.8" 
font-family="Helvetica,sans-Serif" font-size="14.00"> [TIMESTAMP]</text>
-<text text-anchor="start" x="246.5" y="-5016.8" 
font-family="Helvetica,sans-Serif" font-size="14.00"> NOT NULL</text>
-<polygon fill="none" stroke="black" points="55.5,-4982 55.5,-5007 327.5,-5007 
327.5,-4982 55.5,-4982"/>
-<text text-anchor="start" x="60.5" y="-4991.8" 
font-family="Helvetica,sans-Serif" font-size="14.00">processor_subdir</text>
-<text text-anchor="start" x="179.5" y="-4991.8" 
font-family="Helvetica,sans-Serif" font-size="14.00"> [VARCHAR(2000)]</text>
+<text text-anchor="start" x="60.5" y="-5016.8" 
font-family="Helvetica,sans-Serif" font-size="14.00">processor_subdir</text>
+<text text-anchor="start" x="179.5" y="-5016.8" 
font-family="Helvetica,sans-Serif" font-size="14.00"> [VARCHAR(2000)]</text>
 </g>
 <!-- session -->
 <g id="node39" class="node">
 <title>session</title>
-<polygon fill="none" stroke="black" points="90.5,-5336 90.5,-5364 293.5,-5364 
293.5,-5336 90.5,-5336"/>
-<text text-anchor="start" x="158" y="-5347.2" 
font-family="Helvetica,sans-Serif" font-weight="bold" 
font-size="16.00">session</text>
+<polygon fill="none" stroke="black" points="90.5,-5361 90.5,-5389 293.5,-5389 
293.5,-5361 90.5,-5361"/>
+<text text-anchor="start" x="158" y="-5372.2" 
font-family="Helvetica,sans-Serif" font-weight="bold" 
font-size="16.00">session</text>
+<polygon fill="none" stroke="black" points="90.5,-5336 90.5,-5361 293.5,-5361 
293.5,-5336 90.5,-5336"/>
+<text text-anchor="start" x="95.5" y="-5345.8" 
font-family="Helvetica,sans-Serif" text-decoration="underline" 
font-size="14.00">id</text>
+<text text-anchor="start" x="108.5" y="-5345.8" 
font-family="Helvetica,sans-Serif" font-size="14.00"> [INTEGER]</text>
+<text text-anchor="start" x="185.5" y="-5345.8" 
font-family="Helvetica,sans-Serif" font-size="14.00"> NOT NULL</text>
 <polygon fill="none" stroke="black" points="90.5,-5311 90.5,-5336 293.5,-5336 
293.5,-5311 90.5,-5311"/>
-<text text-anchor="start" x="95.5" y="-5320.8" 
font-family="Helvetica,sans-Serif" text-decoration="underline" 
font-size="14.00">id</text>
-<text text-anchor="start" x="108.5" y="-5320.8" 
font-family="Helvetica,sans-Serif" font-size="14.00"> [INTEGER]</text>
-<text text-anchor="start" x="185.5" y="-5320.8" 
font-family="Helvetica,sans-Serif" font-size="14.00"> NOT NULL</text>
+<text text-anchor="start" x="95.5" y="-5320.8" 
font-family="Helvetica,sans-Serif" font-size="14.00">data</text>
+<text text-anchor="start" x="126.5" y="-5320.8" 
font-family="Helvetica,sans-Serif" font-size="14.00"> [BLOB]</text>
 <polygon fill="none" stroke="black" points="90.5,-5286 90.5,-5311 293.5,-5311 
293.5,-5286 90.5,-5286"/>
-<text text-anchor="start" x="95.5" y="-5295.8" 
font-family="Helvetica,sans-Serif" font-size="14.00">data</text>
-<text text-anchor="start" x="126.5" y="-5295.8" 
font-family="Helvetica,sans-Serif" font-size="14.00"> [BLOB]</text>
+<text text-anchor="start" x="95.5" y="-5295.8" 
font-family="Helvetica,sans-Serif" font-size="14.00">expiry</text>
+<text text-anchor="start" x="139.5" y="-5295.8" 
font-family="Helvetica,sans-Serif" font-size="14.00"> [DATETIME]</text>
 <polygon fill="none" stroke="black" points="90.5,-5261 90.5,-5286 293.5,-5286 
293.5,-5261 90.5,-5261"/>
-<text text-anchor="start" x="95.5" y="-5270.8" 
font-family="Helvetica,sans-Serif" font-size="14.00">expiry</text>
-<text text-anchor="start" x="139.5" y="-5270.8" 
font-family="Helvetica,sans-Serif" font-size="14.00"> [DATETIME]</text>
-<polygon fill="none" stroke="black" points="90.5,-5236 90.5,-5261 293.5,-5261 
293.5,-5236 90.5,-5236"/>
-<text text-anchor="start" x="95.5" y="-5245.8" 
font-family="Helvetica,sans-Serif" font-size="14.00">session_id</text>
-<text text-anchor="start" x="167.5" y="-5245.8" 
font-family="Helvetica,sans-Serif" font-size="14.00"> [VARCHAR(255)]</text>
+<text text-anchor="start" x="95.5" y="-5270.8" 
font-family="Helvetica,sans-Serif" font-size="14.00">session_id</text>
+<text text-anchor="start" x="167.5" y="-5270.8" 
font-family="Helvetica,sans-Serif" font-size="14.00"> [VARCHAR(255)]</text>
 </g>
 <!-- sla_miss -->
 <g id="node40" class="node">
 <title>sla_miss</title>
-<polygon fill="none" stroke="black" points="48.5,-5565 48.5,-5593 335.5,-5593 
335.5,-5565 48.5,-5565"/>
-<text text-anchor="start" x="154" y="-5576.2" 
font-family="Helvetica,sans-Serif" font-weight="bold" 
font-size="16.00">sla_miss</text>
+<polygon fill="none" stroke="black" points="48.5,-5590 48.5,-5618 335.5,-5618 
335.5,-5590 48.5,-5590"/>
+<text text-anchor="start" x="154" y="-5601.2" 
font-family="Helvetica,sans-Serif" font-weight="bold" 
font-size="16.00">sla_miss</text>
+<polygon fill="none" stroke="black" points="48.5,-5565 48.5,-5590 335.5,-5590 
335.5,-5565 48.5,-5565"/>
+<text text-anchor="start" x="53.5" y="-5574.8" 
font-family="Helvetica,sans-Serif" text-decoration="underline" 
font-size="14.00">dag_id</text>
+<text text-anchor="start" x="99.5" y="-5574.8" 
font-family="Helvetica,sans-Serif" font-size="14.00"> [VARCHAR(250)]</text>
+<text text-anchor="start" x="220.5" y="-5574.8" 
font-family="Helvetica,sans-Serif" font-size="14.00"> NOT NULL</text>
 <polygon fill="none" stroke="black" points="48.5,-5540 48.5,-5565 335.5,-5565 
335.5,-5540 48.5,-5540"/>
-<text text-anchor="start" x="53.5" y="-5549.8" 
font-family="Helvetica,sans-Serif" text-decoration="underline" 
font-size="14.00">dag_id</text>
-<text text-anchor="start" x="99.5" y="-5549.8" 
font-family="Helvetica,sans-Serif" font-size="14.00"> [VARCHAR(250)]</text>
-<text text-anchor="start" x="220.5" y="-5549.8" 
font-family="Helvetica,sans-Serif" font-size="14.00"> NOT NULL</text>
+<text text-anchor="start" x="53.5" y="-5549.8" 
font-family="Helvetica,sans-Serif" text-decoration="underline" 
font-size="14.00">execution_date</text>
+<text text-anchor="start" x="158.5" y="-5549.8" 
font-family="Helvetica,sans-Serif" font-size="14.00"> [TIMESTAMP]</text>
+<text text-anchor="start" x="254.5" y="-5549.8" 
font-family="Helvetica,sans-Serif" font-size="14.00"> NOT NULL</text>
 <polygon fill="none" stroke="black" points="48.5,-5515 48.5,-5540 335.5,-5540 
335.5,-5515 48.5,-5515"/>
-<text text-anchor="start" x="53.5" y="-5524.8" 
font-family="Helvetica,sans-Serif" text-decoration="underline" 
font-size="14.00">execution_date</text>
-<text text-anchor="start" x="158.5" y="-5524.8" 
font-family="Helvetica,sans-Serif" font-size="14.00"> [TIMESTAMP]</text>
-<text text-anchor="start" x="254.5" y="-5524.8" 
font-family="Helvetica,sans-Serif" font-size="14.00"> NOT NULL</text>
+<text text-anchor="start" x="53.5" y="-5524.8" 
font-family="Helvetica,sans-Serif" text-decoration="underline" 
font-size="14.00">task_id</text>
+<text text-anchor="start" x="102.5" y="-5524.8" 
font-family="Helvetica,sans-Serif" font-size="14.00"> [VARCHAR(250)]</text>
+<text text-anchor="start" x="223.5" y="-5524.8" 
font-family="Helvetica,sans-Serif" font-size="14.00"> NOT NULL</text>
 <polygon fill="none" stroke="black" points="48.5,-5490 48.5,-5515 335.5,-5515 
335.5,-5490 48.5,-5490"/>
-<text text-anchor="start" x="53.5" y="-5499.8" 
font-family="Helvetica,sans-Serif" text-decoration="underline" 
font-size="14.00">task_id</text>
-<text text-anchor="start" x="102.5" y="-5499.8" 
font-family="Helvetica,sans-Serif" font-size="14.00"> [VARCHAR(250)]</text>
-<text text-anchor="start" x="223.5" y="-5499.8" 
font-family="Helvetica,sans-Serif" font-size="14.00"> NOT NULL</text>
+<text text-anchor="start" x="53.5" y="-5499.8" 
font-family="Helvetica,sans-Serif" font-size="14.00">description</text>
+<text text-anchor="start" x="131.5" y="-5499.8" 
font-family="Helvetica,sans-Serif" font-size="14.00"> [TEXT]</text>
 <polygon fill="none" stroke="black" points="48.5,-5465 48.5,-5490 335.5,-5490 
335.5,-5465 48.5,-5465"/>
-<text text-anchor="start" x="53.5" y="-5474.8" 
font-family="Helvetica,sans-Serif" font-size="14.00">description</text>
-<text text-anchor="start" x="131.5" y="-5474.8" 
font-family="Helvetica,sans-Serif" font-size="14.00"> [TEXT]</text>
+<text text-anchor="start" x="53.5" y="-5474.8" 
font-family="Helvetica,sans-Serif" font-size="14.00">email_sent</text>
+<text text-anchor="start" x="128.5" y="-5474.8" 
font-family="Helvetica,sans-Serif" font-size="14.00"> [BOOLEAN]</text>
 <polygon fill="none" stroke="black" points="48.5,-5440 48.5,-5465 335.5,-5465 
335.5,-5440 48.5,-5440"/>
-<text text-anchor="start" x="53.5" y="-5449.8" 
font-family="Helvetica,sans-Serif" font-size="14.00">email_sent</text>
-<text text-anchor="start" x="128.5" y="-5449.8" 
font-family="Helvetica,sans-Serif" font-size="14.00"> [BOOLEAN]</text>
+<text text-anchor="start" x="53.5" y="-5449.8" 
font-family="Helvetica,sans-Serif" font-size="14.00">notification_sent</text>
+<text text-anchor="start" x="168.5" y="-5449.8" 
font-family="Helvetica,sans-Serif" font-size="14.00"> [BOOLEAN]</text>
 <polygon fill="none" stroke="black" points="48.5,-5415 48.5,-5440 335.5,-5440 
335.5,-5415 48.5,-5415"/>
-<text text-anchor="start" x="53.5" y="-5424.8" 
font-family="Helvetica,sans-Serif" font-size="14.00">notification_sent</text>
-<text text-anchor="start" x="168.5" y="-5424.8" 
font-family="Helvetica,sans-Serif" font-size="14.00"> [BOOLEAN]</text>
-<polygon fill="none" stroke="black" points="48.5,-5390 48.5,-5415 335.5,-5415 
335.5,-5390 48.5,-5390"/>
-<text text-anchor="start" x="53.5" y="-5399.8" 
font-family="Helvetica,sans-Serif" font-size="14.00">timestamp</text>
-<text text-anchor="start" x="128.5" y="-5399.8" 
font-family="Helvetica,sans-Serif" font-size="14.00"> [TIMESTAMP]</text>
+<text text-anchor="start" x="53.5" y="-5424.8" 
font-family="Helvetica,sans-Serif" font-size="14.00">timestamp</text>
+<text text-anchor="start" x="128.5" y="-5424.8" 
font-family="Helvetica,sans-Serif" font-size="14.00"> [TIMESTAMP]</text>
 </g>
 <!-- slot_pool -->
 <g id="node41" class="node">
 <title>slot_pool</title>
-<polygon fill="none" stroke="black" points="103.5,-5720 103.5,-5748 
279.5,-5748 279.5,-5720 103.5,-5720"/>
-<text text-anchor="start" x="150.5" y="-5731.2" 
font-family="Helvetica,sans-Serif" font-weight="bold" 
font-size="16.00">slot_pool</text>
+<polygon fill="none" stroke="black" points="103.5,-5745 103.5,-5773 
279.5,-5773 279.5,-5745 103.5,-5745"/>
+<text text-anchor="start" x="150.5" y="-5756.2" 
font-family="Helvetica,sans-Serif" font-weight="bold" 
font-size="16.00">slot_pool</text>
+<polygon fill="none" stroke="black" points="103.5,-5720 103.5,-5745 
279.5,-5745 279.5,-5720 103.5,-5720"/>
+<text text-anchor="start" x="108.5" y="-5729.8" 
font-family="Helvetica,sans-Serif" text-decoration="underline" 
font-size="14.00">id</text>
+<text text-anchor="start" x="121.5" y="-5729.8" 
font-family="Helvetica,sans-Serif" font-size="14.00"> [INTEGER]</text>
+<text text-anchor="start" x="198.5" y="-5729.8" 
font-family="Helvetica,sans-Serif" font-size="14.00"> NOT NULL</text>
 <polygon fill="none" stroke="black" points="103.5,-5695 103.5,-5720 
279.5,-5720 279.5,-5695 103.5,-5695"/>
-<text text-anchor="start" x="108.5" y="-5704.8" 
font-family="Helvetica,sans-Serif" text-decoration="underline" 
font-size="14.00">id</text>
-<text text-anchor="start" x="121.5" y="-5704.8" 
font-family="Helvetica,sans-Serif" font-size="14.00"> [INTEGER]</text>
-<text text-anchor="start" x="198.5" y="-5704.8" 
font-family="Helvetica,sans-Serif" font-size="14.00"> NOT NULL</text>
+<text text-anchor="start" x="108.5" y="-5704.8" 
font-family="Helvetica,sans-Serif" font-size="14.00">description</text>
+<text text-anchor="start" x="186.5" y="-5704.8" 
font-family="Helvetica,sans-Serif" font-size="14.00"> [TEXT]</text>
 <polygon fill="none" stroke="black" points="103.5,-5670 103.5,-5695 
279.5,-5695 279.5,-5670 103.5,-5670"/>
-<text text-anchor="start" x="108.5" y="-5679.8" 
font-family="Helvetica,sans-Serif" font-size="14.00">description</text>
-<text text-anchor="start" x="186.5" y="-5679.8" 
font-family="Helvetica,sans-Serif" font-size="14.00"> [TEXT]</text>
+<text text-anchor="start" x="108.5" y="-5679.8" 
font-family="Helvetica,sans-Serif" font-size="14.00">pool</text>
+<text text-anchor="start" x="138.5" y="-5679.8" 
font-family="Helvetica,sans-Serif" font-size="14.00"> [VARCHAR(256)]</text>
 <polygon fill="none" stroke="black" points="103.5,-5645 103.5,-5670 
279.5,-5670 279.5,-5645 103.5,-5645"/>
-<text text-anchor="start" x="108.5" y="-5654.8" 
font-family="Helvetica,sans-Serif" font-size="14.00">pool</text>
-<text text-anchor="start" x="138.5" y="-5654.8" 
font-family="Helvetica,sans-Serif" font-size="14.00"> [VARCHAR(256)]</text>
-<polygon fill="none" stroke="black" points="103.5,-5620 103.5,-5645 
279.5,-5645 279.5,-5620 103.5,-5620"/>
-<text text-anchor="start" x="108.5" y="-5629.8" 
font-family="Helvetica,sans-Serif" font-size="14.00">slots</text>
-<text text-anchor="start" x="141.5" y="-5629.8" 
font-family="Helvetica,sans-Serif" font-size="14.00"> [INTEGER]</text>
+<text text-anchor="start" x="108.5" y="-5654.8" 
font-family="Helvetica,sans-Serif" font-size="14.00">slots</text>
+<text text-anchor="start" x="141.5" y="-5654.8" 
font-family="Helvetica,sans-Serif" font-size="14.00"> [INTEGER]</text>
 </g>
 <!-- variable -->
 <g id="node42" class="node">
 <title>variable</title>
-<polygon fill="none" stroke="black" points="100.5,-5899 100.5,-5927 
283.5,-5927 283.5,-5899 100.5,-5899"/>
-<text text-anchor="start" x="155.5" y="-5910.2" 
font-family="Helvetica,sans-Serif" font-weight="bold" 
font-size="16.00">variable</text>
+<polygon fill="none" stroke="black" points="100.5,-5924 100.5,-5952 
283.5,-5952 283.5,-5924 100.5,-5924"/>
+<text text-anchor="start" x="155.5" y="-5935.2" 
font-family="Helvetica,sans-Serif" font-weight="bold" 
font-size="16.00">variable</text>
+<polygon fill="none" stroke="black" points="100.5,-5899 100.5,-5924 
283.5,-5924 283.5,-5899 100.5,-5899"/>
+<text text-anchor="start" x="105.5" y="-5908.8" 
font-family="Helvetica,sans-Serif" text-decoration="underline" 
font-size="14.00">id</text>
+<text text-anchor="start" x="118.5" y="-5908.8" 
font-family="Helvetica,sans-Serif" font-size="14.00"> [INTEGER]</text>
+<text text-anchor="start" x="195.5" y="-5908.8" 
font-family="Helvetica,sans-Serif" font-size="14.00"> NOT NULL</text>
 <polygon fill="none" stroke="black" points="100.5,-5874 100.5,-5899 
283.5,-5899 283.5,-5874 100.5,-5874"/>
-<text text-anchor="start" x="105.5" y="-5883.8" 
font-family="Helvetica,sans-Serif" text-decoration="underline" 
font-size="14.00">id</text>
-<text text-anchor="start" x="118.5" y="-5883.8" 
font-family="Helvetica,sans-Serif" font-size="14.00"> [INTEGER]</text>
-<text text-anchor="start" x="195.5" y="-5883.8" 
font-family="Helvetica,sans-Serif" font-size="14.00"> NOT NULL</text>
+<text text-anchor="start" x="105.5" y="-5883.8" 
font-family="Helvetica,sans-Serif" font-size="14.00">description</text>
+<text text-anchor="start" x="183.5" y="-5883.8" 
font-family="Helvetica,sans-Serif" font-size="14.00"> [TEXT]</text>
 <polygon fill="none" stroke="black" points="100.5,-5849 100.5,-5874 
283.5,-5874 283.5,-5849 100.5,-5849"/>
-<text text-anchor="start" x="105.5" y="-5858.8" 
font-family="Helvetica,sans-Serif" font-size="14.00">description</text>
-<text text-anchor="start" x="183.5" y="-5858.8" 
font-family="Helvetica,sans-Serif" font-size="14.00"> [TEXT]</text>
+<text text-anchor="start" x="105.5" y="-5858.8" 
font-family="Helvetica,sans-Serif" font-size="14.00">is_encrypted</text>
+<text text-anchor="start" x="194.5" y="-5858.8" 
font-family="Helvetica,sans-Serif" font-size="14.00"> [BOOLEAN]</text>
 <polygon fill="none" stroke="black" points="100.5,-5824 100.5,-5849 
283.5,-5849 283.5,-5824 100.5,-5824"/>
-<text text-anchor="start" x="105.5" y="-5833.8" 
font-family="Helvetica,sans-Serif" font-size="14.00">is_encrypted</text>
-<text text-anchor="start" x="194.5" y="-5833.8" 
font-family="Helvetica,sans-Serif" font-size="14.00"> [BOOLEAN]</text>
+<text text-anchor="start" x="105.5" y="-5833.8" 
font-family="Helvetica,sans-Serif" font-size="14.00">key</text>
+<text text-anchor="start" x="130.5" y="-5833.8" 
font-family="Helvetica,sans-Serif" font-size="14.00"> [VARCHAR(250)]</text>
 <polygon fill="none" stroke="black" points="100.5,-5799 100.5,-5824 
283.5,-5824 283.5,-5799 100.5,-5799"/>
-<text text-anchor="start" x="105.5" y="-5808.8" 
font-family="Helvetica,sans-Serif" font-size="14.00">key</text>
-<text text-anchor="start" x="130.5" y="-5808.8" 
font-family="Helvetica,sans-Serif" font-size="14.00"> [VARCHAR(250)]</text>
-<polygon fill="none" stroke="black" points="100.5,-5774 100.5,-5799 
283.5,-5799 283.5,-5774 100.5,-5774"/>
-<text text-anchor="start" x="105.5" y="-5783.8" 
font-family="Helvetica,sans-Serif" font-size="14.00">val</text>
-<text text-anchor="start" x="126.5" y="-5783.8" 
font-family="Helvetica,sans-Serif" font-size="14.00"> [TEXT]</text>
+<text text-anchor="start" x="105.5" y="-5808.8" 
font-family="Helvetica,sans-Serif" font-size="14.00">val</text>
+<text text-anchor="start" x="126.5" y="-5808.8" 
font-family="Helvetica,sans-Serif" font-size="14.00"> [TEXT]</text>
 </g>
 </g>
 </svg>
diff --git a/docs/apache-airflow/migrations-ref.rst 
b/docs/apache-airflow/migrations-ref.rst
index 5d243bbf30..4328e3546a 100644
--- a/docs/apache-airflow/migrations-ref.rst
+++ b/docs/apache-airflow/migrations-ref.rst
@@ -39,7 +39,9 @@ Here's the list of all the Database Migrations that are 
executed via when you ru
 
+---------------------------------+-------------------+-------------------+--------------------------------------------------------------+
 | Revision ID                     | Revises ID        | Airflow Version   | 
Description                                                  |
 
+=================================+===================+===================+==============================================================+
-| ``1986afd32c1b`` (head)         | ``ee8d93fcc81e``  | ``2.5.0``         | 
Add DagRunNote and TaskInstanceNote                          |
+| ``290244fb8b83`` (head)         | ``1986afd32c1b``  | ``2.5.0``         | 
Add is_orphaned to DatasetModel                              |
++---------------------------------+-------------------+-------------------+--------------------------------------------------------------+
+| ``1986afd32c1b``                | ``ee8d93fcc81e``  | ``2.5.0``         | 
Add DagRunNote and TaskInstanceNote                          |
 
+---------------------------------+-------------------+-------------------+--------------------------------------------------------------+
 | ``ee8d93fcc81e``                | ``e07f49787c9d``  | ``2.5.0``         | 
Add updated_at column to DagRun and TaskInstance             |
 
+---------------------------------+-------------------+-------------------+--------------------------------------------------------------+
diff --git a/newsfragments/27828.significant.rst 
b/newsfragments/27828.significant.rst
new file mode 100644
index 0000000000..5f67666f7f
--- /dev/null
+++ b/newsfragments/27828.significant.rst
@@ -0,0 +1,3 @@
+The Airflow config option ``scheduler.deactivate_stale_dags_interval`` has 
been renamed to ``scheduler.parsing_cleanup_interval``.
+
+The old option will continue to work but will issue deprecation warnings, and 
will be removed entirely in Airflow 3.
diff --git a/tests/jobs/test_scheduler_job.py b/tests/jobs/test_scheduler_job.py
index b2484d7223..a1d7fb31e0 100644
--- a/tests/jobs/test_scheduler_job.py
+++ b/tests/jobs/test_scheduler_job.py
@@ -66,6 +66,7 @@ from tests.test_utils.asserts import assert_queries_count
 from tests.test_utils.config import conf_vars, env_vars
 from tests.test_utils.db import (
     clear_db_dags,
+    clear_db_datasets,
     clear_db_import_errors,
     clear_db_jobs,
     clear_db_pools,
@@ -123,6 +124,7 @@ class TestSchedulerJob:
         clear_db_sla_miss()
         clear_db_import_errors()
         clear_db_jobs()
+        clear_db_datasets()
         # DO NOT try to run clear_db_serialized_dags() here - this will break 
the tests
         # The tests expect DAGs to be fully loaded here via setUpClass method 
below
 
@@ -4588,6 +4590,44 @@ class TestSchedulerJob:
         (backfill_run,) = DagRun.find(dag_id=dag.dag_id, 
run_type=DagRunType.BACKFILL_JOB, session=session)
         assert backfill_run.state == State.RUNNING
 
+    def test_dataset_orphaning(self, dag_maker, session):
+        dataset1 = Dataset(uri="ds1")
+        dataset2 = Dataset(uri="ds2")
+        dataset3 = Dataset(uri="ds3")
+        dataset4 = Dataset(uri="ds4")
+
+        with dag_maker(dag_id="datasets-1", schedule=[dataset1, dataset2], 
session=session):
+            BashOperator(task_id="task", bash_command="echo 1", 
outlets=[dataset3, dataset4])
+
+        non_orphaned_dataset_count = 
session.query(DatasetModel).filter(~DatasetModel.is_orphaned).count()
+        assert non_orphaned_dataset_count == 4
+        orphaned_dataset_count = 
session.query(DatasetModel).filter(DatasetModel.is_orphaned).count()
+        assert orphaned_dataset_count == 0
+
+        # now remove 2 dataset references
+        with dag_maker(dag_id="datasets-1", schedule=[dataset1], 
session=session):
+            BashOperator(task_id="task", bash_command="echo 1", 
outlets=[dataset3])
+
+        self.scheduler_job = SchedulerJob(subdir=os.devnull)
+        self.scheduler_job._orphan_unreferenced_datasets(session=session)
+        session.flush()
+
+        # and find the orphans
+        non_orphaned_datasets = [
+            dataset.uri
+            for dataset in session.query(DatasetModel.uri)
+            .filter(~DatasetModel.is_orphaned)
+            .order_by(DatasetModel.uri)
+        ]
+        assert non_orphaned_datasets == ["ds1", "ds3"]
+        orphaned_datasets = [
+            dataset.uri
+            for dataset in session.query(DatasetModel.uri)
+            .filter(DatasetModel.is_orphaned)
+            .order_by(DatasetModel.uri)
+        ]
+        assert orphaned_datasets == ["ds2", "ds4"]
+
 
 @pytest.mark.need_serialized_dag
 def test_schedule_dag_run_with_upstream_skip(dag_maker, session):
diff --git a/tests/models/test_dag.py b/tests/models/test_dag.py
index 9954622915..2bc8af8302 100644
--- a/tests/models/test_dag.py
+++ b/tests/models/test_dag.py
@@ -97,12 +97,14 @@ class TestDag:
     def setup_method(self) -> None:
         clear_db_runs()
         clear_db_dags()
+        clear_db_datasets()
         self.patcher_dag_code = 
mock.patch("airflow.models.dag.DagCode.bulk_sync_to_db")
         self.patcher_dag_code.start()
 
     def teardown_method(self) -> None:
         clear_db_runs()
         clear_db_dags()
+        clear_db_datasets()
         self.patcher_dag_code.stop()
 
     @staticmethod
@@ -939,6 +941,55 @@ class TestDag:
             .all()
         ) == {(task_id, dag_id1, d2_orm.id)}
 
+    def test_bulk_write_to_db_unorphan_datasets(self):
+        """
+        Datasets can lose their last reference and be orphaned, but then if a 
reference to them reappears, we
+        need to un-orphan those datasets
+        """
+        with create_session() as session:
+            # Create four datasets - two that have references and two that are 
unreferenced and marked as
+            # orphans
+            dataset1 = Dataset(uri="ds1")
+            dataset2 = Dataset(uri="ds2")
+            session.add(DatasetModel(uri=dataset2.uri, is_orphaned=True))
+            dataset3 = Dataset(uri="ds3")
+            dataset4 = Dataset(uri="ds4")
+            session.add(DatasetModel(uri=dataset4.uri, is_orphaned=True))
+            session.flush()
+
+            dag1 = DAG(dag_id="datasets-1", start_date=DEFAULT_DATE, 
schedule=[dataset1])
+            BashOperator(dag=dag1, task_id="task", bash_command="echo 1", 
outlets=[dataset3])
+
+            DAG.bulk_write_to_db([dag1], session=session)
+
+            # Double check
+            non_orphaned_datasets = [
+                dataset.uri
+                for dataset in session.query(DatasetModel.uri)
+                .filter(~DatasetModel.is_orphaned)
+                .order_by(DatasetModel.uri)
+            ]
+            assert non_orphaned_datasets == ["ds1", "ds3"]
+            orphaned_datasets = [
+                dataset.uri
+                for dataset in session.query(DatasetModel.uri)
+                .filter(DatasetModel.is_orphaned)
+                .order_by(DatasetModel.uri)
+            ]
+            assert orphaned_datasets == ["ds2", "ds4"]
+
+            # Now add references to the two unreferenced datasets
+            dag1 = DAG(dag_id="datasets-1", start_date=DEFAULT_DATE, 
schedule=[dataset1, dataset2])
+            BashOperator(dag=dag1, task_id="task", bash_command="echo 1", 
outlets=[dataset3, dataset4])
+
+            DAG.bulk_write_to_db([dag1], session=session)
+
+            # and count the orphans and non-orphans
+            non_orphaned_dataset_count = 
session.query(DatasetModel).filter(~DatasetModel.is_orphaned).count()
+            assert non_orphaned_dataset_count == 4
+            orphaned_dataset_count = 
session.query(DatasetModel).filter(DatasetModel.is_orphaned).count()
+            assert orphaned_dataset_count == 0
+
     def test_sync_to_db(self):
         dag = DAG(
             "dag",

Reply via email to