This is an automated email from the ASF dual-hosted git repository.

JingsongLi pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/paimon.git


The following commit(s) were added to refs/heads/master by this push:
     new b66b648fba [python] Add drop_partitions support to FileSystem catalog and CLI (#7911)
b66b648fba is described below

commit b66b648fbadf39c89ca387a5db50bef0bc30ffde
Author: Junrui Lee <[email protected]>
AuthorDate: Wed May 20 17:26:41 2026 +0800

    [python] Add drop_partitions support to FileSystem catalog and CLI (#7911)
    
    Add drop_partitions support for the Python FileSystem catalog and expose
    it through the PyPaimon CLI, aligned with the Java implementation by
    using the table batch commit path and truncate_partitions semantics to
    generate an overwrite commit that removes the specified partition data.
---
 .../pypaimon/catalog/filesystem_catalog.py         | 18 ++++-
 paimon-python/pypaimon/cli/cli_table.py            | 77 +++++++++++++++++++
 paimon-python/pypaimon/tests/cli_table_test.py     | 88 ++++++++++++++++++++++
 .../pypaimon/tests/filesystem_catalog_test.py      | 61 +++++++++++++++
 4 files changed, 243 insertions(+), 1 deletion(-)

diff --git a/paimon-python/pypaimon/catalog/filesystem_catalog.py b/paimon-python/pypaimon/catalog/filesystem_catalog.py
index 6be4f67d53..86e2f775e7 100644
--- a/paimon-python/pypaimon/catalog/filesystem_catalog.py
+++ b/paimon-python/pypaimon/catalog/filesystem_catalog.py
@@ -15,7 +15,7 @@
 # specific language governing permissions and limitations
 # under the License.
 
-from typing import List, Optional, Union
+from typing import Dict, List, Optional, Union
 
 from pypaimon.api.api_response import GetTagResponse, PagedList
 from pypaimon.catalog.catalog import Catalog
@@ -375,6 +375,22 @@ class FileSystemCatalog(Catalog):
 
         return PagedList(elements=result_partitions, next_page_token=next_page_token)
 
+    def drop_partitions(
+            self,
+            identifier: Union[str, Identifier],
+            partitions: List[Dict[str, str]],
+    ) -> None:
+        if not isinstance(identifier, Identifier):
+            identifier = Identifier.from_string(identifier)
+        if not partitions:
+            raise ValueError("Partitions list cannot be empty.")
+        table = self.get_table(identifier)
+        commit = table.new_batch_write_builder().new_commit()
+        try:
+            commit.truncate_partitions(partitions)
+        finally:
+            commit.close()
+
     # ===================== Tag CRUD =====================
     # Thin wrappers that delegate to FileStoreTable's existing tag helpers
     # (which in turn use the Python-side TagManager). The catalog layer is
diff --git a/paimon-python/pypaimon/cli/cli_table.py b/paimon-python/pypaimon/cli/cli_table.py
index 0ef6b661ab..e428bd2d71 100644
--- a/paimon-python/pypaimon/cli/cli_table.py
+++ b/paimon-python/pypaimon/cli/cli_table.py
@@ -724,6 +724,67 @@ def cmd_table_list_partitions(args):
         sys.exit(1)
 
 
+def cmd_table_drop_partition(args):
+    """
+    Execute the 'table drop-partition' command.
+
+    Drops one or more partitions from a Paimon table.
+
+    Args:
+        args: Parsed command line arguments.
+    """
+    from pypaimon.cli.cli import load_catalog_config, create_catalog
+
+    # Load catalog configuration
+    config_path = args.config
+    config = load_catalog_config(config_path)
+
+    # Create catalog
+    catalog = create_catalog(config)
+
+    # Parse table identifier
+    table_identifier = args.table
+    parts = table_identifier.split('.')
+    if len(parts) != 2:
+        print(f"Error: Invalid table identifier '{table_identifier}'. "
+              f"Expected format: 'database.table'", file=sys.stderr)
+        sys.exit(1)
+
+    # Parse partition specs
+    partition_strings = args.partition
+    if not partition_strings:
+        print("Error: At least one --partition must be specified.", file=sys.stderr)
+        sys.exit(1)
+
+    partitions = []
+    for partition_str in partition_strings:
+        partition_dict = {}
+        for kv in partition_str.split(','):
+            kv = kv.strip()
+            if '=' not in kv:
+                print(f"Error: Invalid partition spec '{kv}'. "
+                      f"Expected format: 'key=value'", file=sys.stderr)
+                sys.exit(1)
+            key, value = kv.split('=', 1)
+            partition_dict[key.strip()] = value.strip()
+        if not partition_dict:
+            print("Error: Empty partition spec.", file=sys.stderr)
+            sys.exit(1)
+        partitions.append(partition_dict)
+
+    # Drop partitions
+    try:
+        catalog.drop_partitions(table_identifier, partitions)
+        partition_desc = "; ".join(
+            ",".join(f"{k}={v}" for k, v in p.items()) for p in partitions
+        )
+        print(f"Successfully dropped partition(s) [{partition_desc}] "
+              f"from table '{table_identifier}'.")
+    except Exception as e:
+        print(f"Error: Failed to drop partition(s): {e}", file=sys.stderr)
+        sys.exit(1)
+
+
 def add_table_subcommands(table_parser):
     """
     Add table subcommands to the parser.
@@ -814,6 +875,22 @@ def add_table_subcommands(table_parser):
     )
     drop_parser.set_defaults(func=cmd_table_drop)
     
+    # table drop-partition command
+    drop_partition_parser = table_subparsers.add_parser(
+        'drop-partition', help='Drop one or more partitions from a table')
+    drop_partition_parser.add_argument(
+        'table',
+        help='Table identifier in format: database.table'
+    )
+    drop_partition_parser.add_argument(
+        '--partition', '-p',
+        action='append',
+        required=True,
+        help=('Partition spec in format: "key1=value1,key2=value2". '
+              'Can be specified multiple times to drop multiple partitions.')
+    )
+    drop_partition_parser.set_defaults(func=cmd_table_drop_partition)
+
     # table import command
     import_parser = table_subparsers.add_parser('import', help='Import data from CSV or JSON file')
     import_parser.add_argument(
diff --git a/paimon-python/pypaimon/tests/cli_table_test.py b/paimon-python/pypaimon/tests/cli_table_test.py
index 782c980310..b0e314644b 100644
--- a/paimon-python/pypaimon/tests/cli_table_test.py
+++ b/paimon-python/pypaimon/tests/cli_table_test.py
@@ -1351,6 +1351,94 @@ class CliTableTest(unittest.TestCase):
                 output = mock_stdout.getvalue()
                 self.assertIn('No partitions found', output)
 
+    def test_cli_table_drop_partition(self):
+        """Test drop-partition command drops a partition from a table."""
+        # Create a partitioned table for drop-partition test
+        pa_schema = pa.schema([
+            ('dt', pa.string()),
+            ('id', pa.int32()),
+        ])
+        schema = Schema.from_pyarrow_schema(pa_schema, partition_keys=['dt'])
+        self.catalog.create_table('test_db.drop_part_cli', schema, True)
+        table = self.catalog.get_table('test_db.drop_part_cli')
+
+        # Write data for three partitions
+        for dt_val in ['2024-01-01', '2024-01-02', '2024-01-03']:
+            write_builder = table.new_batch_write_builder()
+            table_write = write_builder.new_write()
+            table_commit = write_builder.new_commit()
+            data = pa.Table.from_pydict({
+                'dt': [dt_val],
+                'id': [1],
+            }, schema=pa_schema)
+            table_write.write_arrow(data)
+            table_commit.commit(table_write.prepare_commit())
+            table_write.close()
+            table_commit.close()
+
+        # Drop one partition via CLI
+        with patch('sys.argv',
+                   ['paimon', '-c', self.config_file,
+                    'table', 'drop-partition', 'test_db.drop_part_cli',
+                    '--partition', 'dt=2024-01-02']):
+            with patch('sys.stdout', new_callable=StringIO) as mock_stdout:
+                try:
+                    main()
+                except SystemExit:
+                    pass
+
+                output = mock_stdout.getvalue()
+                self.assertIn('Successfully dropped', output)
+
+        # Verify partition was dropped
+        result = self.catalog.list_partitions_paged('test_db.drop_part_cli')
+        specs = sorted(p.spec['dt'] for p in result.elements)
+        self.assertEqual(specs, ['2024-01-01', '2024-01-03'])
+
+    def test_cli_table_drop_partition_multiple(self):
+        """Test drop-partition command with multiple --partition flags."""
+        # Create a partitioned table
+        pa_schema = pa.schema([
+            ('dt', pa.string()),
+            ('id', pa.int32()),
+        ])
+        schema = Schema.from_pyarrow_schema(pa_schema, partition_keys=['dt'])
+        self.catalog.create_table('test_db.drop_multi_cli', schema, True)
+        table = self.catalog.get_table('test_db.drop_multi_cli')
+
+        for dt_val in ['2024-01-01', '2024-01-02', '2024-01-03']:
+            write_builder = table.new_batch_write_builder()
+            table_write = write_builder.new_write()
+            table_commit = write_builder.new_commit()
+            data = pa.Table.from_pydict({
+                'dt': [dt_val],
+                'id': [1],
+            }, schema=pa_schema)
+            table_write.write_arrow(data)
+            table_commit.commit(table_write.prepare_commit())
+            table_write.close()
+            table_commit.close()
+
+        # Drop multiple partitions via CLI
+        with patch('sys.argv',
+                   ['paimon', '-c', self.config_file,
+                    'table', 'drop-partition', 'test_db.drop_multi_cli',
+                    '--partition', 'dt=2024-01-01',
+                    '--partition', 'dt=2024-01-03']):
+            with patch('sys.stdout', new_callable=StringIO) as mock_stdout:
+                try:
+                    main()
+                except SystemExit:
+                    pass
+
+                output = mock_stdout.getvalue()
+                self.assertIn('Successfully dropped', output)
+
+        # Verify only the middle partition remains
+        result = self.catalog.list_partitions_paged('test_db.drop_multi_cli')
+        self.assertEqual(len(result.elements), 1)
+        self.assertEqual(result.elements[0].spec['dt'], '2024-01-02')
+
 
 if __name__ == '__main__':
     unittest.main()
diff --git a/paimon-python/pypaimon/tests/filesystem_catalog_test.py b/paimon-python/pypaimon/tests/filesystem_catalog_test.py
index 280ab9e186..5f5f216794 100644
--- a/paimon-python/pypaimon/tests/filesystem_catalog_test.py
+++ b/paimon-python/pypaimon/tests/filesystem_catalog_test.py
@@ -411,3 +411,64 @@ class FileSystemCatalogTest(unittest.TestCase):
             identifier, partition_name_pattern='dt=2025*'
         )
         self.assertEqual(len(result.elements), 0)
+
+    def test_drop_partitions(self):
+        """Test drop_partitions removes specified partitions from a table."""
+        catalog = CatalogFactory.create({"warehouse": self.warehouse})
+        catalog.create_database("test_db", False)
+
+        identifier = "test_db.drop_part_tbl"
+        self._create_partitioned_table_with_data(catalog, identifier, [
+            {'dt': '2024-01-01', 'rows': 2},
+            {'dt': '2024-01-02', 'rows': 3},
+            {'dt': '2024-01-03', 'rows': 1},
+        ])
+
+        # Verify all 3 partitions exist
+        result = catalog.list_partitions_paged(identifier)
+        self.assertEqual(len(result.elements), 3)
+
+        # Drop one partition
+        catalog.drop_partitions(identifier, [{'dt': '2024-01-02'}])
+
+        # Verify only 2 partitions remain
+        result = catalog.list_partitions_paged(identifier)
+        self.assertEqual(len(result.elements), 2)
+        specs = sorted(p.spec['dt'] for p in result.elements)
+        self.assertEqual(specs, ['2024-01-01', '2024-01-03'])
+
+    def test_drop_partitions_multiple(self):
+        """Test drop_partitions with multiple partitions at once."""
+        catalog = CatalogFactory.create({"warehouse": self.warehouse})
+        catalog.create_database("test_db", False)
+
+        identifier = "test_db.drop_multi_tbl"
+        self._create_partitioned_table_with_data(catalog, identifier, [
+            {'dt': '2024-01-01', 'rows': 1},
+            {'dt': '2024-01-02', 'rows': 1},
+            {'dt': '2024-01-03', 'rows': 1},
+        ])
+
+        # Drop two partitions at once
+        catalog.drop_partitions(identifier, [
+            {'dt': '2024-01-01'},
+            {'dt': '2024-01-03'},
+        ])
+
+        # Verify only 1 partition remains
+        result = catalog.list_partitions_paged(identifier)
+        self.assertEqual(len(result.elements), 1)
+        self.assertEqual(result.elements[0].spec['dt'], '2024-01-02')
+
+    def test_drop_partitions_empty_list_raises(self):
+        """Test drop_partitions raises ValueError for empty partitions list."""
+        catalog = CatalogFactory.create({"warehouse": self.warehouse})
+        catalog.create_database("test_db", False)
+
+        identifier = "test_db.drop_empty_tbl"
+        self._create_partitioned_table_with_data(catalog, identifier, [
+            {'dt': '2024-01-01', 'rows': 1},
+        ])
+
+        with self.assertRaises(ValueError):
+            catalog.drop_partitions(identifier, [])

Reply via email to