zhoulii opened a new issue, #6859: URL: https://github.com/apache/paimon/issues/6859
### Search before asking - [x] I searched in the [issues](https://github.com/apache/paimon/issues) and found nothing similar. ### Motivation When data-evolution is enabled, updating partial columns in tables that have been inserted multiple times requires users to manually handle the file `first_row_id`. This approach is inconvenient and error-prone, especially in distributed scenarios. Here is a bad case: ``` def test_basic(self): simple_pa_schema = pa.schema([ ('f0', pa.int8()), ('f1', pa.int16()), ]) schema = Schema.from_pyarrow_schema(simple_pa_schema, options={'row-tracking.enabled': 'true', 'data-evolution.enabled': 'true'}) self.catalog.create_table('default.test_row_tracking', schema, False) table = self.catalog.get_table('default.test_row_tracking') # write 1 write_builder = table.new_batch_write_builder() table_write = write_builder.new_write() table_commit = write_builder.new_commit() expect_data = pa.Table.from_pydict({ 'f0': [-1, 2], 'f1': [-1001, 1002] }, schema=simple_pa_schema) table_write.write_arrow(expect_data) table_commit.commit(table_write.prepare_commit()) table_write.close() table_commit.close() # write 2 table_write = write_builder.new_write() table_commit = write_builder.new_commit() expect_data = pa.Table.from_pydict({ 'f0': [3, 4], 'f1': [1003, 1004] }, schema=simple_pa_schema) table_write.write_arrow(expect_data) table_commit.commit(table_write.prepare_commit()) table_write.close() table_commit.close() # update partial columns table_write = write_builder.new_write().with_write_type(['f0']) table_commit = write_builder.new_commit() data2 = pa.Table.from_pydict({ 'f0': [5, 6, 7, 8], }, schema=pa.schema([ ('f0', pa.int8()), ])) table_write.write_arrow(data2) cmts = table_write.prepare_commit() cmts[0].new_files[0].first_row_id = 0 table_commit.commit(cmts) table_write.close() table_commit.close() read_builder = table.new_read_builder() table_scan = read_builder.new_scan() table_read = read_builder.new_read() actual_data = 
table_read.to_arrow(table_scan.plan().splits()) expect_data = pa.Table.from_pydict({ 'f0': [5, 6, 7, 8], 'f1': [-1001, 1002, 1003, 1004] }, schema=pa.schema([ ('f0', pa.int8()), ('f1', pa.int16()), ])) self.assertEqual(actual_data, expect_data) ``` <img width="1920" height="786" alt="Image" src="https://github.com/user-attachments/assets/d4e00659-575f-474b-b743-f15a75bcee1c" /> ### Solution Introduces a new `update_columns` API in PyPaimon that: - Requires input data to include the `_ROW_ID` column - Automatically sorts and matches each `_ROW_ID` to its corresponding `_FIRST_ROW_ID` - Groups rows with the same `_FIRST_ROW_ID` and writes them to a separate file ``` def test_basic(self): simple_pa_schema = pa.schema([ ('f0', pa.int8()), ('f1', pa.int16()), ]) schema = Schema.from_pyarrow_schema(simple_pa_schema, options={'row-tracking.enabled': 'true', 'data-evolution.enabled': 'true'}) self.catalog.create_table('default.test_row_tracking', schema, False) table = self.catalog.get_table('default.test_row_tracking') # write 1 write_builder = table.new_batch_write_builder() table_write = write_builder.new_write() table_commit = write_builder.new_commit() expect_data = pa.Table.from_pydict({ 'f0': [-1, 2], 'f1': [-1001, 1002] }, schema=simple_pa_schema) table_write.write_arrow(expect_data) table_commit.commit(table_write.prepare_commit()) table_write.close() table_commit.close() # write 2 table_write = write_builder.new_write() table_commit = write_builder.new_commit() expect_data = pa.Table.from_pydict({ 'f0': [3, 4], 'f1': [1003, 1004] }, schema=simple_pa_schema) table_write.write_arrow(expect_data) table_commit.commit(table_write.prepare_commit()) table_write.close() table_commit.close() # update partial columns table_write = write_builder.new_write() table_commit = write_builder.new_commit() data2 = pa.Table.from_pydict({ '_ROW_ID': [0, 1, 2, 3], 'f0': [5, 6, 7, 8], }, schema=pa.schema([ ('_ROW_ID', pa.int64()), ('f0', pa.int8()), ])) 
table_write.update_columns(data2, ['f0']) cmts = table_write.prepare_commit() cmts[0].new_files[0].first_row_id = 0 table_commit.commit(cmts) table_write.close() table_commit.close() read_builder = table.new_read_builder() table_scan = read_builder.new_scan() table_read = read_builder.new_read() actual_data = table_read.to_arrow(table_scan.plan().splits()) expect_data = pa.Table.from_pydict({ 'f0': [5, 6, 7, 8], 'f1': [-1001, 1002, 1003, 1004] }, schema=pa.schema([ ('f0', pa.int8()), ('f1', pa.int16()), ])) self.assertEqual(actual_data, expect_data) ``` ### Anything else? _No response_ ### Are you willing to submit a PR? - [x] I'm willing to submit a PR! -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: [email protected] For queries about this service, please contact Infrastructure at: [email protected]
