leaves12138 commented on code in PR #7602:
URL: https://github.com/apache/paimon/pull/7602#discussion_r3042727545
##########
paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/SparkCatalog.java:
##########
@@ -495,6 +497,11 @@ private Schema toInitialSchema(
field.dataType() instanceof
org.apache.spark.sql.types.BinaryType,
"The type of blob field must be binary");
type = new BlobType();
+ } else if (blobRefFields.contains(name)) {
Review Comment:
Same concern on the Spark side: adding the catalog/type mapping here is not
enough by itself. `SparkInternalRow.blobFields(...)` still only collects
`DataTypeRoot.BLOB`, so reads return the serialized reference bytes.
`SparkInternalRowWrapper#getBlob` still only recognizes `BlobDescriptor`, so V2
writes wrap `BLOB_REF` bytes as `BlobData` and then fail in
`BinaryWriter#serializeBlobReference`. Could we update those runtime wrappers
too?
##########
paimon-format/src/main/java/org/apache/paimon/format/orc/writer/FieldWriterFactory.java:
##########
@@ -264,6 +265,18 @@ public FieldWriter visit(BlobType blobType) {
};
}
+ @Override
+ public FieldWriter visit(BlobRefType blobRefType) {
Review Comment:
This adds the ORC field writer, but `OrcTypeUtil.convertToOrcType(...)`
still only has a `case BLOB` branch. That means an ORC table with a `BLOB_REF`
column never reaches this writer because schema conversion fails first. I think
`OrcTypeUtil` needs the same `BLOB_REF -> binary` mapping.
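A minimal sketch of the shape I have in mind, using stand-in types rather than
the real Paimon/ORC classes. The `TypeRoot` enum and the `toOrcCategory` helper
are hypothetical and exist only for illustration; the actual signature of
`OrcTypeUtil.convertToOrcType(...)` is not reproduced here.

```java
public class OrcTypeMappingSketch {

    /** Hypothetical stand-in for the type roots relevant to this comment. */
    enum TypeRoot {
        INT,
        VARBINARY,
        BLOB,
        BLOB_REF
    }

    /**
     * Hypothetical stand-in for schema conversion: returns the name of the
     * physical category a column of the given root would be stored as.
     */
    static String toOrcCategory(TypeRoot root) {
        switch (root) {
            case INT:
                return "int";
            case VARBINARY:
            case BLOB:
            case BLOB_REF: // the branch that is missing today: stored as bytes, like BLOB
                return "binary";
            default:
                throw new UnsupportedOperationException("Unsupported type: " + root);
        }
    }

    public static void main(String[] args) {
        // Every root must convert; without the BLOB_REF case the last one would throw.
        for (TypeRoot root : TypeRoot.values()) {
            System.out.println(root + " -> " + toOrcCategory(root));
        }
    }
}
```

The point is only that the schema conversion and the field writer need to agree
on the set of roots they accept; otherwise the new `visit(BlobRefType)` is
unreachable.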
##########
paimon-format/src/main/java/org/apache/paimon/format/parquet/reader/ParquetVectorUpdaterFactory.java:
##########
@@ -230,6 +231,11 @@ public UpdaterFactory visit(BlobType blobType) {
};
}
+ @Override
+ public UpdaterFactory visit(BlobRefType blobRefType) {
Review Comment:
Likewise for Parquet: the reader updater is mirrored here, but the
write/schema side still only switches on `BLOB` (`ParquetSchemaConverter`,
`ParquetRowDataWriter`, and `ParquetReaderUtil`). With the current diff, a
Parquet table containing a `BLOB_REF` column is still unsupported. Should those
code paths be updated together?
##########
paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/FlinkCatalog.java:
##########
@@ -1077,9 +1088,13 @@ private static org.apache.paimon.types.DataType resolveDataType(
org.apache.flink.table.types.logical.LogicalType logicalType,
Map<String, String> options) {
List<String> blobFields = CoreOptions.blobField(options);
+ List<String> blobRefFields = CoreOptions.blobRefField(options);
if (blobFields.contains(fieldName)) {
return toBlobType(logicalType);
}
+ if (blobRefFields.contains(fieldName)) {
Review Comment:
This wires the schema option through catalog translation, but the runtime
source path still only treats `BLOB` as a special binary column.
`FileStoreSourceSplitReader.blobFieldIndex(...)` only checks
`DataTypeRoot.BLOB`, so `BLOB_REF` rows still go through plain `FlinkRowData`
and the engine sees serialized `BlobReference` bytes instead of the
dereferenced payload (and `blob-as-descriptor` will not apply either). Could we
extend that reader path as well?
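Roughly what I mean, as a self-contained sketch with stand-in types. The
`TypeRoot` enum and `blobLikeFieldIndexes` helper are hypothetical; the real
`FileStoreSourceSplitReader.blobFieldIndex(...)` signature is not reproduced
here, and the dereferencing step itself is out of scope for this snippet.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.EnumSet;
import java.util.List;
import java.util.Set;

public class BlobFieldIndexSketch {

    /** Hypothetical stand-in for the type roots relevant to this comment. */
    enum TypeRoot {
        INT,
        VARCHAR,
        BLOB,
        BLOB_REF
    }

    /** Roots that the source reader should treat as special binary columns. */
    private static final Set<TypeRoot> BLOB_LIKE = EnumSet.of(TypeRoot.BLOB, TypeRoot.BLOB_REF);

    /** Collects the positions of all blob-like fields, not only plain BLOB ones. */
    static List<Integer> blobLikeFieldIndexes(List<TypeRoot> fieldRoots) {
        List<Integer> indexes = new ArrayList<>();
        for (int i = 0; i < fieldRoots.size(); i++) {
            if (BLOB_LIKE.contains(fieldRoots.get(i))) {
                indexes.add(i);
            }
        }
        return indexes;
    }

    public static void main(String[] args) {
        List<TypeRoot> roots =
                Arrays.asList(TypeRoot.INT, TypeRoot.BLOB, TypeRoot.VARCHAR, TypeRoot.BLOB_REF);
        System.out.println(blobLikeFieldIndexes(roots)); // prints [1, 3]
    }
}
```

With a check on `BLOB` alone, index 3 would be dropped and that column would
fall back to the plain row path described above.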