leaves12138 commented on code in PR #7602:
URL: https://github.com/apache/paimon/pull/7602#discussion_r3042801644


##########
paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/FlinkCatalog.java:
##########
@@ -1077,9 +1088,13 @@ private static org.apache.paimon.types.DataType resolveDataType(
             org.apache.flink.table.types.logical.LogicalType logicalType,
             Map<String, String> options) {
         List<String> blobFields = CoreOptions.blobField(options);
+        List<String> blobRefFields = CoreOptions.blobRefField(options);
         if (blobFields.contains(fieldName)) {
             return toBlobType(logicalType);
         }
+        if (blobRefFields.contains(fieldName)) {

Review Comment:
   Addressed in `915465dc44`. `FileStoreSourceSplitReader` now treats 
`BLOB_REF` the same as `BLOB` when selecting the blob-aware row wrapper, so the 
Flink source path no longer returns raw serialized `BlobReference` bytes and 
`blob-as-descriptor` applies consistently.



##########
paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/FlinkCatalog.java:
##########
@@ -1077,9 +1088,13 @@ private static org.apache.paimon.types.DataType resolveDataType(
             org.apache.flink.table.types.logical.LogicalType logicalType,
             Map<String, String> options) {
         List<String> blobFields = CoreOptions.blobField(options);
+        List<String> blobRefFields = CoreOptions.blobRefField(options);
         if (blobFields.contains(fieldName)) {
             return toBlobType(logicalType);
         }
+        if (blobRefFields.contains(fieldName)) {

Review Comment:
   This wires the schema option through catalog translation, but the runtime 
source path still only treats `BLOB` as a special binary column. 
`FileStoreSourceSplitReader.blobFieldIndex(...)` only checks 
`DataTypeRoot.BLOB`, so `BLOB_REF` rows still go through plain `FlinkRowData` 
and the engine sees serialized `BlobReference` bytes instead of the 
dereferenced payload (and `blob-as-descriptor` will not apply either). Could we 
extend that reader path as well?
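
To make the gap concrete, here is a minimal, self-contained sketch. It does not mirror the actual signature of `FileStoreSourceSplitReader.blobFieldIndex(...)`; the `TypeRoot` enum, the `blobLikeIndexes` helper, and the two root sets are hypothetical stand-ins used only to show the difference between matching `BLOB` alone and matching both `BLOB` and `BLOB_REF`.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.EnumSet;
import java.util.List;
import java.util.Set;

public class BlobFieldIndexSketch {

    // Hypothetical stand-in for Paimon's DataTypeRoot; only the roots needed here.
    enum TypeRoot { INTEGER, VARCHAR, BLOB, BLOB_REF }

    // The check as described in the comment above: only BLOB counts as blob-like.
    static final Set<TypeRoot> BLOB_ONLY = EnumSet.of(TypeRoot.BLOB);

    // The suggested extension: BLOB_REF is treated the same as BLOB.
    static final Set<TypeRoot> BLOB_LIKE = EnumSet.of(TypeRoot.BLOB, TypeRoot.BLOB_REF);

    // Returns the positions of all fields whose type root is in blobRoots.
    static List<Integer> blobLikeIndexes(List<TypeRoot> fieldRoots, Set<TypeRoot> blobRoots) {
        List<Integer> indexes = new ArrayList<>();
        for (int i = 0; i < fieldRoots.size(); i++) {
            if (blobRoots.contains(fieldRoots.get(i))) {
                indexes.add(i);
            }
        }
        return indexes;
    }

    public static void main(String[] args) {
        List<TypeRoot> schema =
                Arrays.asList(TypeRoot.INTEGER, TypeRoot.BLOB_REF, TypeRoot.BLOB);
        // With the BLOB-only check, the BLOB_REF column at index 1 is missed.
        System.out.println(blobLikeIndexes(schema, BLOB_ONLY)); // prints [2]
        // With the extended check, both blob-like columns are found.
        System.out.println(blobLikeIndexes(schema, BLOB_LIKE)); // prints [1, 2]
    }
}
```

In this sketch, any column missed by the check would fall through to the plain row wrapper, which is the behavior the comment describes for `BLOB_REF`.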



##########
paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/SparkCatalog.java:
##########
@@ -495,6 +497,11 @@ private Schema toInitialSchema(
                         field.dataType() instanceof org.apache.spark.sql.types.BinaryType,
                         "The type of blob field must be binary");
                 type = new BlobType();
+            } else if (blobRefFields.contains(name)) {

Review Comment:
   Addressed in `915465dc44`. `SparkInternalRow.blobFields(...)` now includes 
`BLOB_REF`, and both `SparkInternalRowWrapper#getBlob` and `SparkRow#getBlob` 
now decode through `BlobUtils.fromBytes(...)` with the `BlobReferenceLookup` 
resolver, so the V1/V2 write paths no longer wrap `BLOB_REF` bytes as plain 
`BlobData`.
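
The difference between wrapping bytes as plain data and decoding them through a resolver can be sketched as below. This is an illustration only: the `Kind` enum, the `decode` helper, the UTF-8 reference encoding, and the map-backed resolver are all assumptions made for the example, and they do not reflect the real signatures of `BlobUtils.fromBytes(...)` or `BlobReferenceLookup`.

```java
import java.nio.charset.StandardCharsets;
import java.util.HashMap;
import java.util.Map;
import java.util.function.Function;

public class BlobDecodeSketch {

    // Hypothetical column kinds; the real code distinguishes BLOB and BLOB_REF types.
    enum Kind { BLOB, BLOB_REF }

    // Decodes stored bytes into the payload the engine should see.
    // For BLOB the stored bytes are the payload itself. For BLOB_REF the stored
    // bytes are assumed (for this sketch only) to be a UTF-8 reference key that
    // the resolver maps to the payload.
    static byte[] decode(Kind kind, byte[] stored, Function<String, byte[]> resolver) {
        if (kind == Kind.BLOB_REF) {
            String reference = new String(stored, StandardCharsets.UTF_8);
            byte[] payload = resolver.apply(reference);
            if (payload == null) {
                throw new IllegalStateException("Unresolved blob reference: " + reference);
            }
            return payload;
        }
        return stored;
    }

    public static void main(String[] args) {
        // Hypothetical lookup table standing in for a reference resolver.
        Map<String, byte[]> store = new HashMap<>();
        store.put("ref-1", "payload".getBytes(StandardCharsets.UTF_8));

        byte[] refBytes = "ref-1".getBytes(StandardCharsets.UTF_8);

        // Treating BLOB_REF bytes as plain blob data exposes the reference itself.
        System.out.println(new String(
                decode(Kind.BLOB, refBytes, store::get), StandardCharsets.UTF_8)); // prints ref-1

        // Decoding through the resolver yields the dereferenced payload.
        System.out.println(new String(
                decode(Kind.BLOB_REF, refBytes, store::get), StandardCharsets.UTF_8)); // prints payload
    }
}
```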


