This is an automated email from the ASF dual-hosted git repository.

lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/paimon.git


The following commit(s) were added to refs/heads/master by this push:
     new 5407b88c73 [python] Add consumer management for streaming progress (#7349)
5407b88c73 is described below

commit 5407b88c7383260b9f89b318a0bcdd72fe7bf313
Author: Toby Cole <[email protected]>
AuthorDate: Thu Mar 12 00:50:09 2026 +0000

    [python] Add consumer management for streaming progress (#7349)
---
 paimon-python/pypaimon/consumer/__init__.py        |  18 ++
 paimon-python/pypaimon/consumer/consumer.py        |  38 ++++
 .../pypaimon/consumer/consumer_manager.py          |  73 ++++++++
 paimon-python/pypaimon/tests/consumer_test.py      | 204 +++++++++++++++++++++
 4 files changed, 333 insertions(+)

diff --git a/paimon-python/pypaimon/consumer/__init__.py b/paimon-python/pypaimon/consumer/__init__.py
new file mode 100644
index 0000000000..df4788f894
--- /dev/null
+++ b/paimon-python/pypaimon/consumer/__init__.py
@@ -0,0 +1,18 @@
+################################################################################
+#  Licensed to the Apache Software Foundation (ASF) under one
+#  or more contributor license agreements.  See the NOTICE file
+#  distributed with this work for additional information
+#  regarding copyright ownership.  The ASF licenses this file
+#  to you under the Apache License, Version 2.0 (the
+#  "License"); you may not use this file except in compliance
+#  with the License.  You may obtain a copy of the License at
+#
+#      http://www.apache.org/licenses/LICENSE-2.0
+#
+#  Unless required by applicable law or agreed to in writing, software
+#  distributed under the License is distributed on an "AS IS" BASIS,
+#  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+#  See the License for the specific language governing permissions and
+# limitations under the License.
+################################################################################
+"""Consumer management for tracking streaming read progress."""
diff --git a/paimon-python/pypaimon/consumer/consumer.py b/paimon-python/pypaimon/consumer/consumer.py
new file mode 100644
index 0000000000..8cc913f0c6
--- /dev/null
+++ b/paimon-python/pypaimon/consumer/consumer.py
@@ -0,0 +1,38 @@
+################################################################################
+#  Licensed to the Apache Software Foundation (ASF) under one
+#  or more contributor license agreements.  See the NOTICE file
+#  distributed with this work for additional information
+#  regarding copyright ownership.  The ASF licenses this file
+#  to you under the Apache License, Version 2.0 (the
+#  "License"); you may not use this file except in compliance
+#  with the License.  You may obtain a copy of the License at
+#
+#      http://www.apache.org/licenses/LICENSE-2.0
+#
+#  Unless required by applicable law or agreed to in writing, software
+#  distributed under the License is distributed on an "AS IS" BASIS,
+#  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+#  See the License for the specific language governing permissions and
+# limitations under the License.
+################################################################################
+"""Consumer dataclass for streaming read progress."""
+
+import json
+from dataclasses import dataclass
+
+
+@dataclass
+class Consumer:
+    """Progress marker for a streaming reader.
+
+    Holds the id of the next snapshot the reader should consume. The JSON
+    form stores it under the camelCase key ``nextSnapshot``.
+    """
+
+    next_snapshot: int
+
+    def to_json(self) -> str:
+        """Return this consumer encoded as a JSON object string."""
+        payload = {"nextSnapshot": self.next_snapshot}
+        return json.dumps(payload)
+
+    @staticmethod
+    def from_json(json_str: str) -> 'Consumer':
+        """Build a Consumer from a JSON object string.
+
+        Only the ``nextSnapshot`` key is read; any other keys are ignored.
+        A JSON object without ``nextSnapshot`` raises ``KeyError``.
+        """
+        snapshot_id = json.loads(json_str)["nextSnapshot"]
+        return Consumer(next_snapshot=snapshot_id)
diff --git a/paimon-python/pypaimon/consumer/consumer_manager.py b/paimon-python/pypaimon/consumer/consumer_manager.py
new file mode 100644
index 0000000000..4edec70d18
--- /dev/null
+++ b/paimon-python/pypaimon/consumer/consumer_manager.py
@@ -0,0 +1,73 @@
+################################################################################
+#  Licensed to the Apache Software Foundation (ASF) under one
+#  or more contributor license agreements.  See the NOTICE file
+#  distributed with this work for additional information
+#  regarding copyright ownership.  The ASF licenses this file
+#  to you under the Apache License, Version 2.0 (the
+#  "License"); you may not use this file except in compliance
+#  with the License.  You may obtain a copy of the License at
+#
+#      http://www.apache.org/licenses/LICENSE-2.0
+#
+#  Unless required by applicable law or agreed to in writing, software
+#  distributed under the License is distributed on an "AS IS" BASIS,
+#  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+#  See the License for the specific language governing permissions and
+# limitations under the License.
+################################################################################
+"""ConsumerManager for persisting streaming read progress."""
+
+from typing import Optional
+
+from pypaimon.consumer.consumer import Consumer
+
+
+class ConsumerManager:
+    """Manages consumer state stored at {table_path}/consumer/consumer-{id}.
+
+    Each consumer id maps to one file containing a JSON-serialized
+    ``Consumer``. All filesystem access goes through the ``file_io`` object
+    passed to the constructor.
+    """
+
+    CONSUMER_PREFIX = "consumer-"
+
+    def __init__(self, file_io, table_path: str):
+        # file_io must provide exists, read_file_utf8, overwrite_file_utf8
+        # and delete_quietly -- the only methods this class calls.
+        self._file_io = file_io
+        self._table_path = table_path
+
+    @staticmethod
+    def _validate_consumer_id(consumer_id: str) -> None:
+        """Validate consumer_id to prevent path traversal.
+
+        Raises:
+            ValueError: if the id is empty, contains ``/`` or ``\\``, or is
+                exactly ``.`` or ``..``.
+        """
+        if not consumer_id:
+            raise ValueError("consumer_id cannot be empty")
+        if '/' in consumer_id or '\\' in consumer_id:
+            raise ValueError(
+                f"consumer_id cannot contain path separators: {consumer_id}"
+            )
+        if consumer_id in ('.', '..'):
+            raise ValueError(
+                "consumer_id cannot be a relative path component: "
+                f"{consumer_id}"
+            )
+
+    def _consumer_path(self, consumer_id: str) -> str:
+        """Return the path of the file backing ``consumer_id``.
+
+        The id is validated first, so an invalid id raises ``ValueError``
+        before any path is built.
+        """
+        self._validate_consumer_id(consumer_id)
+        return (
+            f"{self._table_path}/consumer/"
+            f"{self.CONSUMER_PREFIX}{consumer_id}"
+        )
+
+    def consumer(self, consumer_id: str) -> Optional[Consumer]:
+        """Get consumer state, or None if not found.
+
+        A consumer file that disappears between the existence check and the
+        read is also reported as None instead of raising.
+
+        Raises:
+            ValueError: if ``consumer_id`` is invalid.
+        """
+        path = self._consumer_path(consumer_id)
+        if not self._file_io.exists(path):
+            return None
+        try:
+            json_str = self._file_io.read_file_utf8(path)
+        except FileNotFoundError:
+            # exists() and the read are two separate calls, so the file can
+            # be removed (e.g. by delete_consumer elsewhere) in between.
+            # NOTE(review): assumes file_io raises FileNotFoundError for a
+            # missing path; confirm for non-local file_io backends.
+            return None
+        return Consumer.from_json(json_str)
+
+    def reset_consumer(self, consumer_id: str, consumer: Consumer) -> None:
+        """Create or overwrite the state file for ``consumer_id``.
+
+        Raises:
+            ValueError: if ``consumer_id`` is invalid.
+        """
+        path = self._consumer_path(consumer_id)
+        self._file_io.overwrite_file_utf8(path, consumer.to_json())
+
+    def delete_consumer(self, consumer_id: str) -> None:
+        """Delete the state file for ``consumer_id``.
+
+        Deletion goes through ``file_io.delete_quietly``; presumably a no-op
+        when the file is absent -- depends on the file_io implementation.
+
+        Raises:
+            ValueError: if ``consumer_id`` is invalid.
+        """
+        path = self._consumer_path(consumer_id)
+        self._file_io.delete_quietly(path)
diff --git a/paimon-python/pypaimon/tests/consumer_test.py b/paimon-python/pypaimon/tests/consumer_test.py
new file mode 100644
index 0000000000..b6a77289c6
--- /dev/null
+++ b/paimon-python/pypaimon/tests/consumer_test.py
@@ -0,0 +1,204 @@
+################################################################################
+#  Licensed to the Apache Software Foundation (ASF) under one
+#  or more contributor license agreements.  See the NOTICE file
+#  distributed with this work for additional information
+#  regarding copyright ownership.  The ASF licenses this file
+#  to you under the Apache License, Version 2.0 (the
+#  "License"); you may not use this file except in compliance
+#  with the License.  You may obtain a copy of the License at
+#
+#      http://www.apache.org/licenses/LICENSE-2.0
+#
+#  Unless required by applicable law or agreed to in writing, software
+#  distributed under the License is distributed on an "AS IS" BASIS,
+#  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+#  See the License for the specific language governing permissions and
+# limitations under the License.
+################################################################################
+"""Tests for Consumer and ConsumerManager."""
+
+import json
+import os
+import shutil
+import tempfile
+import unittest
+from unittest.mock import Mock
+
+from pypaimon.consumer.consumer import Consumer
+from pypaimon.consumer.consumer_manager import ConsumerManager
+
+
+class ConsumerTest(unittest.TestCase):
+    """Unit tests for the Consumer dataclass and its JSON encoding."""
+
+    def test_consumer_to_json(self):
+        """to_json() must emit the camelCase nextSnapshot key."""
+        encoded = Consumer(next_snapshot=42).to_json()
+        self.assertEqual(json.loads(encoded)["nextSnapshot"], 42)
+
+    def test_consumer_from_json(self):
+        """from_json() must map nextSnapshot onto next_snapshot."""
+        decoded = Consumer.from_json('{"nextSnapshot": 42}')
+        self.assertEqual(decoded.next_snapshot, 42)
+
+    def test_consumer_from_json_ignores_unknown_fields(self):
+        """Extra keys in the payload must not break decoding."""
+        payload = '{"nextSnapshot": 42, "unknownField": "value"}'
+        self.assertEqual(Consumer.from_json(payload).next_snapshot, 42)
+
+    def test_consumer_roundtrip(self):
+        """Encoding then decoding must preserve next_snapshot."""
+        original = Consumer(next_snapshot=12345)
+        restored = Consumer.from_json(original.to_json())
+        self.assertEqual(restored.next_snapshot, original.next_snapshot)
+
+
+class ConsumerManagerTest(unittest.TestCase):
+    """Tests for ConsumerManager backed by a real temporary directory."""
+
+    # The only file_io methods ConsumerManager is expected to call. The mock
+    # is spec'd to these names so any other attribute access raises
+    # AttributeError instead of silently returning a child Mock.
+    _FILE_IO_METHODS = [
+        "read_file_utf8",
+        "overwrite_file_utf8",
+        "exists",
+        "delete_quietly",
+    ]
+
+    def setUp(self):
+        """Create a temporary table directory and a file_io stand-in."""
+        self.tempdir = tempfile.mkdtemp()
+        self.table_path = os.path.join(self.tempdir, "test_table")
+        os.makedirs(self.table_path)
+
+        self.file_io = Mock(spec=self._FILE_IO_METHODS)
+        self._setup_file_io_mock()
+
+    def tearDown(self):
+        """Clean up temporary directory."""
+        shutil.rmtree(self.tempdir, ignore_errors=True)
+
+    def _setup_file_io_mock(self):
+        """Back the file_io mock with the local filesystem (UTF-8 text)."""
+        def read_file_utf8(path):
+            # Explicit encoding: the platform default is not always UTF-8.
+            with open(path, 'r', encoding='utf-8') as f:
+                return f.read()
+
+        def overwrite_file_utf8(path, content):
+            os.makedirs(os.path.dirname(path), exist_ok=True)
+            with open(path, 'w', encoding='utf-8') as f:
+                f.write(content)
+
+        def exists(path):
+            return os.path.exists(path)
+
+        def delete_quietly(path):
+            if os.path.exists(path):
+                os.remove(path)
+
+        self.file_io.read_file_utf8 = read_file_utf8
+        self.file_io.overwrite_file_utf8 = overwrite_file_utf8
+        self.file_io.exists = exists
+        self.file_io.delete_quietly = delete_quietly
+
+    def _consumer_file(self, consumer_id):
+        """Return the on-disk path expected for ``consumer_id``."""
+        return os.path.join(
+            self.table_path, "consumer", f"consumer-{consumer_id}"
+        )
+
+    def test_consumer_manager_reset_consumer(self):
+        """reset_consumer should write consumer state to file."""
+        manager = ConsumerManager(self.file_io, self.table_path)
+        consumer = Consumer(next_snapshot=42)
+
+        manager.reset_consumer("my-consumer", consumer)
+
+        # Verify file exists
+        consumer_file = self._consumer_file("my-consumer")
+        self.assertTrue(os.path.exists(consumer_file))
+
+        # Verify content
+        with open(consumer_file, 'r', encoding='utf-8') as f:
+            content = f.read()
+        data = json.loads(content)
+        self.assertEqual(data["nextSnapshot"], 42)
+
+    def test_consumer_manager_get_consumer(self):
+        """consumer() should read consumer state from file."""
+        manager = ConsumerManager(self.file_io, self.table_path)
+
+        # Write consumer file directly
+        consumer_file = self._consumer_file("my-consumer")
+        os.makedirs(os.path.dirname(consumer_file), exist_ok=True)
+        with open(consumer_file, 'w', encoding='utf-8') as f:
+            f.write('{"nextSnapshot": 42}')
+
+        # Read via manager
+        consumer = manager.consumer("my-consumer")
+
+        self.assertIsNotNone(consumer)
+        self.assertEqual(consumer.next_snapshot, 42)
+
+    def test_consumer_manager_get_nonexistent_consumer(self):
+        """consumer() should return None for non-existent consumer."""
+        manager = ConsumerManager(self.file_io, self.table_path)
+
+        consumer = manager.consumer("nonexistent")
+
+        self.assertIsNone(consumer)
+
+    def test_consumer_manager_delete_consumer(self):
+        """delete_consumer should remove consumer file."""
+        manager = ConsumerManager(self.file_io, self.table_path)
+
+        # Create consumer first
+        manager.reset_consumer("my-consumer", Consumer(next_snapshot=42))
+        consumer_file = self._consumer_file("my-consumer")
+        self.assertTrue(os.path.exists(consumer_file))
+
+        # Delete
+        manager.delete_consumer("my-consumer")
+
+        self.assertFalse(os.path.exists(consumer_file))
+
+    def test_consumer_manager_update_consumer(self):
+        """reset_consumer should update existing consumer."""
+        manager = ConsumerManager(self.file_io, self.table_path)
+
+        # Create initial consumer
+        manager.reset_consumer("my-consumer", Consumer(next_snapshot=42))
+
+        # Update
+        manager.reset_consumer("my-consumer", Consumer(next_snapshot=100))
+
+        # Verify updated
+        consumer = manager.consumer("my-consumer")
+        self.assertEqual(consumer.next_snapshot, 100)
+
+    def test_consumer_path(self):
+        """Consumer files should be in {table_path}/consumer/consumer-{id}."""
+        manager = ConsumerManager(self.file_io, self.table_path)
+
+        path = manager._consumer_path("test-id")
+
+        expected = f"{self.table_path}/consumer/consumer-test-id"
+        self.assertEqual(path, expected)
+
+    def test_validate_rejects_empty(self):
+        manager = ConsumerManager(self.file_io, self.table_path)
+        with self.assertRaises(ValueError):
+            manager._consumer_path("")
+
+    def test_validate_rejects_path_separators(self):
+        manager = ConsumerManager(self.file_io, self.table_path)
+        for bad_id in ("foo/bar", "foo\\bar"):
+            with self.assertRaises(ValueError, msg=bad_id):
+                manager._consumer_path(bad_id)
+
+    def test_validate_rejects_relative_components(self):
+        manager = ConsumerManager(self.file_io, self.table_path)
+        for bad_id in (".", ".."):
+            with self.assertRaises(ValueError, msg=bad_id):
+                manager._consumer_path(bad_id)
+
+    def test_public_methods_validate_consumer_id(self):
+        """Public methods must reject traversal ids before touching disk."""
+        manager = ConsumerManager(self.file_io, self.table_path)
+        with self.assertRaises(ValueError):
+            manager.reset_consumer("../escape", Consumer(next_snapshot=1))
+        with self.assertRaises(ValueError):
+            manager.consumer("..")
+        with self.assertRaises(ValueError):
+            manager.delete_consumer("a/b")
+        # Nothing was written, so the consumer directory was never created.
+        self.assertFalse(
+            os.path.exists(os.path.join(self.table_path, "consumer"))
+        )
+
+
+if __name__ == "__main__":
+    # Allow running this module directly: python consumer_test.py
+    unittest.main()

Reply via email to