XiaoHongbo-Hope commented on code in PR #6668:
URL: https://github.com/apache/paimon/pull/6668#discussion_r2564344903
##########
paimon-python/pypaimon/write/writer/data_writer.py:
##########
@@ -143,7 +143,7 @@ def _check_and_roll_if_needed(self):
def _write_data_to_file(self, data: pa.Table):
if data.num_rows == 0:
return
- file_name = f"data-{uuid.uuid4()}-0.{self.file_format}"
+ file_name = f"{self.options.get(CoreOptions.DATA_FILE_PREFIX)}-{uuid.uuid4()}-0.{self.file_format}"
Review Comment:
This will make this config take effect in non-postpone mode too. If
DATA_FILE_PREFIX is only meant to work in postpone mode in this PR, please
double check that. Also, if the user does not configure DATA_FILE_PREFIX,
will this still work correctly?
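
A minimal sketch of one way the writer could guard against a missing config, assuming self.options behaves like a dict and that "data" is the intended default prefix (this fallback is a suggestion, not the PR's implementation):

    def _write_data_to_file(self, data: pa.Table):
        if data.num_rows == 0:
            return
        # Fall back to the default "data" prefix when DATA_FILE_PREFIX is not
        # configured, so file names never contain the literal string "None".
        prefix = self.options.get(CoreOptions.DATA_FILE_PREFIX) or "data"
        file_name = f"{prefix}-{uuid.uuid4()}-0.{self.file_format}"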
##########
paimon-python/pypaimon/tests/write/table_write_test.py:
##########
@@ -153,3 +153,89 @@ def test_multi_prepare_commit_pk(self):
splits = read_builder.new_scan().plan().splits()
actual = table_read.to_arrow(splits).sort_by('user_id')
self.assertEqual(self.expected, actual)
+
+ def test_postpone_read_write(self):
+ schema = Schema.from_pyarrow_schema(self.pa_schema, partition_keys=['user_id'], primary_keys=['user_id', 'dt'],
+ options={'bucket': -2})
+ self.catalog.create_table('default.test_postpone', schema, False)
+ table = self.catalog.get_table('default.test_postpone')
+ data = {
+ 'user_id': [1, 2, 3, 4],
+ 'item_id': [1001, 1002, 1003, 1004],
+ 'behavior': ['a', 'b', 'c', None],
+ 'dt': ['p1', 'p1', 'p2', 'p1'],
+ }
+ expect = pa.Table.from_pydict(data, schema=self.pa_schema)
+
+ write_builder = table.new_batch_write_builder()
+ table_write = write_builder.new_write()
+ table_commit = write_builder.new_commit()
+ table_write.write_arrow(expect)
+ commit_messages = table_write.prepare_commit()
+ table_commit.commit(commit_messages)
+ table_write.close()
+ table_commit.close()
+
+ self.assertTrue(os.path.exists(self.warehouse + "/default.db/test_postpone/snapshot/LATEST"))
+ self.assertTrue(os.path.exists(self.warehouse + "/default.db/test_postpone/snapshot/snapshot-1"))
+ self.assertTrue(os.path.exists(self.warehouse + "/default.db/test_postpone/manifest"))
+ self.assertEqual(len(glob.glob(self.warehouse + "/default.db/test_postpone/manifest/*")), 3)
+ self.assertEqual(len(glob.glob(self.warehouse + "/default.db/test_postpone/user_id=2/bucket-postpone/*.avro")), 1)
+ read_builder = table.new_read_builder()
+ table_read = read_builder.new_read()
+ splits = read_builder.new_scan().plan().splits()
+ actual = table_read.to_arrow(splits)
+ self.assertTrue(not actual)
+
+ def test_data_file_prefix_format(self):
+ """Test that generated data file names follow the expected prefix
format."""
+ schema = Schema.from_pyarrow_schema(self.pa_schema,
partition_keys=['user_id'], primary_keys=['user_id', 'dt'],
+ options={'bucket': -2})
+ self.catalog.create_table('default.test_file_prefix', schema, False)
+ table = self.catalog.get_table('default.test_file_prefix')
+
+ # Write some data to generate files
+ write_builder = table.new_batch_write_builder()
+ table_write = write_builder.new_write()
+ table_commit = write_builder.new_commit()
+
+ data = {
+ 'user_id': [1, 2],
+ 'item_id': [1001, 1002],
+ 'behavior': ['a', 'b'],
+ 'dt': ['p1', 'p1'],
+ }
+ pa_table = pa.Table.from_pydict(data, schema=self.pa_schema)
+ table_write.write_arrow(pa_table)
+
+ commit_messages = table_write.prepare_commit()
+ table_commit.commit(commit_messages)
+ table_write.close()
+ table_commit.close()
+
+ # Find generated data files
+ table_path = os.path.join(self.warehouse, 'default.db', 'test_file_prefix')
+ data_files = []
+ for root, dirs, files in os.walk(table_path):
+ for file in files:
+ if file.endswith('.parquet') or file.endswith('.avro'):
+ data_files.append(file)
+
+ # Verify at least one data file was created
+ self.assertGreater(len(data_files), 0, "No data files were generated")
+
+ # Verify file name format: {table_prefix}-u-{commit_user}-s-{random_number}-w--{uuid}-0.{format}
+ # Expected pattern: data--u-{user}-s-{random}-w--{uuid}-0.{format}
+ expected_pattern = r'^data--u-.+-s-\d+-w--.+-0\.avro$'
+
+ for file_name in data_files:
+ self.assertRegex(file_name, expected_pattern, f"File name '{file_name}' does not match expected prefix format")
+
+ # Additional checks for specific components
+ parts = file_name.split('-')
+ self.assertEqual('data', parts[0], f"File prefix should start with 'data', got '{parts[0]}'")
+ self.assertEqual('u', parts[2], f"Second part should be 'u', got '{parts[2]}'")
+ self.assertEqual('s', parts[8], f"Fourth part should be 's', got '{parts[8]}'")
+ self.assertEqual('w', parts[10], f"Sixth part should be 'w', got '{parts[10]}'")
Review Comment:
Suggest adding a test case that asserts the file name pattern for a
non-postpone mode table as well, including a table that does not provide the
data file prefix config.
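
A minimal sketch of such a test, assuming a fixed-bucket (non-postpone) table and that files fall back to the default "data" prefix when data-file.prefix is not configured (the table name, bucket option value, and expected prefix here are assumptions, not the PR's final code):

    def test_data_file_default_prefix_non_postpone(self):
        """A non-postpone table without data-file.prefix should still produce well-formed file names."""
        schema = Schema.from_pyarrow_schema(self.pa_schema, partition_keys=['user_id'],
                                            primary_keys=['user_id', 'dt'],
                                            options={'bucket': 1})
        self.catalog.create_table('default.test_default_prefix', schema, False)
        table = self.catalog.get_table('default.test_default_prefix')

        write_builder = table.new_batch_write_builder()
        table_write = write_builder.new_write()
        table_commit = write_builder.new_commit()
        data = {
            'user_id': [1, 2],
            'item_id': [1001, 1002],
            'behavior': ['a', 'b'],
            'dt': ['p1', 'p1'],
        }
        table_write.write_arrow(pa.Table.from_pydict(data, schema=self.pa_schema))
        table_commit.commit(table_write.prepare_commit())
        table_write.close()
        table_commit.close()

        # Collect generated data files and check they use the default prefix
        # instead of a literal "None" when no prefix config is provided.
        table_path = os.path.join(self.warehouse, 'default.db', 'test_default_prefix')
        data_files = []
        for root, dirs, files in os.walk(table_path):
            for file in files:
                if file.endswith('.parquet') or file.endswith('.avro') or file.endswith('.orc'):
                    data_files.append(file)
        self.assertGreater(len(data_files), 0, "No data files were generated")
        for file_name in data_files:
            self.assertTrue(file_name.startswith('data-'),
                            f"File name '{file_name}' does not use the default prefix")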
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]