This is an automated email from the ASF dual-hosted git repository.

bugraoz pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git


The following commit(s) were added to refs/heads/main by this push:
     new 139cd980afb Add `--action-on-existing-key` to `pools import` and 
`connections import` (#62702)
139cd980afb is described below

commit 139cd980afbe5e8853507ed30d4205d353e20d93
Author: leon.jeon <[email protected]>
AuthorDate: Mon Mar 9 21:21:10 2026 +0000

    Add `--action-on-existing-key` to `pools import` and `connections import` 
(#62702)
    
    closes: #62695
    
    Co-authored-by: claude-flow <[email protected]>
---
 airflow-ctl/src/airflowctl/ctl/cli_config.py       | 14 ++++---
 .../airflowctl/ctl/commands/connection_command.py  |  2 +-
 .../src/airflowctl/ctl/commands/pool_command.py    |  6 +--
 .../ctl/commands/test_connections_command.py       | 48 +++++++++++++++++++++-
 .../airflow_ctl/ctl/commands/test_pool_command.py  | 36 ++++++++++++++--
 5 files changed, 91 insertions(+), 15 deletions(-)

diff --git a/airflow-ctl/src/airflowctl/ctl/cli_config.py 
b/airflow-ctl/src/airflowctl/ctl/cli_config.py
index 28dce22805d..5ebfb3809e0 100755
--- a/airflow-ctl/src/airflowctl/ctl/cli_config.py
+++ b/airflow-ctl/src/airflowctl/ctl/cli_config.py
@@ -254,12 +254,11 @@ ARG_DAG_ID = Arg(
     help="The DAG ID of the DAG to pause or unpause",
 )
 
-# Variable Commands Args
-ARG_VARIABLE_ACTION_ON_EXISTING_KEY = Arg(
+ARG_ACTION_ON_EXISTING_KEY = Arg(
     flags=("-a", "--action-on-existing-key"),
     type=str,
     default="overwrite",
-    help="Action to take if we encounter a variable key that already exists.",
+    help="Action to take if the entity already exists.",
     choices=("overwrite", "fail", "skip"),
 )
 
@@ -865,7 +864,10 @@ CONNECTION_COMMANDS = (
         name="import",
         help="Import connections from a file exported with local CLI.",
         
func=lazy_load_command("airflowctl.ctl.commands.connection_command.import_"),
-        args=(Arg(flags=("file",), metavar="FILEPATH", help="Connections JSON 
file"),),
+        args=(
+            Arg(flags=("file",), metavar="FILEPATH", help="Connections JSON 
file"),
+            ARG_ACTION_ON_EXISTING_KEY,
+        ),
     ),
 )
 
@@ -895,7 +897,7 @@ POOL_COMMANDS = (
         name="import",
         help="Import pools",
         func=lazy_load_command("airflowctl.ctl.commands.pool_command.import_"),
-        args=(ARG_FILE,),
+        args=(ARG_FILE, ARG_ACTION_ON_EXISTING_KEY),
     ),
     ActionCommand(
         name="export",
@@ -913,7 +915,7 @@ VARIABLE_COMMANDS = (
         name="import",
         help="Import variables from a file exported with local CLI.",
         
func=lazy_load_command("airflowctl.ctl.commands.variable_command.import_"),
-        args=(ARG_FILE, ARG_VARIABLE_ACTION_ON_EXISTING_KEY),
+        args=(ARG_FILE, ARG_ACTION_ON_EXISTING_KEY),
     ),
 )
 
diff --git a/airflow-ctl/src/airflowctl/ctl/commands/connection_command.py 
b/airflow-ctl/src/airflowctl/ctl/commands/connection_command.py
index b689083faa2..b1a8a820998 100644
--- a/airflow-ctl/src/airflowctl/ctl/commands/connection_command.py
+++ b/airflow-ctl/src/airflowctl/ctl/commands/connection_command.py
@@ -62,7 +62,7 @@ def import_(args, api_client=NEW_API_CLIENT) -> None:
         connection_create_action = BulkCreateActionConnectionBody(
             action="create",
             entities=list(connections_data.values()),
-            action_on_existence=BulkActionOnExistence("fail"),
+            
action_on_existence=BulkActionOnExistence(args.action_on_existing_key),
         )
         response = 
api_client.connections.bulk(BulkBodyConnectionBody(actions=[connection_create_action]))
         if response.create.errors:
diff --git a/airflow-ctl/src/airflowctl/ctl/commands/pool_command.py 
b/airflow-ctl/src/airflowctl/ctl/commands/pool_command.py
index 1437b4f78de..08e56eed87b 100644
--- a/airflow-ctl/src/airflowctl/ctl/commands/pool_command.py
+++ b/airflow-ctl/src/airflowctl/ctl/commands/pool_command.py
@@ -41,7 +41,7 @@ def import_(args, api_client: Client = NEW_API_CLIENT) -> 
None:
     if not filepath.exists():
         raise SystemExit(f"Missing pools file {args.file}")
 
-    success, errors = _import_helper(api_client, filepath)
+    success, errors = _import_helper(api_client, filepath, 
BulkActionOnExistence(args.action_on_existing_key))
     if errors:
         raise SystemExit(f"Failed to update pool(s): {errors}")
     rich.print(success)
@@ -83,7 +83,7 @@ def export(args, api_client: Client = NEW_API_CLIENT) -> None:
         raise SystemExit(f"Failed to export pools: {e}")
 
 
-def _import_helper(api_client: Client, filepath: Path):
+def _import_helper(api_client: Client, filepath: Path, action_on_existence: 
BulkActionOnExistence):
     """Help import pools from the json file."""
     try:
         with open(filepath) as f:
@@ -113,7 +113,7 @@ def _import_helper(api_client: Client, filepath: Path):
             BulkCreateActionPoolBody(
                 action="create",
                 entities=pools_to_update,
-                action_on_existence=BulkActionOnExistence.FAIL,
+                action_on_existence=action_on_existence,
             )
         ]
     )
diff --git 
a/airflow-ctl/tests/airflow_ctl/ctl/commands/test_connections_command.py 
b/airflow-ctl/tests/airflow_ctl/ctl/commands/test_connections_command.py
index 02b56eda99b..f944e66ab1f 100644
--- a/airflow-ctl/tests/airflow_ctl/ctl/commands/test_connections_command.py
+++ b/airflow-ctl/tests/airflow_ctl/ctl/commands/test_connections_command.py
@@ -17,12 +17,14 @@
 from __future__ import annotations
 
 import json
+from unittest import mock
 from unittest.mock import patch
 
 import pytest
 
-from airflowctl.api.client import ClientKind
+from airflowctl.api.client import Client, ClientKind
 from airflowctl.api.datamodels.generated import (
+    BulkActionOnExistence,
     BulkActionResponse,
     BulkResponse,
     ConnectionBody,
@@ -176,3 +178,47 @@ class TestCliConnectionCommands:
             extra=None,
             description="",
         )
+
+    @pytest.mark.parametrize(
+        ("action_on_existing_key", "expected_enum"),
+        [
+            ("overwrite", BulkActionOnExistence.OVERWRITE),
+            ("skip", BulkActionOnExistence.SKIP),
+            ("fail", BulkActionOnExistence.FAIL),
+        ],
+    )
+    def test_import_action_on_existing_key(self, tmp_path, 
action_on_existing_key, expected_enum):
+        expected_json_path = tmp_path / self.export_file_name
+        connection_file = {
+            self.connection_id: {
+                "conn_type": "test_type",
+                "host": "test_host",
+                "extra": "{}",
+                "connection_id": self.connection_id,
+            }
+        }
+        expected_json_path.write_text(json.dumps(connection_file))
+
+        mock_client = mock.MagicMock(spec=Client)
+        mock_response = mock.MagicMock()
+        mock_response.create.success = [self.connection_id]
+        mock_response.create.errors = []
+        mock_client.connections.bulk.return_value = mock_response
+
+        connection_command.import_(
+            self.parser.parse_args(
+                [
+                    "connections",
+                    "import",
+                    expected_json_path.as_posix(),
+                    "--action-on-existing-key",
+                    action_on_existing_key,
+                ]
+            ),
+            api_client=mock_client,
+        )
+
+        mock_client.connections.bulk.assert_called_once()
+        bulk_body = mock_client.connections.bulk.call_args[0][0]
+        action = bulk_body.actions[0]
+        assert action.action_on_existence == expected_enum
diff --git a/airflow-ctl/tests/airflow_ctl/ctl/commands/test_pool_command.py 
b/airflow-ctl/tests/airflow_ctl/ctl/commands/test_pool_command.py
index 84152d59c81..0bc24389294 100644
--- a/airflow-ctl/tests/airflow_ctl/ctl/commands/test_pool_command.py
+++ b/airflow-ctl/tests/airflow_ctl/ctl/commands/test_pool_command.py
@@ -48,21 +48,21 @@ class TestPoolImportCommand:
         """Test import with missing file."""
         non_existent = tmp_path / "non_existent.json"
         with pytest.raises(SystemExit, match=f"Missing pools file 
{non_existent}"):
-            pool_command.import_(mock.MagicMock(file=non_existent))
+            pool_command.import_(mock.MagicMock(file=non_existent, 
action_on_existing_key="fail"))
 
     def test_import_invalid_json(self, mock_client, tmp_path):
         """Test import with invalid JSON file."""
         invalid_json = tmp_path / "invalid.json"
         invalid_json.write_text("invalid json")
         with pytest.raises(SystemExit, match="Invalid json file"):
-            pool_command.import_(mock.MagicMock(file=invalid_json))
+            pool_command.import_(mock.MagicMock(file=invalid_json, 
action_on_existing_key="fail"))
 
     def test_import_invalid_pool_config(self, mock_client, tmp_path):
         """Test import with invalid pool configuration."""
         invalid_pool = tmp_path / "invalid_pool.json"
         invalid_pool.write_text(json.dumps([{"invalid": "config"}]))
         with pytest.raises(SystemExit, match="Invalid pool configuration: 
{'invalid': 'config'}"):
-            pool_command.import_(mock.MagicMock(file=invalid_pool))
+            pool_command.import_(mock.MagicMock(file=invalid_pool, 
action_on_existing_key="fail"))
 
     def test_import_success(self, mock_client, tmp_path, capsys):
         """Test successful pool import."""
@@ -87,7 +87,7 @@ class TestPoolImportCommand:
 
         mock_client.pools.bulk.return_value = mock_bulk_builder
 
-        pool_command.import_(mock.MagicMock(file=pools_file))
+        pool_command.import_(mock.MagicMock(file=pools_file, 
action_on_existing_key="fail"))
 
         # Verify bulk operation was called with correct parameters
         mock_client.pools.bulk.assert_called_once()
@@ -108,6 +108,34 @@ class TestPoolImportCommand:
         captured = capsys.readouterr()
         assert str(["test_pool"]) in captured.out
 
+    @pytest.mark.parametrize(
+        ("action_on_existing_key", "expected_enum"),
+        [
+            ("overwrite", BulkActionOnExistence.OVERWRITE),
+            ("skip", BulkActionOnExistence.SKIP),
+            ("fail", BulkActionOnExistence.FAIL),
+        ],
+    )
+    def test_import_action_on_existing_key(
+        self, mock_client, tmp_path, action_on_existing_key, expected_enum
+    ):
+        """Test that --action-on-existing-key is passed through to the bulk 
API."""
+        pools_file = tmp_path / "pools.json"
+        pools_file.write_text(json.dumps([{"name": "test_pool", "slots": 1}]))
+
+        mock_response = mock.MagicMock()
+        mock_response.success = ["test_pool"]
+        mock_response.errors = []
+        mock_bulk_builder = mock.MagicMock()
+        mock_bulk_builder.create = mock_response
+        mock_client.pools.bulk.return_value = mock_bulk_builder
+
+        pool_command.import_(mock.MagicMock(file=pools_file, 
action_on_existing_key=action_on_existing_key))
+
+        call_args = mock_client.pools.bulk.call_args[1]
+        action = call_args["pools"].actions[0]
+        assert action.action_on_existence == expected_enum
+
 
 class TestPoolExportCommand:
     """Test cases for pool export command."""

Reply via email to