This is an automated email from the ASF dual-hosted git repository.
lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/paimon.git
The following commit(s) were added to refs/heads/master by this push:
new 5407b88c73 [python] Add consumer management for streaming progress (#7349)
5407b88c73 is described below
commit 5407b88c7383260b9f89b318a0bcdd72fe7bf313
Author: Toby Cole <[email protected]>
AuthorDate: Thu Mar 12 00:50:09 2026 +0000
[python] Add consumer management for streaming progress (#7349)
---
paimon-python/pypaimon/consumer/__init__.py | 18 ++
paimon-python/pypaimon/consumer/consumer.py | 38 ++++
.../pypaimon/consumer/consumer_manager.py | 73 ++++++++
paimon-python/pypaimon/tests/consumer_test.py | 204 +++++++++++++++++++++
4 files changed, 333 insertions(+)
diff --git a/paimon-python/pypaimon/consumer/__init__.py b/paimon-python/pypaimon/consumer/__init__.py
new file mode 100644
index 0000000000..df4788f894
--- /dev/null
+++ b/paimon-python/pypaimon/consumer/__init__.py
@@ -0,0 +1,18 @@
+################################################################################
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+################################################################################
+"""Consumer management for tracking streaming read progress."""
diff --git a/paimon-python/pypaimon/consumer/consumer.py b/paimon-python/pypaimon/consumer/consumer.py
new file mode 100644
index 0000000000..8cc913f0c6
--- /dev/null
+++ b/paimon-python/pypaimon/consumer/consumer.py
@@ -0,0 +1,38 @@
+################################################################################
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+################################################################################
+"""Consumer dataclass for streaming read progress."""
+
+import json
+from dataclasses import dataclass
+
+
+@dataclass
+class Consumer:
+    """Consumer which contains the next snapshot to be read.
+
+    The JSON form is an object with a single ``nextSnapshot`` field.
+    """
+
+    # The next snapshot to be read.
+    next_snapshot: int
+
+    def to_json(self) -> str:
+        """Serialize to a JSON object string with a ``nextSnapshot`` field."""
+        return json.dumps({"nextSnapshot": self.next_snapshot})
+
+    @classmethod
+    def from_json(cls, json_str: str) -> 'Consumer':
+        """Deserialize from JSON.
+
+        Only ``nextSnapshot`` is read; any other fields in the object are
+        ignored. Declared as a classmethod so that calling it on a
+        subclass returns an instance of that subclass.
+
+        Raises:
+            KeyError: If the ``nextSnapshot`` field is missing.
+            json.JSONDecodeError: If ``json_str`` is not valid JSON.
+        """
+        data = json.loads(json_str)
+        return cls(next_snapshot=data["nextSnapshot"])
diff --git a/paimon-python/pypaimon/consumer/consumer_manager.py b/paimon-python/pypaimon/consumer/consumer_manager.py
new file mode 100644
index 0000000000..4edec70d18
--- /dev/null
+++ b/paimon-python/pypaimon/consumer/consumer_manager.py
@@ -0,0 +1,73 @@
+################################################################################
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+################################################################################
+"""ConsumerManager for persisting streaming read progress."""
+
+from typing import Optional
+
+from pypaimon.consumer.consumer import Consumer
+
+
+class ConsumerManager:
+    """Manages consumer state stored at {table_path}/consumer/consumer-{id}.
+
+    Every storage operation is delegated to the ``file_io`` object passed
+    to the constructor; this class builds the file path and converts
+    between ``Consumer`` objects and their JSON form.
+    """
+
+    CONSUMER_PREFIX = "consumer-"
+
+    def __init__(self, file_io, table_path: str):
+        """Create a manager for the table located at ``table_path``.
+
+        Args:
+            file_io: Object providing ``exists``, ``read_file_utf8``,
+                ``overwrite_file_utf8`` and ``delete_quietly``.
+            table_path: Table root; used verbatim as the path prefix.
+        """
+        self._file_io = file_io
+        self._table_path = table_path
+
+    @staticmethod
+    def _validate_consumer_id(consumer_id: str) -> None:
+        """Validate consumer_id to prevent path traversal.
+
+        Raises:
+            ValueError: If the id is empty, contains ``/`` or ``\\``, or
+                is exactly ``.`` or ``..``.
+        """
+        if not consumer_id:
+            raise ValueError("consumer_id cannot be empty")
+        if '/' in consumer_id or '\\' in consumer_id:
+            raise ValueError(
+                f"consumer_id cannot contain path separators: {consumer_id}"
+            )
+        if consumer_id in ('.', '..'):
+            # Adjacent literals keep the message on one logical string
+            # without exceeding the line length.
+            raise ValueError(
+                "consumer_id cannot be a relative path component: "
+                f"{consumer_id}"
+            )
+
+    def _consumer_path(self, consumer_id: str) -> str:
+        """Return the path to a consumer file.
+
+        The id is validated first, so every public method that resolves a
+        path rejects unsafe ids with ``ValueError``.
+        """
+        self._validate_consumer_id(consumer_id)
+        return (
+            f"{self._table_path}/consumer/"
+            f"{self.CONSUMER_PREFIX}{consumer_id}"
+        )
+
+    def consumer(self, consumer_id: str) -> Optional[Consumer]:
+        """Get consumer state, or None if not found.
+
+        Raises:
+            ValueError: If ``consumer_id`` fails validation.
+        """
+        path = self._consumer_path(consumer_id)
+        if not self._file_io.exists(path):
+            return None
+
+        json_str = self._file_io.read_file_utf8(path)
+        return Consumer.from_json(json_str)
+
+    def reset_consumer(self, consumer_id: str, consumer: Consumer) -> None:
+        """Write or update consumer state.
+
+        Raises:
+            ValueError: If ``consumer_id`` fails validation.
+        """
+        path = self._consumer_path(consumer_id)
+        self._file_io.overwrite_file_utf8(path, consumer.to_json())
+
+    def delete_consumer(self, consumer_id: str) -> None:
+        """Delete a consumer.
+
+        Raises:
+            ValueError: If ``consumer_id`` fails validation.
+        """
+        path = self._consumer_path(consumer_id)
+        self._file_io.delete_quietly(path)
diff --git a/paimon-python/pypaimon/tests/consumer_test.py b/paimon-python/pypaimon/tests/consumer_test.py
new file mode 100644
index 0000000000..b6a77289c6
--- /dev/null
+++ b/paimon-python/pypaimon/tests/consumer_test.py
@@ -0,0 +1,204 @@
+################################################################################
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+################################################################################
+"""Tests for Consumer and ConsumerManager."""
+
+import json
+import os
+import shutil
+import tempfile
+import unittest
+from unittest.mock import Mock
+
+from pypaimon.consumer.consumer import Consumer
+from pypaimon.consumer.consumer_manager import ConsumerManager
+
+
+class ConsumerTest(unittest.TestCase):
+    """Unit tests for JSON (de)serialization of the Consumer dataclass."""
+
+    def test_consumer_to_json(self):
+        """to_json() should emit a JSON object carrying nextSnapshot."""
+        payload = json.loads(Consumer(next_snapshot=42).to_json())
+
+        self.assertEqual(payload["nextSnapshot"], 42)
+
+    def test_consumer_from_json(self):
+        """from_json() should populate next_snapshot from nextSnapshot."""
+        parsed = Consumer.from_json('{"nextSnapshot": 42}')
+
+        self.assertEqual(parsed.next_snapshot, 42)
+
+    def test_consumer_from_json_ignores_unknown_fields(self):
+        """from_json() should tolerate fields it does not know about."""
+        raw = '{"nextSnapshot": 42, "unknownField": "value"}'
+        parsed = Consumer.from_json(raw)
+
+        self.assertEqual(parsed.next_snapshot, 42)
+
+    def test_consumer_roundtrip(self):
+        """to_json() followed by from_json() should preserve the value."""
+        before = Consumer(next_snapshot=12345)
+        after = Consumer.from_json(before.to_json())
+
+        self.assertEqual(after.next_snapshot, before.next_snapshot)
+
+
+class ConsumerManagerTest(unittest.TestCase):
+    """Tests for ConsumerManager.
+
+    The manager is given a Mock ``file_io`` whose four methods are replaced
+    with small functions operating on a real temporary directory.
+    """
+
+    def setUp(self):
+        """Create a temporary directory for testing."""
+        self.tempdir = tempfile.mkdtemp()
+        self.table_path = os.path.join(self.tempdir, "test_table")
+        os.makedirs(self.table_path)
+
+        # Create mock file_io
+        self.file_io = Mock()
+        self._setup_file_io_mock()
+
+    def tearDown(self):
+        """Clean up temporary directory."""
+        shutil.rmtree(self.tempdir, ignore_errors=True)
+
+    def _setup_file_io_mock(self):
+        """Setup file_io mock to use real filesystem.
+
+        The ``*_utf8`` stand-ins open files with an explicit UTF-8
+        encoding so they do not depend on the platform default.
+        """
+        def read_file_utf8(path):
+            with open(path, 'r', encoding="utf-8") as f:
+                return f.read()
+
+        def overwrite_file_utf8(path, content):
+            # Create the parent directory on first write.
+            os.makedirs(os.path.dirname(path), exist_ok=True)
+            with open(path, 'w', encoding="utf-8") as f:
+                f.write(content)
+
+        def exists(path):
+            return os.path.exists(path)
+
+        def delete_quietly(path):
+            if os.path.exists(path):
+                os.remove(path)
+
+        self.file_io.read_file_utf8 = read_file_utf8
+        self.file_io.overwrite_file_utf8 = overwrite_file_utf8
+        self.file_io.exists = exists
+        self.file_io.delete_quietly = delete_quietly
+
+    def test_consumer_manager_reset_consumer(self):
+        """reset_consumer should write consumer state to file."""
+        manager = ConsumerManager(self.file_io, self.table_path)
+        consumer = Consumer(next_snapshot=42)
+
+        manager.reset_consumer("my-consumer", consumer)
+
+        # Verify file exists
+        consumer_file = os.path.join(
+            self.table_path, "consumer", "consumer-my-consumer"
+        )
+        self.assertTrue(os.path.exists(consumer_file))
+
+        # Verify content
+        with open(consumer_file, 'r', encoding="utf-8") as f:
+            content = f.read()
+        data = json.loads(content)
+        self.assertEqual(data["nextSnapshot"], 42)
+
+    def test_consumer_manager_get_consumer(self):
+        """consumer() should read consumer state from file."""
+        manager = ConsumerManager(self.file_io, self.table_path)
+
+        # Write consumer file directly
+        consumer_dir = os.path.join(self.table_path, "consumer")
+        os.makedirs(consumer_dir, exist_ok=True)
+        consumer_file = os.path.join(consumer_dir, "consumer-my-consumer")
+        with open(consumer_file, 'w', encoding="utf-8") as f:
+            f.write('{"nextSnapshot": 42}')
+
+        # Read via manager
+        consumer = manager.consumer("my-consumer")
+
+        self.assertIsNotNone(consumer)
+        self.assertEqual(consumer.next_snapshot, 42)
+
+    def test_consumer_manager_get_nonexistent_consumer(self):
+        """consumer() should return None for non-existent consumer."""
+        manager = ConsumerManager(self.file_io, self.table_path)
+
+        consumer = manager.consumer("nonexistent")
+
+        self.assertIsNone(consumer)
+
+    def test_consumer_manager_delete_consumer(self):
+        """delete_consumer should remove consumer file."""
+        manager = ConsumerManager(self.file_io, self.table_path)
+
+        # Create consumer first
+        manager.reset_consumer("my-consumer", Consumer(next_snapshot=42))
+        consumer_file = os.path.join(
+            self.table_path, "consumer", "consumer-my-consumer"
+        )
+        self.assertTrue(os.path.exists(consumer_file))
+
+        # Delete
+        manager.delete_consumer("my-consumer")
+
+        self.assertFalse(os.path.exists(consumer_file))
+
+    def test_consumer_manager_update_consumer(self):
+        """reset_consumer should update existing consumer."""
+        manager = ConsumerManager(self.file_io, self.table_path)
+
+        # Create initial consumer
+        manager.reset_consumer("my-consumer", Consumer(next_snapshot=42))
+
+        # Update
+        manager.reset_consumer("my-consumer", Consumer(next_snapshot=100))
+
+        # Verify updated
+        consumer = manager.consumer("my-consumer")
+        self.assertEqual(consumer.next_snapshot, 100)
+
+    def test_consumer_path(self):
+        """Consumer files should be in {table_path}/consumer/consumer-{id}."""
+        manager = ConsumerManager(self.file_io, self.table_path)
+
+        path = manager._consumer_path("test-id")
+
+        expected = f"{self.table_path}/consumer/consumer-test-id"
+        self.assertEqual(path, expected)
+
+    def test_validate_rejects_empty(self):
+        """An empty consumer id should be rejected with ValueError."""
+        manager = ConsumerManager(self.file_io, self.table_path)
+        with self.assertRaises(ValueError):
+            manager._consumer_path("")
+
+    def test_validate_rejects_path_separators(self):
+        """Ids containing '/' or '\\' should be rejected with ValueError."""
+        manager = ConsumerManager(self.file_io, self.table_path)
+        for bad_id in ("foo/bar", "foo\\bar"):
+            with self.assertRaises(ValueError, msg=bad_id):
+                manager._consumer_path(bad_id)
+
+    def test_validate_rejects_relative_components(self):
+        """The ids '.' and '..' should be rejected with ValueError."""
+        manager = ConsumerManager(self.file_io, self.table_path)
+        for bad_id in (".", ".."):
+            with self.assertRaises(ValueError, msg=bad_id):
+                manager._consumer_path(bad_id)
+
+
+# Run the test cases in this module when it is executed as a script.
+if __name__ == "__main__":
+    unittest.main()