ashb commented on a change in pull request #10996:
URL: https://github.com/apache/airflow/pull/10996#discussion_r495804978
##########
File path: airflow/executors/base_executor.py
##########
@@ -56,6 +56,8 @@ class BaseExecutor(LoggingMixin):
``0`` for infinity
"""
+ _job_id: Optional[str] = None
Review comment:
```suggestion
job_id: Optional[str] = None
"""job_id of the SchedulerJob this executor is running under"""
```
##########
File path: airflow/executors/base_executor.py
##########
@@ -64,6 +66,20 @@ def __init__(self, parallelism: int = PARALLELISM):
self.running: Set[TaskInstanceKey] = set()
self.event_buffer: Dict[TaskInstanceKey, EventBufferValueType] = {}
+ @property
+ def job_id(self) -> Optional[str]:
+ """
+ Get job_id for SchedulerJob
+ """
+ return self._job_id
+
+ @job_id.setter
+ def job_id(self, value: str):
+ """
+ Allows SchedulerJob to set job_id variable
+ """
+ self._job_id = value
+
Review comment:
```suggestion
```
Since we don't do anything special in the getter or setter, lets not bother
with them.
##########
File path: airflow/executors/kubernetes_executor.py
##########
@@ -60,6 +60,32 @@
KubernetesWatchType = Tuple[str, str, Optional[str], Dict[str, str], str]
+class ResourceVersion:
+ """Singleton for tracking resourceVersion from Kubernetes"""
+
+ _instance = None
+ _resource_version = "0"
+
+ def __new__(cls):
+ if cls._instance is None:
+ cls._instance = super().__new__(cls)
+ return cls._instance
+
+ @property
+ def resource_version(self) -> str:
+ """
+ Get resourceVersion for Kubernetes object tracking
+ """
+ return self._resource_version
+
+ @resource_version.setter
+ def resource_version(self, value: str) -> None:
+ """
+ Set resourceVersion for Kubernetes object tracking
+ """
+ self._resource_version = value # pylint: disable=protected-access
Review comment:
```suggestion
```
Same here -- no need for a getter+setter.
##########
File path: airflow/executors/kubernetes_executor.py
##########
@@ -568,19 +601,18 @@ def _create_or_update_secret(secret_name, secret_path):
def start(self) -> None:
"""Starts the executor"""
self.log.info('Start Kubernetes executor')
- self.worker_uuid =
KubeWorkerIdentifier.get_or_create_current_kube_worker_uuid()
- if not self.worker_uuid:
- raise AirflowException("Could not get worker uuid")
- self.log.debug('Start with worker_uuid: %s', self.worker_uuid)
+ if not self.job_id:
+ raise AirflowException("Could not get scheduler_job_id")
+ self.scheduler_job_id = self.job_id
+ self.log.debug('Start with scheduler_job_id: %s',
self.scheduler_job_id)
# always need to reset resource version since we don't know
# when we last started, note for behavior below
#
https://github.com/kubernetes-client/python/blob/master/kubernetes/docs
# /CoreV1Api.md#list_namespaced_pod
Review comment:
```suggestion
```
I don't think it's needed
##########
File path: airflow/executors/kubernetes_executor.py
##########
@@ -60,6 +60,32 @@
KubernetesWatchType = Tuple[str, str, Optional[str], Dict[str, str], str]
+class ResourceVersion:
+ """Singleton for tracking resourceVersion from Kubernetes"""
+
+ _instance = None
+ _resource_version = "0"
Review comment:
```suggestion
resource_version = "0"
```
##########
File path:
airflow/migrations/versions/bef4f3d11e8b_drop_kuberesourceversion_and_.py
##########
@@ -0,0 +1,68 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+"""Drop KubeResourceVersion and KubeWorkerId
+
+Revision ID: bef4f3d11e8b
+Revises: e1a11ece99cc
+Create Date: 2020-09-22 18:45:28.011654
+
+"""
+
+from alembic import op
+
+# revision identifiers, used by Alembic.
+revision = 'bef4f3d11e8b'
+down_revision = 'e1a11ece99cc'
+branch_labels = None
+depends_on = None
+
+
+WORKER_UUID_TABLE = "kube_worker_uuid"
+WORKER_RESOURCEVERSION_TABLE = "kube_resource_version"
+
+
+def upgrade():
+ """Apply Drop KubeResourceVersion and KubeWorkerIdentifier tables"""
+ op.drop_table(WORKER_UUID_TABLE)
+ op.drop_table(WORKER_RESOURCEVERSION_TABLE)
+
+
+def downgrade():
+ """Unapply Drop KubeResourceVersion and KubeWorkerIdentifier tables"""
+ conn = op.get_bind()
+ conn.execute(
+ """
+ CREATE TABLE kube_resource_version (
+ one_row_id BOOLEAN DEFAULT (1) NOT NULL,
+ resource_version VARCHAR(255),
+ PRIMARY KEY (one_row_id),
+ CONSTRAINT kube_resource_version_one_row_id CHECK (one_row_id),
+ CHECK (one_row_id IN (0, 1)));
+ """
+ )
+ conn.execute(
+ """
+ CREATE TABLE kube_worker_uuid (
+ one_row_id BOOLEAN DEFAULT (1) NOT NULL,
+ worker_uuid VARCHAR(255),
+ PRIMARY KEY (one_row_id),
+ CONSTRAINT kube_worker_one_row_id CHECK (one_row_id),
+ CHECK (one_row_id IN (0, 1)));
+ """
+ )
Review comment:
Please change as Kaxil suggested. The down migration doesn't need to
care about old revisions, it should just re-create it as it was before this
revision (i.e. what does it look like "currently" on main branch? That is all
the down migration needs to recreate)
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
[email protected]