JingsongLi commented on code in PR #7415:
URL: https://github.com/apache/paimon/pull/7415#discussion_r2924105265
##########
docs/content/pypaimon/python-api.md:
##########
@@ -591,6 +591,129 @@ to the appropriate rollback logic.
| f is not in [l1, l2] | PredicateBuilder.is_not_in(f, [l1, l2]) |
| lower <= f <= upper | PredicateBuilder.between(f, lower, upper) |
+## Consumer Management
+
+Consumer management lets you track consumption progress, keep snapshots that a consumer still needs from being expired, and resume reading from where a consumer left off.
+
+### Create ConsumerManager
+
+```python
+from pypaimon import CatalogFactory
+from pypaimon.consumer.consumer_manager import ConsumerManager
+
+# Get table and file_io
+catalog = CatalogFactory.create({'warehouse': 'file:///path/to/warehouse'})
+table = catalog.get_table('database_name.table_name')
+file_io = table.file_io()
+
+# Create consumer manager
+manager = table.consumer_manager
+```
+
+### Get Consumer
+
+Retrieve a consumer by its ID:
+
+```python
+consumer = manager.consumer('consumer_id')
+if consumer is not None:
+    print(f"Next snapshot: {consumer.next_snapshot}")
+else:
+    print("Consumer not found")
+```
+
+### Reset Consumer
+
+Create a consumer, or reset an existing one, so that its next snapshot is the given snapshot ID:
+
+```python
+from pypaimon.consumer.consumer import Consumer
+
+# Reset consumer to snapshot 10
+manager.reset_consumer('consumer_id', Consumer(next_snapshot=10))
+```
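To make the effect of `reset_consumer` concrete, here is a minimal sketch of what a consumer record could look like on disk. It assumes pypaimon keeps the layout of Java Paimon (a file named `consumer-<id>` under the table's `consumer/` directory, holding JSON with a `nextSnapshot` field); `ConsumerRecord` and `consumer_file_path` are hypothetical helpers, not pypaimon APIs.

```python
import json
from dataclasses import dataclass


@dataclass(frozen=True)
class ConsumerRecord:
    """Hypothetical stand-in for pypaimon's Consumer; models only next_snapshot."""

    next_snapshot: int

    def to_json(self) -> str:
        # Assumed key name, mirroring the "nextSnapshot" field of Java Paimon's Consumer.
        return json.dumps({"nextSnapshot": self.next_snapshot})

    @staticmethod
    def from_json(text: str) -> "ConsumerRecord":
        return ConsumerRecord(next_snapshot=json.loads(text)["nextSnapshot"])


def consumer_file_path(table_path: str, consumer_id: str) -> str:
    # Assumed main-branch layout: <table>/consumer/consumer-<id>
    return f"{table_path}/consumer/consumer-{consumer_id}"


# Under these assumptions, resetting to snapshot 10 overwrites this file with this JSON.
record = ConsumerRecord(next_snapshot=10)
print(consumer_file_path("/path/to/table", "consumer_id"))
print(record.to_json())
```

Because the record holds only the next snapshot to read, "reset" and "create" are the same operation: both write the whole record.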
+
+### Delete Consumer
+
+Delete a consumer by its ID:
+
+```python
+manager.delete_consumer('consumer_id')
+```
+
+### List Consumers
+
+Get all consumers as a mapping from consumer ID to next snapshot ID:
+
+```python
+consumers = manager.consumers()
+for consumer_id, next_snapshot in consumers.items():
+    print(f"Consumer {consumer_id}: next snapshot {next_snapshot}")
+```
+
+### List All Consumer IDs
+
+List all consumer IDs:
+
+```python
+consumer_ids = manager.list_all_ids()
+for consumer_id in consumer_ids:
+    print(consumer_id)
+```
+
+### Get Minimum Next Snapshot
+
+Get the smallest next snapshot ID across all consumers:
+
+```python
+min_snapshot = manager.min_next_snapshot()
+if min_snapshot is not None:
+    print(f"Minimum next snapshot: {min_snapshot}")
+```
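The minimum next snapshot is what ties consumers to snapshot expiration: in Paimon, snapshots that some consumer has not yet read are not expired. The sketch below models that rule in plain Python over a dict shaped like the output of `manager.consumers()`. It assumes the result is `None` when there are no consumers, and `expirable_snapshots` is a hypothetical helper that ignores every other retention setting.

```python
from typing import Dict, Iterable, List, Optional


def min_next_snapshot(consumers: Dict[str, int]) -> Optional[int]:
    """Smallest next snapshot ID over all consumers, or None when there are none."""
    return min(consumers.values(), default=None)


def expirable_snapshots(snapshot_ids: Iterable[int], consumers: Dict[str, int]) -> List[int]:
    """Snapshots that no consumer still needs (ignores all other retention rules)."""
    floor = min_next_snapshot(consumers)
    if floor is None:
        # No consumers: nothing is pinned by consumer progress.
        return list(snapshot_ids)
    # Everything at or after the floor may still be read by some consumer.
    return [s for s in snapshot_ids if s < floor]


consumers = {"job_a": 7, "job_b": 9}
print(min_next_snapshot(consumers))
print(expirable_snapshots(range(1, 11), consumers))
```

The explicit `is not None` check separates "no consumers" from a numeric result, which is clearer than relying on truthiness.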
+
+### Expire Consumers
+
+Expire (delete) consumers that were last modified before a given datetime:
+
+```python
+from datetime import datetime, timedelta
+
+# Expire consumers older than 1 day
+expire_time = datetime.now() - timedelta(days=1)
+manager.expire(expire_time)
+```
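A minimal sketch of the selection rule behind `expire`, assuming a consumer is expired when its last modification time is strictly before the cutoff. `consumers_to_expire` and the timestamps are hypothetical; the real manager would read modification times from the consumer files rather than from a dict.

```python
from datetime import datetime, timedelta
from typing import Dict, List


def consumers_to_expire(modified_at: Dict[str, datetime], expire_time: datetime) -> List[str]:
    # Select consumers whose last modification is strictly before expire_time.
    return sorted(cid for cid, ts in modified_at.items() if ts < expire_time)


now = datetime(2024, 1, 10, 12, 0)
modified = {
    "etl_job": now - timedelta(hours=2),   # recently updated, kept
    "stale_job": now - timedelta(days=3),  # idle for three days, expired
}
print(consumers_to_expire(modified, now - timedelta(days=1)))
```

Assuming each progress update rewrites the consumer file, an actively reading job keeps a fresh timestamp, so only abandoned consumers fall behind the cutoff and stop pinning snapshots.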
+
+### Clear Consumers
+
+Clear consumers whose IDs match an including regular expression and, optionally, do not match an excluding one:
+
+```python
+# Clear all consumers starting with "test_"
+manager.clear_consumers('test_.*')
+
+# Clear all consumers except those starting with "prod_"
+manager.clear_consumers(
+    '.*',
+    'prod_.*'
+)
+```
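The including/excluding rule can be sketched with Python's `re` module. This assumes patterns must match the whole consumer ID, as Java Paimon does with `Matcher.matches()`; whether pypaimon uses full or partial matching is an assumption here. `consumers_to_clear` is a hypothetical helper that only selects IDs and deletes nothing.

```python
import re
from typing import Iterable, List, Optional


def consumers_to_clear(
    consumer_ids: Iterable[str], including: str, excluding: Optional[str] = None
) -> List[str]:
    inc = re.compile(including)
    exc = re.compile(excluding) if excluding is not None else None
    selected = []
    for cid in consumer_ids:
        # Whole-ID matching (assumed): "test_.*" does not select "my_test_x".
        included = inc.fullmatch(cid) is not None
        excluded = exc is not None and exc.fullmatch(cid) is not None
        if included and not excluded:
            selected.append(cid)
    return selected


ids = ["test_a", "test_b", "prod_main", "adhoc"]
print(consumers_to_clear(ids, "test_.*"))
print(consumers_to_clear(ids, ".*", "prod_.*"))
```

With full matching, a pattern like `test_.*` behaves like a prefix filter; with `re.search` semantics it would also select any ID containing `test_`.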
+
+### Branch Support
+
+ConsumerManager can operate on the main branch or on a named branch:
+
+```python
+# Main branch (default)
+manager_main = ConsumerManager(file_io, table.location())
+
+# Custom branch
+manager_branch = ConsumerManager(file_io, table.location(), branch='feature_branch')
Review Comment:
branch_manager = manager.with_branch('xxx')
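The suggestion above derives a branch-scoped manager from an existing one instead of constructing a new `ConsumerManager` with a `branch=` argument. A minimal sketch of that pattern, assuming the manager is an immutable value holder and that branch consumer files follow Java Paimon's `branch/branch-<name>/consumer` layout; `BranchAwareConsumerManager` is hypothetical and is not pypaimon's class.

```python
from dataclasses import dataclass, replace

MAIN_BRANCH = "main"


@dataclass(frozen=True)
class BranchAwareConsumerManager:
    """Hypothetical, simplified model of a branch-aware consumer manager."""

    table_path: str
    branch: str = MAIN_BRANCH

    def with_branch(self, branch: str) -> "BranchAwareConsumerManager":
        # Return a new manager bound to `branch`; the original stays unchanged.
        return replace(self, branch=branch)

    def consumer_directory(self) -> str:
        # Assumed layout, mirroring Java Paimon's branch paths.
        if self.branch == MAIN_BRANCH:
            return f"{self.table_path}/consumer"
        return f"{self.table_path}/branch/branch-{self.branch}/consumer"


manager = BranchAwareConsumerManager("/path/to/table")
branch_manager = manager.with_branch("feature_branch")
print(manager.consumer_directory())
print(branch_manager.consumer_directory())
```

Returning a copy keeps callers from passing `file_io` and the table location around again and avoids mutating a manager that other code may share.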