This is an automated email from the ASF dual-hosted git repository.
JingsongLi pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/paimon.git
The following commit(s) were added to refs/heads/master by this push:
new b66b648fba [python] Add drop_partitions support to FileSystem catalog and CLI (#7911)
b66b648fba is described below
commit b66b648fbadf39c89ca387a5db50bef0bc30ffde
Author: Junrui Lee <[email protected]>
AuthorDate: Wed May 20 17:26:41 2026 +0800
[python] Add drop_partitions support to FileSystem catalog and CLI (#7911)
Add drop_partitions support for the Python FileSystem catalog and expose
it through the PyPaimon CLI, aligned with the Java implementation by
using the table batch commit path and truncate_partitions semantics to
generate an overwrite commit that removes the specified partition data.
---
.../pypaimon/catalog/filesystem_catalog.py | 18 ++++-
paimon-python/pypaimon/cli/cli_table.py | 77 +++++++++++++++++++
paimon-python/pypaimon/tests/cli_table_test.py | 88 ++++++++++++++++++++++
.../pypaimon/tests/filesystem_catalog_test.py | 61 +++++++++++++++
4 files changed, 243 insertions(+), 1 deletion(-)
diff --git a/paimon-python/pypaimon/catalog/filesystem_catalog.py
b/paimon-python/pypaimon/catalog/filesystem_catalog.py
index 6be4f67d53..86e2f775e7 100644
--- a/paimon-python/pypaimon/catalog/filesystem_catalog.py
+++ b/paimon-python/pypaimon/catalog/filesystem_catalog.py
@@ -15,7 +15,7 @@
# specific language governing permissions and limitations
# under the License.
-from typing import List, Optional, Union
+from typing import Dict, List, Optional, Union
from pypaimon.api.api_response import GetTagResponse, PagedList
from pypaimon.catalog.catalog import Catalog
@@ -375,6 +375,22 @@ class FileSystemCatalog(Catalog):
return PagedList(elements=result_partitions,
next_page_token=next_page_token)
def drop_partitions(
    self,
    identifier: Union[str, Identifier],
    partitions: List[Dict[str, str]],
) -> None:
    """Drop the given partitions from a table.

    The table is resolved through ``get_table`` and the partitions are
    removed by calling ``truncate_partitions`` on a fresh batch commit; the
    commit is closed whether or not truncation succeeds.

    Args:
        identifier: Table identifier, either an ``Identifier`` or a string
            accepted by ``Identifier.from_string``.
        partitions: Partition specs to drop, each a mapping from partition
            key to partition value.

    Raises:
        ValueError: If ``partitions`` is empty.
    """
    table_id = (identifier if isinstance(identifier, Identifier)
                else Identifier.from_string(identifier))
    if not partitions:
        raise ValueError("Partitions list cannot be empty.")
    target_table = self.get_table(table_id)
    batch_commit = target_table.new_batch_write_builder().new_commit()
    try:
        batch_commit.truncate_partitions(partitions)
    finally:
        # Always release the commit, even if truncation fails.
        batch_commit.close()
+
# ===================== Tag CRUD =====================
# Thin wrappers that delegate to FileStoreTable's existing tag helpers
# (which in turn use the Python-side TagManager). The catalog layer is
diff --git a/paimon-python/pypaimon/cli/cli_table.py
b/paimon-python/pypaimon/cli/cli_table.py
index 0ef6b661ab..e428bd2d71 100644
--- a/paimon-python/pypaimon/cli/cli_table.py
+++ b/paimon-python/pypaimon/cli/cli_table.py
@@ -724,6 +724,67 @@ def cmd_table_list_partitions(args):
sys.exit(1)
def _parse_partition_specs(partition_strings):
    """Parse CLI partition specs ("k1=v1,k2=v2") into a list of dicts.

    Empty segments (e.g. from a trailing comma) are ignored. Keys and values
    are stripped of surrounding whitespace; values may themselves contain '='.

    Raises:
        ValueError: If a segment is not 'key=value', has an empty key,
            repeats a key within one spec, or a spec has no segments at all.
    """
    partitions = []
    for partition_str in partition_strings:
        partition_dict = {}
        for kv in partition_str.split(','):
            kv = kv.strip()
            if not kv:
                # Tolerate stray/trailing commas such as "dt=2024-01-01,".
                continue
            key, sep, value = kv.partition('=')
            key = key.strip()
            if not sep or not key:
                raise ValueError(f"Invalid partition spec '{kv}'. "
                                 f"Expected format: 'key=value'")
            if key in partition_dict:
                # Conflicting values for one key would otherwise be
                # silently overwritten by the last occurrence.
                raise ValueError(f"Duplicate partition key '{key}' in "
                                 f"partition spec '{partition_str}'.")
            partition_dict[key] = value.strip()
        if not partition_dict:
            raise ValueError("Empty partition spec.")
        partitions.append(partition_dict)
    return partitions


def cmd_table_drop_partition(args):
    """
    Execute the 'table drop-partition' command.

    Drops one or more partitions from a Paimon table. All command-line input
    is validated before the catalog configuration is loaded, so malformed
    arguments are reported without touching the catalog.

    Args:
        args: Parsed command line arguments. Reads ``args.config`` (catalog
            config path), ``args.table`` ('database.table') and
            ``args.partition`` (list of "key1=value1,key2=value2" specs).

    Prints an error to stderr and exits with status 1 on invalid arguments
    or if the catalog fails to drop the partitions.
    """
    # Validate the table identifier before doing any catalog work.
    table_identifier = args.table
    if len(table_identifier.split('.')) != 2:
        print(f"Error: Invalid table identifier '{table_identifier}'. "
              f"Expected format: 'database.table'", file=sys.stderr)
        sys.exit(1)

    # Parse partition specs
    partition_strings = args.partition
    if not partition_strings:
        print("Error: At least one --partition must be specified.",
              file=sys.stderr)
        sys.exit(1)

    try:
        partitions = _parse_partition_specs(partition_strings)
    except ValueError as e:
        print(f"Error: {e}", file=sys.stderr)
        sys.exit(1)

    # Only now load the catalog; imported lazily as in the original command.
    from pypaimon.cli.cli import load_catalog_config, create_catalog

    config = load_catalog_config(args.config)
    catalog = create_catalog(config)

    # Drop partitions; only the catalog call is guarded so that unrelated
    # failures are not reported as "Failed to drop partition(s)".
    try:
        catalog.drop_partitions(table_identifier, partitions)
    except Exception as e:
        print(f"Error: Failed to drop partition(s): {e}", file=sys.stderr)
        sys.exit(1)

    partition_desc = "; ".join(
        ",".join(f"{k}={v}" for k, v in p.items()) for p in partitions
    )
    print(f"Successfully dropped partition(s) [{partition_desc}] "
          f"from table '{table_identifier}'.")
+
+
def add_table_subcommands(table_parser):
"""
Add table subcommands to the parser.
@@ -814,6 +875,22 @@ def add_table_subcommands(table_parser):
)
drop_parser.set_defaults(func=cmd_table_drop)
+ # table drop-partition command
+ drop_partition_parser = table_subparsers.add_parser(
+ 'drop-partition', help='Drop one or more partitions from a table')
+ drop_partition_parser.add_argument(
+ 'table',
+ help='Table identifier in format: database.table'
+ )
+ drop_partition_parser.add_argument(
+ '--partition', '-p',
+ action='append',
+ required=True,
+ help=('Partition spec in format: "key1=value1,key2=value2". '
+ 'Can be specified multiple times to drop multiple partitions.')
+ )
+ drop_partition_parser.set_defaults(func=cmd_table_drop_partition)
+
# table import command
import_parser = table_subparsers.add_parser('import', help='Import data
from CSV or JSON file')
import_parser.add_argument(
diff --git a/paimon-python/pypaimon/tests/cli_table_test.py
b/paimon-python/pypaimon/tests/cli_table_test.py
index 782c980310..b0e314644b 100644
--- a/paimon-python/pypaimon/tests/cli_table_test.py
+++ b/paimon-python/pypaimon/tests/cli_table_test.py
@@ -1351,6 +1351,94 @@ class CliTableTest(unittest.TestCase):
output = mock_stdout.getvalue()
self.assertIn('No partitions found', output)
def test_cli_table_drop_partition(self):
    """Test drop-partition removes exactly the requested partition."""
    # Create a partitioned table for the drop-partition test
    pa_schema = pa.schema([
        ('dt', pa.string()),
        ('id', pa.int32()),
    ])
    schema = Schema.from_pyarrow_schema(pa_schema, partition_keys=['dt'])
    self.catalog.create_table('test_db.drop_part_cli', schema, True)
    table = self.catalog.get_table('test_db.drop_part_cli')

    # Write one row into each of three partitions, one commit per partition
    for dt_val in ['2024-01-01', '2024-01-02', '2024-01-03']:
        write_builder = table.new_batch_write_builder()
        table_write = write_builder.new_write()
        table_commit = write_builder.new_commit()
        data = pa.Table.from_pydict({
            'dt': [dt_val],
            'id': [1],
        }, schema=pa_schema)
        table_write.write_arrow(data)
        table_commit.commit(table_write.prepare_commit())
        table_write.close()
        table_commit.close()

    # Precondition: all three partitions are visible before the drop, so the
    # final assertion really measures the effect of drop-partition.
    result = self.catalog.list_partitions_paged('test_db.drop_part_cli')
    self.assertEqual(len(result.elements), 3)

    # Drop the middle partition via CLI. A SystemExit from main() is
    # tolerated; success is judged from stdout and the remaining partitions.
    with patch('sys.argv',
               ['paimon', '-c', self.config_file,
                'table', 'drop-partition', 'test_db.drop_part_cli',
                '--partition', 'dt=2024-01-02']):
        with patch('sys.stdout', new_callable=StringIO) as mock_stdout:
            try:
                main()
            except SystemExit:
                pass

    output = mock_stdout.getvalue()
    self.assertIn('Successfully dropped', output)

    # Verify only the requested partition was dropped
    result = self.catalog.list_partitions_paged('test_db.drop_part_cli')
    specs = sorted(p.spec['dt'] for p in result.elements)
    self.assertEqual(specs, ['2024-01-01', '2024-01-03'])
+
def test_cli_table_drop_partition_multiple(self):
    """Test that repeated --partition flags drop several partitions at once."""
    table_name = 'test_db.drop_multi_cli'
    arrow_schema = pa.schema([
        ('dt', pa.string()),
        ('id', pa.int32()),
    ])
    self.catalog.create_table(
        table_name,
        Schema.from_pyarrow_schema(arrow_schema, partition_keys=['dt']),
        True,
    )
    tbl = self.catalog.get_table(table_name)

    # One single-row commit per partition value.
    for day in ['2024-01-01', '2024-01-02', '2024-01-03']:
        builder = tbl.new_batch_write_builder()
        writer = builder.new_write()
        committer = builder.new_commit()
        batch = pa.Table.from_pydict(
            {'dt': [day], 'id': [1]}, schema=arrow_schema)
        writer.write_arrow(batch)
        committer.commit(writer.prepare_commit())
        writer.close()
        committer.close()

    # Drop the first and last partitions in one CLI invocation.
    cli_argv = [
        'paimon', '-c', self.config_file,
        'table', 'drop-partition', table_name,
        '--partition', 'dt=2024-01-01',
        '--partition', 'dt=2024-01-03',
    ]
    with patch('sys.argv', cli_argv), \
            patch('sys.stdout', new_callable=StringIO) as captured:
        try:
            main()
        except SystemExit:
            pass

    self.assertIn('Successfully dropped', captured.getvalue())

    # Only the middle partition should survive.
    remaining = self.catalog.list_partitions_paged(table_name)
    self.assertEqual(len(remaining.elements), 1)
    self.assertEqual(remaining.elements[0].spec['dt'], '2024-01-02')
+
if __name__ == "__main__":
    # Allow running this test module directly through the unittest runner.
    unittest.main()
diff --git a/paimon-python/pypaimon/tests/filesystem_catalog_test.py
b/paimon-python/pypaimon/tests/filesystem_catalog_test.py
index 280ab9e186..5f5f216794 100644
--- a/paimon-python/pypaimon/tests/filesystem_catalog_test.py
+++ b/paimon-python/pypaimon/tests/filesystem_catalog_test.py
@@ -411,3 +411,64 @@ class FileSystemCatalogTest(unittest.TestCase):
identifier, partition_name_pattern='dt=2025*'
)
self.assertEqual(len(result.elements), 0)
+
def test_drop_partitions(self):
    """drop_partitions removes only the requested partition."""
    catalog = CatalogFactory.create({"warehouse": self.warehouse})
    catalog.create_database("test_db", False)

    table_name = "test_db.drop_part_tbl"
    partition_rows = [
        {'dt': '2024-01-01', 'rows': 2},
        {'dt': '2024-01-02', 'rows': 3},
        {'dt': '2024-01-03', 'rows': 1},
    ]
    self._create_partitioned_table_with_data(
        catalog, table_name, partition_rows)

    # All three partitions exist before dropping anything.
    before = catalog.list_partitions_paged(table_name)
    self.assertEqual(len(before.elements), 3)

    catalog.drop_partitions(table_name, [{'dt': '2024-01-02'}])

    # The two untouched partitions remain.
    after = catalog.list_partitions_paged(table_name)
    self.assertEqual(len(after.elements), 2)
    surviving = sorted(part.spec['dt'] for part in after.elements)
    self.assertEqual(surviving, ['2024-01-01', '2024-01-03'])
+
def test_drop_partitions_multiple(self):
    """drop_partitions accepts several partition specs in one call."""
    catalog = CatalogFactory.create({"warehouse": self.warehouse})
    catalog.create_database("test_db", False)

    table_name = "test_db.drop_multi_tbl"
    days = ['2024-01-01', '2024-01-02', '2024-01-03']
    self._create_partitioned_table_with_data(
        catalog, table_name, [{'dt': day, 'rows': 1} for day in days])

    # Remove the first and last partitions together.
    to_drop = [{'dt': '2024-01-01'}, {'dt': '2024-01-03'}]
    catalog.drop_partitions(table_name, to_drop)

    # Exactly the middle partition is left.
    listing = catalog.list_partitions_paged(table_name)
    self.assertEqual(len(listing.elements), 1)
    self.assertEqual(listing.elements[0].spec['dt'], '2024-01-02')
+
def test_drop_partitions_empty_list_raises(self):
    """An empty partitions list is rejected with ValueError."""
    catalog = CatalogFactory.create({"warehouse": self.warehouse})
    catalog.create_database("test_db", False)

    table_name = "test_db.drop_empty_tbl"
    self._create_partitioned_table_with_data(
        catalog, table_name, [{'dt': '2024-01-01', 'rows': 1}])

    # Callable form of assertRaises: invoke drop_partitions(table_name, []).
    self.assertRaises(ValueError, catalog.drop_partitions, table_name, [])