zhoulii opened a new issue, #6859:
URL: https://github.com/apache/paimon/issues/6859

   ### Search before asking
   
   - [x] I searched in the [issues](https://github.com/apache/paimon/issues) 
and found nothing similar.
   
   
   ### Motivation
   
   When data-evolution is enabled, updating partial columns in a table that 
has been written to multiple times currently requires users to manually set 
the `first_row_id` of each new file. This approach is inconvenient and 
error-prone, especially in distributed scenarios.
   
   Here is a bad case:
   ```
   def test_basic(self):
           """Reproduce a partial-column update that needs a manual first_row_id.

           Creates a table with 'row-tracking.enabled' and
           'data-evolution.enabled' set to 'true', commits two separate
           two-row writes, then writes four new values for column 'f0' only
           and assigns first_row_id by hand before committing. Finally reads
           the table back and compares it with the expected merged result.
           """
           simple_pa_schema = pa.schema([
               ('f0', pa.int8()),
               ('f1', pa.int16()),
           ])
           schema = Schema.from_pyarrow_schema(simple_pa_schema,
                                               options={'row-tracking.enabled': 
'true', 'data-evolution.enabled': 'true'})
           self.catalog.create_table('default.test_row_tracking', schema, False)
           table = self.catalog.get_table('default.test_row_tracking')
   
           # write 1: first commit, two rows with both columns
           write_builder = table.new_batch_write_builder()
           table_write = write_builder.new_write()
           table_commit = write_builder.new_commit()
           expect_data = pa.Table.from_pydict({
               'f0': [-1, 2],
               'f1': [-1001, 1002]
           }, schema=simple_pa_schema)
           table_write.write_arrow(expect_data)
           table_commit.commit(table_write.prepare_commit())
           table_write.close()
           table_commit.close()
   
           # write 2: second, separate commit with two more rows
           table_write = write_builder.new_write()
           table_commit = write_builder.new_commit()
           expect_data = pa.Table.from_pydict({
               'f0': [3, 4],
               'f1': [1003, 1004]
           }, schema=simple_pa_schema)
           table_write.write_arrow(expect_data)
           table_commit.commit(table_write.prepare_commit())
           table_write.close()
           table_commit.close()
   
           # update partial columns: only 'f0' is written, four values in one batch
           table_write = write_builder.new_write().with_write_type(['f0'])
           table_commit = write_builder.new_commit()
           data2 = pa.Table.from_pydict({
               'f0': [5, 6, 7, 8],
           }, schema=pa.schema([
               ('f0', pa.int8()),
           ]))
           table_write.write_arrow(data2)
           cmts = table_write.prepare_commit()
           # The manual step this issue complains about: the caller has to set
           # first_row_id on the new file before committing.
           cmts[0].new_files[0].first_row_id = 0
           table_commit.commit(cmts)
           table_write.close()
           table_commit.close()
   
           # Read everything back; 'f0' should carry the updated values while
           # 'f1' keeps the values from the two original writes.
           read_builder = table.new_read_builder()
           table_scan = read_builder.new_scan()
           table_read = read_builder.new_read()
           actual_data = table_read.to_arrow(table_scan.plan().splits())
           expect_data = pa.Table.from_pydict({
               'f0': [5, 6, 7, 8],
               'f1': [-1001, 1002, 1003, 1004]
           }, schema=pa.schema([
               ('f0', pa.int8()),
               ('f1', pa.int16()),
           ]))
           self.assertEqual(actual_data, expect_data)
   ```
   
   <img width="1920" height="786" alt="Image" 
src="https://github.com/user-attachments/assets/d4e00659-575f-474b-b743-f15a75bcee1c"
 />
   
   ### Solution
   
   Introduce a new `update_columns` API in PyPaimon that:
   - Requires input data to include the `_ROW_ID` column
   - Automatically sorts the rows and matches each `_ROW_ID` to its 
corresponding `_FIRST_ROW_ID`
   - Groups rows with the same `_FIRST_ROW_ID` and writes each group to its 
own file
   
   ```
   def test_basic(self):
           """Show the proposed `update_columns` usage for a partial update.

           Same setup as the earlier example: a table with
           'row-tracking.enabled' and 'data-evolution.enabled' set to 'true'
           and two separate two-row commits. The update then passes a batch
           containing `_ROW_ID` plus the new 'f0' values to
           `update_columns`, and the table is read back and compared with the
           expected merged result.
           """
           simple_pa_schema = pa.schema([
               ('f0', pa.int8()),
               ('f1', pa.int16()),
           ])
           schema = Schema.from_pyarrow_schema(simple_pa_schema,
                                               options={'row-tracking.enabled': 
'true', 'data-evolution.enabled': 'true'})
           self.catalog.create_table('default.test_row_tracking', schema, False)
           table = self.catalog.get_table('default.test_row_tracking')
   
           # write 1: first commit, two rows with both columns
           write_builder = table.new_batch_write_builder()
           table_write = write_builder.new_write()
           table_commit = write_builder.new_commit()
           expect_data = pa.Table.from_pydict({
               'f0': [-1, 2],
               'f1': [-1001, 1002]
           }, schema=simple_pa_schema)
           table_write.write_arrow(expect_data)
           table_commit.commit(table_write.prepare_commit())
           table_write.close()
           table_commit.close()
   
           # write 2: second, separate commit with two more rows
           table_write = write_builder.new_write()
           table_commit = write_builder.new_commit()
           expect_data = pa.Table.from_pydict({
               'f0': [3, 4],
               'f1': [1003, 1004]
           }, schema=simple_pa_schema)
           table_write.write_arrow(expect_data)
           table_commit.commit(table_write.prepare_commit())
           table_write.close()
           table_commit.close()
   
           # update partial columns: the batch carries _ROW_ID alongside the new
           # 'f0' values, and the column list is passed to update_columns
           table_write = write_builder.new_write()
           table_commit = write_builder.new_commit()
           data2 = pa.Table.from_pydict({
               '_ROW_ID': [0, 1, 2, 3],
               'f0': [5, 6, 7, 8],
           }, schema=pa.schema([
               ('_ROW_ID', pa.int64()),
               ('f0', pa.int8()),
           ]))
           table_write.update_columns(data2, ['f0'])
           cmts = table_write.prepare_commit()
           # NOTE(review): this manual first_row_id assignment is the same line
           # as in the "bad case" example; the stated goal of update_columns is
           # to make it unnecessary — confirm whether it should remain here.
           cmts[0].new_files[0].first_row_id = 0
           table_commit.commit(cmts)
           table_write.close()
           table_commit.close()
   
           # Read everything back; 'f0' should carry the updated values while
           # 'f1' keeps the values from the two original writes.
           read_builder = table.new_read_builder()
           table_scan = read_builder.new_scan()
           table_read = read_builder.new_read()
           actual_data = table_read.to_arrow(table_scan.plan().splits())
           expect_data = pa.Table.from_pydict({
               'f0': [5, 6, 7, 8],
               'f1': [-1001, 1002, 1003, 1004]
           }, schema=pa.schema([
               ('f0', pa.int8()),
               ('f1', pa.int16()),
           ]))
           self.assertEqual(actual_data, expect_data)
   ```
   
   ### Anything else?
   
   _No response_
   
   ### Are you willing to submit a PR?
   
   - [x] I'm willing to submit a PR!


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to