This is an automated email from the ASF dual-hosted git repository.

lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/paimon.git


The following commit(s) were added to refs/heads/master by this push:
     new bee7289d3f [Python] Add basic tests for schema evolution read (#6463)
bee7289d3f is described below

commit bee7289d3fc1658635e23b87302c2847b0da62df
Author: umi <[email protected]>
AuthorDate: Thu Oct 23 17:39:15 2025 +0800

    [Python] Add basic tests for schema evolution read (#6463)
---
 .../pypaimon/tests/schema_evolution_read_test.py   | 191 ++++++++++++++++-----
 1 file changed, 146 insertions(+), 45 deletions(-)

diff --git a/paimon-python/pypaimon/tests/schema_evolution_read_test.py b/paimon-python/pypaimon/tests/schema_evolution_read_test.py
index 046f84487d..2ff4b09e53 100644
--- a/paimon-python/pypaimon/tests/schema_evolution_read_test.py
+++ b/paimon-python/pypaimon/tests/schema_evolution_read_test.py
@@ -21,6 +21,7 @@ import shutil
 import tempfile
 import unittest
 
+import pandas
 import pyarrow as pa
 
 from pypaimon import CatalogFactory, Schema
@@ -127,22 +128,23 @@ class SchemaEvolutionReadTest(unittest.TestCase):
         }, schema=pa_schema)
         self.assertEqual(expected, actual)
 
-    def test_schema_evolution_with_read_filter(self):
+    def test_schema_evolution_type(self):
         # schema 0
         pa_schema = pa.schema([
             ('user_id', pa.int64()),
-            ('item_id', pa.int64()),
+            ('time', pa.timestamp('s')),
             ('dt', pa.string())
         ])
         schema = Schema.from_pyarrow_schema(pa_schema, partition_keys=['dt'])
-        self.catalog.create_table('default.test_schema_evolution_with_filter', schema, False)
-        table1 = self.catalog.get_table('default.test_schema_evolution_with_filter')
+        self.catalog.create_table('default.schema_evolution_type', schema, False)
+        table1 = self.catalog.get_table('default.schema_evolution_type')
         write_builder = table1.new_batch_write_builder()
         table_write = write_builder.new_write()
         table_commit = write_builder.new_commit()
         data1 = {
             'user_id': [1, 2, 3, 4],
-            'item_id': [1001, 1002, 1003, 1004],
+            'time': [pandas.Timestamp("2025-01-01 00:00:00"), pandas.Timestamp("2025-01-02 00:02:00"),
+                     pandas.Timestamp("2025-01-03 00:03:00"), pandas.Timestamp("2025-01-04 00:04:00")],
             'dt': ['p1', 'p1', 'p2', 'p1'],
         }
         pa_table = pa.Table.from_pydict(data1, schema=pa_schema)
@@ -153,21 +155,22 @@ class SchemaEvolutionReadTest(unittest.TestCase):
 
         # schema 1  add behavior column
         pa_schema = pa.schema([
-            ('user_id', pa.int64()),
-            ('item_id', pa.int64()),
+            ('user_id', pa.int8()),
+            ('time', pa.timestamp('ms')),
             ('dt', pa.string()),
             ('behavior', pa.string())
         ])
         schema2 = Schema.from_pyarrow_schema(pa_schema, partition_keys=['dt'])
-        self.catalog.create_table('default.test_schema_evolution_with_filter2', schema2, False)
-        table2 = self.catalog.get_table('default.test_schema_evolution_with_filter2')
+        self.catalog.create_table('default.schema_evolution_type2', schema2, False)
+        table2 = self.catalog.get_table('default.schema_evolution_type2')
         table2.table_schema.id = 1
         write_builder = table2.new_batch_write_builder()
         table_write = write_builder.new_write()
         table_commit = write_builder.new_commit()
         data2 = {
             'user_id': [5, 6, 7, 8],
-            'item_id': [1005, 1006, 1007, 1008],
+            'time': [pandas.Timestamp("2025-01-05 00:05:00"), pandas.Timestamp("2025-01-06 00:06:00"),
+                     pandas.Timestamp("2025-01-07 00:07:00"), pandas.Timestamp("2025-01-08 00:08:00")],
             'dt': ['p2', 'p1', 'p2', 'p2'],
             'behavior': ['e', 'f', 'g', 'h'],
         }
@@ -181,49 +184,22 @@ class SchemaEvolutionReadTest(unittest.TestCase):
         schema_manager = SchemaManager(table2.file_io, table2.table_path)
         schema_manager.commit(TableSchema.from_schema(schema_id=0, schema=schema))
         schema_manager.commit(TableSchema.from_schema(schema_id=1, schema=schema2))
-        # behavior filter
-        splits = self._scan_table(table1.new_read_builder())
-
-        read_builder = table2.new_read_builder()
-        predicate_builder = read_builder.new_predicate_builder()
-        predicate = predicate_builder.not_equal('behavior', "g")
-        splits2 = self._scan_table(read_builder.with_filter(predicate))
-        for split in splits2:
-            for file in split.files:
-                file.schema_id = 1
-        splits.extend(splits2)
 
-        table_read = read_builder.new_read()
-        actual = table_read.to_arrow(splits)
-        # 'behavior' is not included in the file. In order to filter more conservatively, we choose to discard the
-        # filtering criteria for 'behavior'
-        expected = pa.Table.from_pydict({
-            'user_id': [1, 2, 4, 3, 5, 8, 6],
-            'item_id': [1001, 1002, 1004, 1003, 1005, 1008, 1006],
-            'dt': ["p1", "p1", "p1", "p2", "p2", "p2", "p1"],
-            'behavior': [None, None, None, None, "e", "h", "f"],
-        }, schema=pa_schema)
-        self.assertEqual(expected, actual)
-        # user_id filter
         splits = self._scan_table(table1.new_read_builder())
-
         read_builder = table2.new_read_builder()
-        predicate_builder = read_builder.new_predicate_builder()
-        predicate = predicate_builder.less_than('user_id', 6)
-        splits2 = self._scan_table(read_builder.with_filter(predicate))
-        self.assertEqual(1, len(splits2))
-        for split in splits2:
-            for file in split.files:
-                file.schema_id = 1
+        splits2 = self._scan_table(read_builder)
         splits.extend(splits2)
 
         table_read = read_builder.new_read()
         actual = table_read.to_arrow(splits)
         expected = pa.Table.from_pydict({
-            'user_id': [1, 2, 4, 3, 5],
-            'item_id': [1001, 1002, 1004, 1003, 1005],
-            'dt': ["p1", "p1", "p1", "p2", "p2"],
-            'behavior': [None, None, None, None, "e"],
+            'user_id': [1, 2, 4, 3, 5, 7, 8, 6],
+            'time': [pandas.Timestamp("2025-01-01 00:00:00"), pandas.Timestamp("2025-01-02 00:02:00"),
+                     pandas.Timestamp("2025-01-04 00:04:00"), pandas.Timestamp("2025-01-03 00:03:00"),
+                     pandas.Timestamp("2025-01-05 00:05:00"), pandas.Timestamp("2025-01-07 00:07:00"),
+                     pandas.Timestamp("2025-01-08 00:08:00"), pandas.Timestamp("2025-01-06 00:06:00")],
+            'dt': ["p1", "p1", "p1", "p2", "p2", "p2", "p2", "p1"],
+            'behavior': [None, None, None, None, "e", "g", "h", "f"],
         }, schema=pa_schema)
         self.assertEqual(expected, actual)
 
@@ -292,6 +268,131 @@ class SchemaEvolutionReadTest(unittest.TestCase):
         entries = new_scan.starting_scanner.read_manifest_entries(manifest_files)
         self.assertEqual(1, len(entries))  # verify scan filter success for schema evolution
 
+    def test_schema_evolution_with_read_filter(self):
+        # schema 0
+        pa_schema = pa.schema([
+            ('user_id', pa.int64()),
+            ('item_id', pa.int64()),
+            ('dt', pa.string())
+        ])
+        schema = Schema.from_pyarrow_schema(pa_schema, partition_keys=['dt'])
+        self.catalog.create_table('default.test_schema_evolution_with_filter', schema, False)
+        table1 = self.catalog.get_table('default.test_schema_evolution_with_filter')
+        write_builder = table1.new_batch_write_builder()
+        table_write = write_builder.new_write()
+        table_commit = write_builder.new_commit()
+        data1 = {
+            'user_id': [1, 2, 3, 4],
+            'item_id': [1001, 1002, 1003, 1004],
+            'dt': ['p1', 'p1', 'p2', 'p1'],
+        }
+        pa_table = pa.Table.from_pydict(data1, schema=pa_schema)
+        table_write.write_arrow(pa_table)
+        table_commit.commit(table_write.prepare_commit())
+        table_write.close()
+        table_commit.close()
+
+        # schema 1  add behavior column
+        pa_schema = pa.schema([
+            ('user_id', pa.int64()),
+            ('item_id', pa.int64()),
+            ('dt', pa.string()),
+            ('behavior', pa.string())
+        ])
+        schema2 = Schema.from_pyarrow_schema(pa_schema, partition_keys=['dt'])
+        self.catalog.create_table('default.test_schema_evolution_with_filter2', schema2, False)
+        table2 = self.catalog.get_table('default.test_schema_evolution_with_filter2')
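+        # treat table2 as schema version 1 of the evolved table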
+        table2.table_schema.id = 1
+        write_builder = table2.new_batch_write_builder()
+        table_write = write_builder.new_write()
+        table_commit = write_builder.new_commit()
+        data2 = {
+            'user_id': [5, 6, 7, 8],
+            'item_id': [1005, 1006, 1007, 1008],
+            'dt': ['p2', 'p1', 'p2', 'p2'],
+            'behavior': ['e', 'f', 'g', 'h'],
+        }
+        pa_table = pa.Table.from_pydict(data2, schema=pa_schema)
+        table_write.write_arrow(pa_table)
+        table_commit.commit(table_write.prepare_commit())
+        table_write.close()
+        table_commit.close()
+
+        # write schema-0 and schema-1 to table2
+        schema_manager = SchemaManager(table2.file_io, table2.table_path)
+        schema_manager.commit(TableSchema.from_schema(schema_id=0, schema=schema))
+        schema_manager.commit(TableSchema.from_schema(schema_id=1, schema=schema2))
+
+        # behavior or user_id filter
+        splits = self._scan_table(table1.new_read_builder())
+        read_builder = table2.new_read_builder()
+        predicate_builder = read_builder.new_predicate_builder()
+        eq_predicate = predicate_builder.equal('behavior', "g")
+        lt_predicate = predicate_builder.less_than('user_id', 6)
+        or_predicate = predicate_builder.or_predicates([eq_predicate, lt_predicate])
+        splits2 = self._scan_table(read_builder.with_filter(or_predicate))
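+        # tag table2's files with schema id 1 so the reader resolves them through schema evolution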
+        for split in splits2:
+            for file in split.files:
+                file.schema_id = 1
+        splits.extend(splits2)
+
+        table_read = read_builder.new_read()
+        actual = table_read.to_arrow(splits)
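+        # rows matching behavior == 'g' or user_id < 6; schema-0 rows surface with behavior = None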
+        expected = pa.Table.from_pydict({
+            'user_id': [1, 2, 4, 3, 5, 7],
+            'item_id': [1001, 1002, 1004, 1003, 1005, 1007],
+            'dt': ["p1", "p1", "p1", "p2", "p2", "p2"],
+            'behavior': [None, None, None, None, "e", "g"],
+        }, schema=pa_schema)
+        self.assertEqual(expected, actual)
+
+        # behavior and user_id filter
+        splits = self._scan_table(table1.new_read_builder())
+
+        read_builder = table2.new_read_builder()
+        predicate_builder = read_builder.new_predicate_builder()
+        eq_predicate = predicate_builder.equal('behavior', "g")
+        lt_predicate = predicate_builder.less_than('user_id', 8)
+        and_predicate = predicate_builder.and_predicates([eq_predicate, lt_predicate])
+        splits2 = self._scan_table(read_builder.with_filter(and_predicate))
+        for split in splits2:
+            for file in split.files:
+                file.schema_id = 1
+        splits.extend(splits2)
+
+        table_read = read_builder.new_read()
+        actual = table_read.to_arrow(splits)
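+        # schema-0 files have no 'behavior' column, so that predicate is dropped for them and only user_id < 8 applies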
+        expected = pa.Table.from_pydict({
+            'user_id': [1, 2, 4, 3, 7],
+            'item_id': [1001, 1002, 1004, 1003, 1007],
+            'dt': ["p1", "p1", "p1", "p2", "p2"],
+            'behavior': [None, None, None, None, "g"],
+        }, schema=pa_schema)
+        self.assertEqual(expected, actual)
+
+        # user_id filter
+        splits = self._scan_table(table1.new_read_builder())
+
+        read_builder = table2.new_read_builder()
+        predicate_builder = read_builder.new_predicate_builder()
+        predicate = predicate_builder.less_than('user_id', 6)
+        splits2 = self._scan_table(read_builder.with_filter(predicate))
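+        # the user_id < 6 filter should prune table2's scan down to a single split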
+        self.assertEqual(1, len(splits2))
+        for split in splits2:
+            for file in split.files:
+                file.schema_id = 1
+        splits.extend(splits2)
+
+        table_read = read_builder.new_read()
+        actual = table_read.to_arrow(splits)
+        expected = pa.Table.from_pydict({
+            'user_id': [1, 2, 4, 3, 5],
+            'item_id': [1001, 1002, 1004, 1003, 1005],
+            'dt': ["p1", "p1", "p1", "p2", "p2"],
+            'behavior': [None, None, None, None, "e"],
+        }, schema=pa_schema)
+        self.assertEqual(expected, actual)
+
     def _write_test_table(self, table):
         write_builder = table.new_batch_write_builder()
 
