JingsongLi commented on code in PR #8105:
URL: https://github.com/apache/paimon/pull/8105#discussion_r3347717771


##########
paimon-python/pypaimon/tests/blob_table_test.py:
##########
@@ -3570,3 +3570,239 @@ def test_get_blob_on_non_blob_column_with_magic_bytes_raises(self):
 
 if __name__ == '__main__':
     unittest.main()
+
+
+class BlobConsumerTest(unittest.TestCase):

Review Comment:
   This new test class is declared after the module-level
   `if __name__ == '__main__': unittest.main()` block. When this file is run
   directly, `unittest.main()` executes before `BlobConsumerTest` is defined,
   so these new tests are skipped. Could we move this class above the
   `unittest.main()` block so both direct execution and discovery include it?



##########
paimon-python/pypaimon/tests/blob_table_test.py:
##########
@@ -3570,3 +3570,239 @@ def test_get_blob_on_non_blob_column_with_magic_bytes_raises(self):
 
 if __name__ == '__main__':
     unittest.main()
+
+
+class BlobConsumerTest(unittest.TestCase):
+    """Tests for BlobConsumer callback functionality."""
+
+    @classmethod
+    def setUpClass(cls):
+        cls.temp_dir = tempfile.mkdtemp()
+        cls.warehouse = os.path.join(cls.temp_dir, 'warehouse')
+        cls.catalog = CatalogFactory.create({'warehouse': cls.warehouse})
+        cls.catalog.create_database('test_db', False)
+
+    @classmethod
+    def tearDownClass(cls):
+        shutil.rmtree(cls.temp_dir, ignore_errors=True)
+
+    def test_blob_consumer_basic(self):
+        """Consumer receives one BlobDescriptor per blob written, None for 
nulls."""
+        from pypaimon.table.row.blob import Blob, BlobDescriptor
+        from pypaimon.common.uri_reader import FileUriReader
+
+        pa_schema = pa.schema([
+            ('id', pa.int32()),
+            ('name', pa.string()),
+            ('blob_data', pa.large_binary()),
+        ])
+        schema = Schema.from_pyarrow_schema(pa_schema, options={
+            'row-tracking.enabled': 'true',
+            'data-evolution.enabled': 'true',
+        })
+        self.catalog.create_table('test_db.blob_consumer_basic', schema, False)
+        table = self.catalog.get_table('test_db.blob_consumer_basic')
+
+        blob_bytes = b'hello_blob_consumer'
+        received = []
+
+        def my_consumer(field_name, descriptor):
+            received.append((field_name, descriptor))
+            return True
+
+        write_builder = table.new_batch_write_builder()
+        writer = write_builder.new_write()
+        writer.with_blob_consumer(my_consumer)
+
+        test_data = pa.Table.from_pydict({
+            'id': [1, 2, 3],
+            'name': ['a', 'b', 'c'],
+            'blob_data': [blob_bytes, blob_bytes, None],
+        }, schema=pa_schema)
+        writer.write_arrow(test_data)
+        commit_messages = writer.prepare_commit()
+        write_builder.new_commit().commit(commit_messages)
+        writer.close()
+
+        self.assertEqual(len(received), 3)
+
+        for field_name, desc in received[:2]:
+            self.assertEqual(field_name, 'blob_data')
+            self.assertIsInstance(desc, BlobDescriptor)
+            uri_reader = FileUriReader(table.file_io)
+            blob = Blob.from_descriptor(uri_reader, desc)
+            self.assertEqual(blob.to_data(), blob_bytes)
+
+        self.assertEqual(received[2][0], 'blob_data')
+        self.assertIsNone(received[2][1])
+
+    def test_blob_consumer_flush_behavior(self):
+        """Consumer return value controls flush; verify flush count."""
+        pa_schema = pa.schema([
+            ('id', pa.int32()),
+            ('name', pa.string()),
+            ('blob_data', pa.large_binary()),
+        ])
+        schema = Schema.from_pyarrow_schema(pa_schema, options={
+            'row-tracking.enabled': 'true',
+            'data-evolution.enabled': 'true',
+        })
+        self.catalog.create_table('test_db.blob_consumer_flush', schema, False)
+        table = self.catalog.get_table('test_db.blob_consumer_flush')
+
+        blob_bytes = b'flush_test_blob'
+        descriptors = []
+        flush_count = [0]
+
+        def my_consumer(field_name, descriptor):
+            descriptors.append(descriptor)
+            should_flush = len(descriptors) % 2 == 0
+            if should_flush:
+                flush_count[0] += 1
+            return should_flush
+
+        write_builder = table.new_batch_write_builder()
+        writer = write_builder.new_write()
+        writer.with_blob_consumer(my_consumer)
+
+        test_data = pa.Table.from_pydict({
+            'id': list(range(5)),
+            'name': [f'row{i}' for i in range(5)],
+            'blob_data': [blob_bytes] * 5,
+        }, schema=pa_schema)
+        writer.write_arrow(test_data)
+        commit_messages = writer.prepare_commit()
+        write_builder.new_commit().commit(commit_messages)
+        writer.close()
+
+        self.assertEqual(len(descriptors), 5)
+        self.assertEqual(flush_count[0], 2)
+
+        from pypaimon.table.row.blob import Blob
+        from pypaimon.common.uri_reader import FileUriReader
+        uri_reader = FileUriReader(table.file_io)
+        for desc in descriptors:
+            self.assertIsNotNone(desc)
+            blob = Blob.from_descriptor(uri_reader, desc)
+            self.assertEqual(blob.to_data(), blob_bytes)
+
+    def test_blob_consumer_no_consumer_set(self):
+        """Without consumer, writing still works normally."""
+        pa_schema = pa.schema([
+            ('id', pa.int32()),
+            ('blob_data', pa.large_binary()),
+        ])
+        schema = Schema.from_pyarrow_schema(pa_schema, options={
+            'row-tracking.enabled': 'true',
+            'data-evolution.enabled': 'true',
+        })
+        self.catalog.create_table('test_db.blob_no_consumer', schema, False)
+        table = self.catalog.get_table('test_db.blob_no_consumer')
+
+        write_builder = table.new_batch_write_builder()
+        writer = write_builder.new_write()
+
+        test_data = pa.Table.from_pydict({
+            'id': [1, 2],
+            'blob_data': [b'data1', b'data2'],
+        }, schema=pa_schema)
+        writer.write_arrow(test_data)
+        commit_messages = writer.prepare_commit()
+        write_builder.new_commit().commit(commit_messages)
+        writer.close()
+
+        result = table.new_read_builder().new_read().to_arrow(
+            table.new_read_builder().new_scan().plan().splits())
+        self.assertEqual(result.column('blob_data').to_pylist(), [b'data1', 
b'data2'])
+
+    def test_blob_consumer_chain_call(self):
+        """with_blob_consumer returns self for chaining."""
+        pa_schema = pa.schema([
+            ('id', pa.int32()),
+            ('blob_data', pa.large_binary()),
+        ])
+        schema = Schema.from_pyarrow_schema(pa_schema, options={
+            'row-tracking.enabled': 'true',
+            'data-evolution.enabled': 'true',
+        })
+        self.catalog.create_table('test_db.blob_consumer_chain', schema, False)
+        table = self.catalog.get_table('test_db.blob_consumer_chain')
+
+        write_builder = table.new_batch_write_builder()
+        result = write_builder.new_write().with_blob_consumer(lambda f, d: 
False)
+        self.assertIsNotNone(result)
+        result.close()
+
+    def test_blob_consumer_abort_preserves_files(self):
+        """Abort with consumer must not delete blob files that descriptors 
point to."""
+        from pypaimon.table.row.blob import Blob
+        from pypaimon.common.uri_reader import FileUriReader
+
+        pa_schema = pa.schema([
+            ('id', pa.int32()),
+            ('blob_data', pa.large_binary()),
+        ])
+        schema = Schema.from_pyarrow_schema(pa_schema, options={
+            'row-tracking.enabled': 'true',
+            'data-evolution.enabled': 'true',
+            'blob.target-file-size': '1KB',
+        })
+        self.catalog.create_table('test_db.blob_consumer_abort', schema, False)
+        table = self.catalog.get_table('test_db.blob_consumer_abort')
+
+        blob_bytes = b'X' * 2048
+        received = []
+
+        def my_consumer(field_name, descriptor):
+            received.append(descriptor)
+            return False
+
+        write_builder = table.new_batch_write_builder()
+        writer = write_builder.new_write()
+        writer.with_blob_consumer(my_consumer)
+
+        test_data = pa.Table.from_pydict({
+            'id': list(range(5)),
+            'blob_data': [blob_bytes] * 5,
+        }, schema=pa_schema)
+        writer.write_arrow(test_data)
+
+        self.assertGreater(len(received), 0)
+
+        # Force abort — simulates a failure after some blobs were already
+        # committed (rolled) and their descriptors returned to the consumer.
+        writer.file_store_write.close()

Review Comment:
   This does not currently exercise the abort behavior. 
`FileStoreWrite.close()` closes the writers and then clears `data_writers`, so 
the loop below is empty and no `BlobWriter.abort()` / `BlobFileWriter.abort()` 
path runs. Please call abort while the data writers are still reachable, or 
capture them before closing, so the test really verifies that abort preserves 
consumer-visible blob files.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to