This is an automated email from the ASF dual-hosted git repository.
ephraimanierobi pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git
The following commit(s) were added to refs/heads/main by this push:
new 0e1c106d7c Fix updating variables during variable imports (#33932)
0e1c106d7c is described below
commit 0e1c106d7cd0703125528a691088e42e17c99929
Author: Ephraim Anierobi <[email protected]>
AuthorDate: Fri Sep 1 15:51:20 2023 +0100
Fix updating variables during variable imports (#33932)
* Fix updating variables during variable imports
We should only create new variables during variable imports and not update
already existing variables.
* Apply suggestions from code review
Co-authored-by: Tzu-ping Chung <[email protected]>
* Use flag for variable import in cli and UI
* apply suggestions from code review
* Update airflow/cli/commands/variable_command.py
Co-authored-by: Tzu-ping Chung <[email protected]>
* fixup! Update airflow/cli/commands/variable_command.py
---------
Co-authored-by: Tzu-ping Chung <[email protected]>
---
airflow/cli/cli_config.py | 8 +++-
airflow/cli/commands/variable_command.py | 20 +++++++++-
airflow/www/templates/airflow/variable_list.html | 12 ++++++
airflow/www/views.py | 26 ++++++++++++-
tests/cli/commands/test_variable_command.py | 18 +++++++++
tests/www/views/test_views_variable.py | 49 ++++++++++++++++++++++++
6 files changed, 129 insertions(+), 4 deletions(-)
diff --git a/airflow/cli/cli_config.py b/airflow/cli/cli_config.py
index fadf988a4a..e8e096ee6f 100644
--- a/airflow/cli/cli_config.py
+++ b/airflow/cli/cli_config.py
@@ -557,6 +557,12 @@ ARG_VAR_EXPORT = Arg(
help="Export all variables to JSON file",
type=argparse.FileType("w", encoding="UTF-8"),
)
+# CLI option for `variables import`: what to do when an imported key already
+# exists. One of "overwrite" (the default), "fail" or "skip".
+ARG_VAR_ACTION_ON_EXISTING_KEY = Arg(
+ ("-a", "--action-on-existing-key"),
+ help="Action to take if we encounter a variable key that already exists.",
+ default="overwrite",
+ choices=("overwrite", "fail", "skip"),
+)
# kerberos
ARG_PRINCIPAL = Arg(("principal",), help="kerberos principal", nargs="?")
@@ -1454,7 +1460,7 @@ VARIABLES_COMMANDS = (
name="import",
help="Import variables",
func=lazy_load_command("airflow.cli.commands.variable_command.variables_import"),
- args=(ARG_VAR_IMPORT, ARG_VERBOSE),
+ args=(ARG_VAR_IMPORT, ARG_VAR_ACTION_ON_EXISTING_KEY, ARG_VERBOSE),
),
ActionCommand(
name="export",
diff --git a/airflow/cli/commands/variable_command.py
b/airflow/cli/commands/variable_command.py
index db0e92de28..bc09d0208d 100644
--- a/airflow/cli/commands/variable_command.py
+++ b/airflow/cli/commands/variable_command.py
@@ -31,7 +31,7 @@ from airflow.models import Variable
from airflow.utils import cli as cli_utils
from airflow.utils.cli import suppress_logs_and_warning
from airflow.utils.providers_configuration_loader import
providers_configuration_loaded
-from airflow.utils.session import create_session
+from airflow.utils.session import create_session, provide_session
@suppress_logs_and_warning
@@ -76,7 +76,8 @@ def variables_delete(args):
@cli_utils.action_cli
@providers_configuration_loaded
-def variables_import(args):
+@provide_session
+def variables_import(args, session):
"""Import variables from a given file."""
if not os.path.exists(args.file):
raise SystemExit("Missing variables file.")
@@ -86,7 +87,17 @@ def variables_import(args):
except JSONDecodeError:
raise SystemExit("Invalid variables file.")
suc_count = fail_count = 0
+ skipped = set()
+ action_on_existing = args.action_on_existing_key
+ existing_keys = set()
+ if action_on_existing != "overwrite":
+ existing_keys =
set(session.scalars(select(Variable.key).where(Variable.key.in_(var_json))))
+ if action_on_existing == "fail" and existing_keys:
+ raise SystemExit(f"Failed. These keys: {sorted(existing_keys)} already
exists.")
for k, v in var_json.items():
+ if action_on_existing == "skip" and k in existing_keys:
+ skipped.add(k)
+ continue
try:
Variable.set(k, v, serialize_json=not isinstance(v, str))
except Exception as e:
@@ -97,6 +108,11 @@ def variables_import(args):
print(f"{suc_count} of {len(var_json)} variables successfully updated.")
if fail_count:
print(f"{fail_count} variable(s) failed to be updated.")
+ if skipped:
+ print(
+ f"The variables with these keys: {list(sorted(skipped))} "
+ f"were skipped because they already exists"
+ )
@providers_configuration_loaded
diff --git a/airflow/www/templates/airflow/variable_list.html
b/airflow/www/templates/airflow/variable_list.html
index fe2b4182fc..bd2171bdc1 100644
--- a/airflow/www/templates/airflow/variable_list.html
+++ b/airflow/www/templates/airflow/variable_list.html
@@ -29,6 +29,18 @@
<div class="form-group">
<input class="form-control-file" type="file" name="file">
</div>
+ <div class="form-group form-check">
+ <input type="radio" class="form-check-input" name="action_if_exists"
value="overwrite" checked/>
+ <label class="form-check-label">Overwrite if exists</label>
+ </div>
+ <div class="form-group form-check">
+ <input type="radio" class="form-check-input" name="action_if_exists"
value="fail"/>
+ <label class="form-check-label">Fail if exists</label>
+ </div>
+ <div class="form-group form-check">
+ <input type="radio" class="form-check-input" name="action_if_exists"
value="skip" />
+ <label class="form-check-label">Skip if exists</label>
+ </div>
<button type="submit" class="btn">
<span class="material-icons">cloud_upload</span>
Import Variables
diff --git a/airflow/www/views.py b/airflow/www/views.py
index d3ecae5044..6b11e99b72 100644
--- a/airflow/www/views.py
+++ b/airflow/www/views.py
@@ -5137,17 +5137,34 @@ class VariableModelView(AirflowModelView):
@expose("/varimport", methods=["POST"])
@auth.has_access([(permissions.ACTION_CAN_CREATE,
permissions.RESOURCE_VARIABLE)])
@action_logging(event=f"{permissions.RESOURCE_VARIABLE.lower()}.varimport")
- def varimport(self):
+ @provide_session
+ def varimport(self, session):
"""Import variables."""
try:
variable_dict = json.loads(request.files["file"].read())
+ action_on_existing = request.form.get("action_if_exists",
"overwrite").lower()
except Exception:
self.update_redirect()
flash("Missing file or syntax error.", "error")
return redirect(self.get_redirect())
else:
+ existing_keys = set()
+ if action_on_existing != "overwrite":
+ existing_keys = set(
+
session.scalars(select(models.Variable.key).where(models.Variable.key.in_(variable_dict)))
+ )
+ if action_on_existing == "fail" and existing_keys:
+ failed_repr = ", ".join(repr(k) for k in sorted(existing_keys))
+ flash(f"Failed. The variables with these keys: {failed_repr}
already exists.")
+ logging.error(f"Failed. The variables with these keys:
{failed_repr} already exists.")
+ return redirect(location=request.referrer)
+ skipped = set()
suc_count = fail_count = 0
for k, v in variable_dict.items():
+ if action_on_existing == "skip" and k in existing_keys:
+ logging.warning("Variable: %s already exists, skipping.",
k)
+ skipped.add(k)
+ continue
try:
models.Variable.set(k, v, serialize_json=not isinstance(v,
str))
except Exception as exc:
@@ -5158,6 +5175,13 @@ class VariableModelView(AirflowModelView):
flash(f"{suc_count} variable(s) successfully updated.")
if fail_count:
flash(f"{fail_count} variable(s) failed to be updated.",
"error")
+ if skipped:
+ skipped_repr = ", ".join(repr(k) for k in sorted(skipped))
+ flash(
+ f"The variables with these keys: {skipped_repr} were
skipped "
+ "because they already exists",
+ "warning",
+ )
self.update_redirect()
return redirect(self.get_redirect())
diff --git a/tests/cli/commands/test_variable_command.py
b/tests/cli/commands/test_variable_command.py
index b07cfbba8c..93f0fab156 100644
--- a/tests/cli/commands/test_variable_command.py
+++ b/tests/cli/commands/test_variable_command.py
@@ -106,6 +106,24 @@ class TestCliVariables:
assert Variable.get("false", deserialize_json=True) is False
assert Variable.get("null", deserialize_json=True) is None
+ # test variable import skip existing
+ # set variable list to ["airflow"] and have it skip during import
+ variable_command.variables_set(self.parser.parse_args(["variables",
"set", "list", '["airflow"]']))
+ variable_command.variables_import(
+ self.parser.parse_args(
+ ["variables", "import", "variables_types.json",
"--action-on-existing-key", "skip"]
+ )
+ )
+ assert ["airflow"] == Variable.get("list", deserialize_json=True) #
should not be overwritten
+
+ # test variable import fails on existing when action is set to fail
+ with pytest.raises(SystemExit):
+ variable_command.variables_import(
+ self.parser.parse_args(
+ ["variables", "import", "variables_types.json",
"--action-on-existing-key", "fail"]
+ )
+ )
+
os.remove("variables_types.json")
def test_variables_list(self):
diff --git a/tests/www/views/test_views_variable.py
b/tests/www/views/test_views_variable.py
index aca8e2aeee..22ee898c9a 100644
--- a/tests/www/views/test_views_variable.py
+++ b/tests/www/views/test_views_variable.py
@@ -127,6 +127,55 @@ def test_import_variables_success(session, admin_client):
_check_last_log(session, dag_id=None, event="variables.varimport",
execution_date=None)
+def test_import_variables_override_existing_variables_if_set(session, admin_client, caplog):
+    # With action_if_exists="overwrite", a key that already exists ("str_key") is
+    # imported alongside the new one, so both variables are reported as updated.
+    assert session.query(Variable).count() == 0
+    Variable.set("str_key", "str_value")
+    content = '{"str_key": "str_value", "int_key": 60}'  # str_key already exists
+    bytes_content = io.BytesIO(bytes(content, encoding="utf-8"))
+
+    resp = admin_client.post(
+        "/variable/varimport",
+        # The view reads the "action_if_exists" form field. A misspelled field name
+        # is ignored and the request only succeeds via the "overwrite" default.
+        data={"file": (bytes_content, "test.json"), "action_if_exists": "overwrite"},
+        follow_redirects=True,
+    )
+    check_content_in_response("2 variable(s) successfully updated.", resp)
+    _check_last_log(session, dag_id=None, event="variables.varimport", execution_date=None)
+
+
+def test_import_variables_skips_update_if_set(session, admin_client, caplog):
+ # With action_if_exists="skip", the pre-existing "str_key" is left out of the
+ # import: only one variable is reported as updated, and the skip is expected
+ # both in the response body and in the captured log output.
+ assert session.query(Variable).count() == 0
+ Variable.set("str_key", "str_value")
+ content = '{"str_key": "str_value", "int_key": 60}' # str_key already
exists
+ bytes_content = io.BytesIO(bytes(content, encoding="utf-8"))
+
+ resp = admin_client.post(
+ "/variable/varimport",
+ data={"file": (bytes_content, "test.json"), "action_if_exists":
"skip"},
+ follow_redirects=True,
+ )
+ check_content_in_response("1 variable(s) successfully updated.", resp)
+
+ check_content_in_response(
+ "The variables with these keys: 'str_key' were skipped because
they already exists", resp
+ )
+ _check_last_log(session, dag_id=None, event="variables.varimport",
execution_date=None)
+ assert "Variable: str_key already exists, skipping." in caplog.text
+
+
+def test_import_variables_fails_if_action_if_exists_is_fail(session,
admin_client, caplog):
+ # With action_if_exists="fail", importing a file that contains the
+ # pre-existing "str_key" is expected to log a failure naming that key.
+ assert session.query(Variable).count() == 0
+ Variable.set("str_key", "str_value")
+ content = '{"str_key": "str_value", "int_key": 60}' # str_key already
exists
+ bytes_content = io.BytesIO(bytes(content, encoding="utf-8"))
+
+ admin_client.post(
+ "/variable/varimport",
+ data={"file": (bytes_content, "test.json"), "action_if_exists":
"fail"},
+ follow_redirects=True,
+ )
+ assert "Failed. The variables with these keys: 'str_key' already exists."
in caplog.text
+
+
def test_import_variables_anon(session, app):
assert session.query(Variable).count() == 0