JingsongLi commented on code in PR #8105:
URL: https://github.com/apache/paimon/pull/8105#discussion_r3347717771
##########
paimon-python/pypaimon/tests/blob_table_test.py:
##########
@@ -3570,3 +3570,239 @@ def test_get_blob_on_non_blob_column_with_magic_bytes_raises(self):
if __name__ == '__main__':
unittest.main()
+
+
+class BlobConsumerTest(unittest.TestCase):
Review Comment:
This new test class is declared after the module-level
`if __name__ == '__main__': unittest.main()` block. When this file is run
directly, `unittest.main()` executes before `BlobConsumerTest` is defined, so
these new tests are skipped. Could we move this class above the
`unittest.main()` block so both direct execution and discovery include it?
##########
paimon-python/pypaimon/tests/blob_table_test.py:
##########
@@ -3570,3 +3570,239 @@ def test_get_blob_on_non_blob_column_with_magic_bytes_raises(self):
if __name__ == '__main__':
unittest.main()
+
+
+class BlobConsumerTest(unittest.TestCase):
+ """Tests for BlobConsumer callback functionality."""
+
+ @classmethod
+ def setUpClass(cls):
+ cls.temp_dir = tempfile.mkdtemp()
+ cls.warehouse = os.path.join(cls.temp_dir, 'warehouse')
+ cls.catalog = CatalogFactory.create({'warehouse': cls.warehouse})
+ cls.catalog.create_database('test_db', False)
+
+ @classmethod
+ def tearDownClass(cls):
+ shutil.rmtree(cls.temp_dir, ignore_errors=True)
+
+ def test_blob_consumer_basic(self):
+ """Consumer receives one BlobDescriptor per blob written, None for
nulls."""
+ from pypaimon.table.row.blob import Blob, BlobDescriptor
+ from pypaimon.common.uri_reader import FileUriReader
+
+ pa_schema = pa.schema([
+ ('id', pa.int32()),
+ ('name', pa.string()),
+ ('blob_data', pa.large_binary()),
+ ])
+ schema = Schema.from_pyarrow_schema(pa_schema, options={
+ 'row-tracking.enabled': 'true',
+ 'data-evolution.enabled': 'true',
+ })
+ self.catalog.create_table('test_db.blob_consumer_basic', schema, False)
+ table = self.catalog.get_table('test_db.blob_consumer_basic')
+
+ blob_bytes = b'hello_blob_consumer'
+ received = []
+
+ def my_consumer(field_name, descriptor):
+ received.append((field_name, descriptor))
+ return True
+
+ write_builder = table.new_batch_write_builder()
+ writer = write_builder.new_write()
+ writer.with_blob_consumer(my_consumer)
+
+ test_data = pa.Table.from_pydict({
+ 'id': [1, 2, 3],
+ 'name': ['a', 'b', 'c'],
+ 'blob_data': [blob_bytes, blob_bytes, None],
+ }, schema=pa_schema)
+ writer.write_arrow(test_data)
+ commit_messages = writer.prepare_commit()
+ write_builder.new_commit().commit(commit_messages)
+ writer.close()
+
+ self.assertEqual(len(received), 3)
+
+ for field_name, desc in received[:2]:
+ self.assertEqual(field_name, 'blob_data')
+ self.assertIsInstance(desc, BlobDescriptor)
+ uri_reader = FileUriReader(table.file_io)
+ blob = Blob.from_descriptor(uri_reader, desc)
+ self.assertEqual(blob.to_data(), blob_bytes)
+
+ self.assertEqual(received[2][0], 'blob_data')
+ self.assertIsNone(received[2][1])
+
+ def test_blob_consumer_flush_behavior(self):
+ """Consumer return value controls flush; verify flush count."""
+ pa_schema = pa.schema([
+ ('id', pa.int32()),
+ ('name', pa.string()),
+ ('blob_data', pa.large_binary()),
+ ])
+ schema = Schema.from_pyarrow_schema(pa_schema, options={
+ 'row-tracking.enabled': 'true',
+ 'data-evolution.enabled': 'true',
+ })
+ self.catalog.create_table('test_db.blob_consumer_flush', schema, False)
+ table = self.catalog.get_table('test_db.blob_consumer_flush')
+
+ blob_bytes = b'flush_test_blob'
+ descriptors = []
+ flush_count = [0]
+
+ def my_consumer(field_name, descriptor):
+ descriptors.append(descriptor)
+ should_flush = len(descriptors) % 2 == 0
+ if should_flush:
+ flush_count[0] += 1
+ return should_flush
+
+ write_builder = table.new_batch_write_builder()
+ writer = write_builder.new_write()
+ writer.with_blob_consumer(my_consumer)
+
+ test_data = pa.Table.from_pydict({
+ 'id': list(range(5)),
+ 'name': [f'row{i}' for i in range(5)],
+ 'blob_data': [blob_bytes] * 5,
+ }, schema=pa_schema)
+ writer.write_arrow(test_data)
+ commit_messages = writer.prepare_commit()
+ write_builder.new_commit().commit(commit_messages)
+ writer.close()
+
+ self.assertEqual(len(descriptors), 5)
+ self.assertEqual(flush_count[0], 2)
+
+ from pypaimon.table.row.blob import Blob
+ from pypaimon.common.uri_reader import FileUriReader
+ uri_reader = FileUriReader(table.file_io)
+ for desc in descriptors:
+ self.assertIsNotNone(desc)
+ blob = Blob.from_descriptor(uri_reader, desc)
+ self.assertEqual(blob.to_data(), blob_bytes)
+
+ def test_blob_consumer_no_consumer_set(self):
+ """Without consumer, writing still works normally."""
+ pa_schema = pa.schema([
+ ('id', pa.int32()),
+ ('blob_data', pa.large_binary()),
+ ])
+ schema = Schema.from_pyarrow_schema(pa_schema, options={
+ 'row-tracking.enabled': 'true',
+ 'data-evolution.enabled': 'true',
+ })
+ self.catalog.create_table('test_db.blob_no_consumer', schema, False)
+ table = self.catalog.get_table('test_db.blob_no_consumer')
+
+ write_builder = table.new_batch_write_builder()
+ writer = write_builder.new_write()
+
+ test_data = pa.Table.from_pydict({
+ 'id': [1, 2],
+ 'blob_data': [b'data1', b'data2'],
+ }, schema=pa_schema)
+ writer.write_arrow(test_data)
+ commit_messages = writer.prepare_commit()
+ write_builder.new_commit().commit(commit_messages)
+ writer.close()
+
+ result = table.new_read_builder().new_read().to_arrow(
+ table.new_read_builder().new_scan().plan().splits())
+ self.assertEqual(result.column('blob_data').to_pylist(), [b'data1', b'data2'])
+
+ def test_blob_consumer_chain_call(self):
+ """with_blob_consumer returns self for chaining."""
+ pa_schema = pa.schema([
+ ('id', pa.int32()),
+ ('blob_data', pa.large_binary()),
+ ])
+ schema = Schema.from_pyarrow_schema(pa_schema, options={
+ 'row-tracking.enabled': 'true',
+ 'data-evolution.enabled': 'true',
+ })
+ self.catalog.create_table('test_db.blob_consumer_chain', schema, False)
+ table = self.catalog.get_table('test_db.blob_consumer_chain')
+
+ write_builder = table.new_batch_write_builder()
+ result = write_builder.new_write().with_blob_consumer(lambda f, d: False)
+ self.assertIsNotNone(result)
+ result.close()
+
+ def test_blob_consumer_abort_preserves_files(self):
+ """Abort with consumer must not delete blob files that descriptors
point to."""
+ from pypaimon.table.row.blob import Blob
+ from pypaimon.common.uri_reader import FileUriReader
+
+ pa_schema = pa.schema([
+ ('id', pa.int32()),
+ ('blob_data', pa.large_binary()),
+ ])
+ schema = Schema.from_pyarrow_schema(pa_schema, options={
+ 'row-tracking.enabled': 'true',
+ 'data-evolution.enabled': 'true',
+ 'blob.target-file-size': '1KB',
+ })
+ self.catalog.create_table('test_db.blob_consumer_abort', schema, False)
+ table = self.catalog.get_table('test_db.blob_consumer_abort')
+
+ blob_bytes = b'X' * 2048
+ received = []
+
+ def my_consumer(field_name, descriptor):
+ received.append(descriptor)
+ return False
+
+ write_builder = table.new_batch_write_builder()
+ writer = write_builder.new_write()
+ writer.with_blob_consumer(my_consumer)
+
+ test_data = pa.Table.from_pydict({
+ 'id': list(range(5)),
+ 'blob_data': [blob_bytes] * 5,
+ }, schema=pa_schema)
+ writer.write_arrow(test_data)
+
+ self.assertGreater(len(received), 0)
+
+ # Force abort — simulates a failure after some blobs were already
+ # committed (rolled) and their descriptors returned to the consumer.
+ writer.file_store_write.close()
Review Comment:
This does not currently exercise the abort behavior.
`FileStoreWrite.close()` closes the writers and then clears `data_writers`, so
the loop below is empty and no `BlobWriter.abort()` / `BlobFileWriter.abort()`
path runs. Please call abort while the data writers are still reachable, or
capture them before closing, so the test really verifies that abort preserves
consumer-visible blob files.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]