discivigour commented on code in PR #6668:
URL: https://github.com/apache/paimon/pull/6668#discussion_r2564538646


##########
paimon-python/pypaimon/write/file_store_write.py:
##########
@@ -30,23 +32,27 @@
 class FileStoreWrite:
     """Base class for file store write operations."""
 
-    def __init__(self, table):
+    def __init__(self, table, commit_user):
         from pypaimon.table.file_store_table import FileStoreTable
 
         self.table: FileStoreTable = table
         self.data_writers: Dict[Tuple, DataWriter] = {}
         self.max_seq_numbers: dict = {}
         self.write_cols = None
         self.commit_identifier = 0
+        self.options = dict(table.options)
+        self.options[CoreOptions.DATA_FILE_PREFIX] = \
+            (f"{table.options.get(CoreOptions.DATA_FILE_PREFIX, 
'data-')}-u-{commit_user}"
+             f"-s-{random.randint(0, 2 ** 31 - 2)}-w-")
 

Review Comment:
   yes, got it.



##########
paimon-python/pypaimon/tests/write/table_write_test.py:
##########
@@ -153,3 +153,89 @@ def test_multi_prepare_commit_pk(self):
         splits = read_builder.new_scan().plan().splits()
         actual = table_read.to_arrow(splits).sort_by('user_id')
         self.assertEqual(self.expected, actual)
+
+    def test_postpone_read_write(self):
+        schema = Schema.from_pyarrow_schema(self.pa_schema, 
partition_keys=['user_id'], primary_keys=['user_id', 'dt'],
+                                            options={'bucket': -2})
+        self.catalog.create_table('default.test_postpone', schema, False)
+        table = self.catalog.get_table('default.test_postpone')
+        data = {
+            'user_id': [1, 2, 3, 4],
+            'item_id': [1001, 1002, 1003, 1004],
+            'behavior': ['a', 'b', 'c', None],
+            'dt': ['p1', 'p1', 'p2', 'p1'],
+        }
+        expect = pa.Table.from_pydict(data, schema=self.pa_schema)
+
+        write_builder = table.new_batch_write_builder()
+        table_write = write_builder.new_write()
+        table_commit = write_builder.new_commit()
+        table_write.write_arrow(expect)
+        commit_messages = table_write.prepare_commit()
+        table_commit.commit(commit_messages)
+        table_write.close()
+        table_commit.close()
+
+        self.assertTrue(os.path.exists(self.warehouse + 
"/default.db/test_postpone/snapshot/LATEST"))
+        self.assertTrue(os.path.exists(self.warehouse + 
"/default.db/test_postpone/snapshot/snapshot-1"))
+        self.assertTrue(os.path.exists(self.warehouse + 
"/default.db/test_postpone/manifest"))
+        self.assertEqual(len(glob.glob(self.warehouse + 
"/default.db/test_postpone/manifest/*")), 3)
+        self.assertEqual(len(glob.glob(self.warehouse + 
"/default.db/test_postpone/user_id=2/bucket-postpone/*.avro")),
+                         1)
+        read_builder = table.new_read_builder()
+        table_read = read_builder.new_read()
+        splits = read_builder.new_scan().plan().splits()
+        actual = table_read.to_arrow(splits)
+        self.assertTrue(not actual)
+
+    def test_data_file_prefix_format(self):
+        """Test that generated data file names follow the expected prefix format."""
+        schema = Schema.from_pyarrow_schema(self.pa_schema, 
partition_keys=['user_id'], primary_keys=['user_id', 'dt'],
+                                            options={'bucket': -2})
+        self.catalog.create_table('default.test_file_prefix', schema, False)
+        table = self.catalog.get_table('default.test_file_prefix')
+
+        # Write some data to generate files
+        write_builder = table.new_batch_write_builder()
+        table_write = write_builder.new_write()
+        table_commit = write_builder.new_commit()
+
+        data = {
+            'user_id': [1, 2],
+            'item_id': [1001, 1002],
+            'behavior': ['a', 'b'],
+            'dt': ['p1', 'p1'],
+        }
+        pa_table = pa.Table.from_pydict(data, schema=self.pa_schema)
+        table_write.write_arrow(pa_table)
+
+        commit_messages = table_write.prepare_commit()
+        table_commit.commit(commit_messages)
+        table_write.close()
+        table_commit.close()
+
+        # Find generated data files
+        table_path = os.path.join(self.warehouse, 'default.db', 
'test_file_prefix')
+        data_files = []
+        for root, dirs, files in os.walk(table_path):
+            for file in files:
+                if file.endswith('.parquet') or file.endswith('.avro'):
+                    data_files.append(file)
+
+        # Verify at least one data file was created
+        self.assertGreater(len(data_files), 0, "No data files were generated")
+
+        # Verify file name format: {table_prefix}-u-{commit_user}-s-{random_number}-w--{uuid}-0.{format}
+        # Expected pattern: data--u-{user}-s-{random}-w--{uuid}-0.{format}
+        expected_pattern = r'^data--u-.+-s-\d+-w--.+-0\.avro$'
+
+        for file_name in data_files:
+            self.assertRegex(file_name, expected_pattern,
+                             f"File name '{file_name}' does not match expected 
prefix format")
+
+            # Additional checks for specific components
+            parts = file_name.split('-')
+            self.assertEqual('data', parts[0], f"File prefix should start with 
'data', got '{parts[0]}'")
+            self.assertEqual('u', parts[2], f"Second part should be 'u', got 
'{parts[2]}'")
+            self.assertEqual('s', parts[8], f"Fourth part should be 's', got 
'{parts[8]}'")
+            self.assertEqual('w', parts[10], f"Sixth part should be 'w', got 
'{parts[10]}'")

Review Comment:
   added.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to