This is an automated email from the ASF dual-hosted git repository.
lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/paimon.git
The following commit(s) were added to refs/heads/master by this push:
new e2aec56d4e [flink] fix blob type conversion (#7405)
e2aec56d4e is described below
commit e2aec56d4e84aab8eb6f5a00921b0c1e63a1ce50
Author: Faiz <[email protected]>
AuthorDate: Thu Mar 12 20:00:13 2026 +0800
[flink] fix blob type conversion (#7405)
Currently in Flink, we need to specify blob type through config and set
the corresponding Flink Type as Bytes i.e. VARBINARY(MAX_LENGTH).
However, when converting back, the varbinary length is set to
BlobType.DEFAULT_LENGTH, which will be inconsistent in some situations.
---
.../apache/paimon/flink/DataTypeToLogicalType.java | 3 +-
.../org/apache/paimon/flink/BlobTableITCase.java | 47 ++++++++++++++++++++++
2 files changed, 49 insertions(+), 1 deletion(-)
diff --git
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/DataTypeToLogicalType.java
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/DataTypeToLogicalType.java
index 16a270587d..92ae714ca5 100644
---
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/DataTypeToLogicalType.java
+++
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/DataTypeToLogicalType.java
@@ -151,7 +151,8 @@ public class DataTypeToLogicalType implements
DataTypeVisitor<LogicalType> {
@Override
public LogicalType visit(BlobType blobType) {
// TODO introduce blob type in Flink SQL?
- return new
org.apache.flink.table.types.logical.VarBinaryType(BlobType.DEFAULT_SIZE);
+ return new org.apache.flink.table.types.logical.VarBinaryType(
+ org.apache.flink.table.types.logical.VarBinaryType.MAX_LENGTH);
}
@Override
diff --git
a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/BlobTableITCase.java
b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/BlobTableITCase.java
index 24b411ed7d..c698b70b77 100644
---
a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/BlobTableITCase.java
+++
b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/BlobTableITCase.java
@@ -24,6 +24,8 @@ import org.apache.paimon.data.BlobDescriptor;
import org.apache.paimon.fs.FileIO;
import org.apache.paimon.fs.local.LocalFileIO;
import org.apache.paimon.options.Options;
+import org.apache.paimon.table.FileStoreTable;
+import org.apache.paimon.types.RowType;
import org.apache.paimon.utils.UriReaderFactory;
import org.apache.flink.types.Row;
@@ -39,6 +41,7 @@ import java.util.List;
import java.util.Random;
import java.util.stream.Stream;
+import static org.apache.paimon.flink.LogicalTypeConversion.toLogicalType;
import static org.assertj.core.api.Assertions.assertThat;
/** Test write and read table with blob type. */
@@ -251,6 +254,50 @@ public class BlobTableITCase extends CatalogITCaseBase {
}
}
+ @Test
+ public void testBlobTypeSchemaEquals() throws Exception {
+ // Step 1: Create a Paimon table with blob field via Flink SQL
+ tEnv.executeSql(
+ "CREATE TABLE blob_schema_test ("
+ + "id INT, "
+ + "name STRING, "
+ + "picture BYTES"
+ + ") WITH ("
+ + "'row-tracking.enabled'='true',"
+ + "'data-evolution.enabled'='true',"
+ + "'blob-field'='picture'"
+ + ")");
+
+ // Step 2: Get the Paimon FileStoreTable and its RowType
+ FileStoreTable paimonTable = paimonTable("blob_schema_test");
+ RowType paimonRowType = paimonTable.rowType();
+
+ // Step 3: Create a Flink temporary table with the same schema (BYTES
column)
+ tEnv.executeSql(
+ "CREATE TEMPORARY TABLE flink_temp_table ("
+ + "id INT, "
+ + "name STRING, "
+ + "picture BYTES"
+ + ") WITH ("
+ + "'row-tracking.enabled'='true',"
+ + "'data-evolution.enabled'='true',"
+ + "'connector'='blackhole'"
+ + ")");
+ org.apache.flink.table.types.logical.RowType flinkRowType =
+ (org.apache.flink.table.types.logical.RowType)
+ tEnv.from("flink_temp_table")
+ .getResolvedSchema()
+ .toPhysicalRowDataType()
+ .getLogicalType();
+
+ // Step 4: Convert Paimon RowType to Flink RowType via
LogicalTypeConversion
+ org.apache.flink.table.types.logical.RowType convertedRowType =
+ toLogicalType(paimonRowType);
+
+ // Step 5: Assert that schemaEquals considers them equal
+ assertThat(AbstractFlinkTableFactory.schemaEquals(convertedRowType,
flinkRowType)).isTrue();
+ }
+
private static final char[] HEX_ARRAY = "0123456789ABCDEF".toCharArray();
public static String bytesToHex(byte[] bytes) {