This is an automated email from the ASF dual-hosted git repository.

lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/paimon.git


The following commit(s) were added to refs/heads/master by this push:
     new e2aec56d4e [flink] fix blob type conversion (#7405)
e2aec56d4e is described below

commit e2aec56d4e84aab8eb6f5a00921b0c1e63a1ce50
Author: Faiz <[email protected]>
AuthorDate: Thu Mar 12 20:00:13 2026 +0800

    [flink] fix blob type conversion (#7405)
    
    Currently in Flink, we need to specify the blob type through config and
    set the corresponding Flink type to BYTES, i.e. VARBINARY(MAX_LENGTH).

    However, when converting back, the VARBINARY length is set to
    BlobType.DEFAULT_SIZE, which is inconsistent in some situations.
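
For illustration, a minimal sketch of the round trip this commit fixes, using
Flink's VarBinaryType directly; the stand-in value for BlobType.DEFAULT_SIZE
is a placeholder, since only the constant's name appears in the diff:

    import org.apache.flink.table.types.logical.VarBinaryType;

    public class BlobConversionSketch {
        public static void main(String[] args) {
            // A BYTES column in Flink SQL resolves to VARBINARY(MAX_LENGTH),
            // where MAX_LENGTH is Integer.MAX_VALUE.
            VarBinaryType declared = new VarBinaryType(VarBinaryType.MAX_LENGTH);

            // Before the fix, a Paimon BlobType converted back to
            // VARBINARY(BlobType.DEFAULT_SIZE); any length other than
            // MAX_LENGTH makes the two types compare unequal.
            int defaultSizeStandIn = 4; // placeholder for BlobType.DEFAULT_SIZE
            VarBinaryType convertedBefore = new VarBinaryType(defaultSizeStandIn);
            System.out.println(declared.equals(convertedBefore)); // false

            // After the fix, the conversion also uses MAX_LENGTH, so the
            // declared and converted types agree.
            VarBinaryType convertedAfter = new VarBinaryType(VarBinaryType.MAX_LENGTH);
            System.out.println(declared.equals(convertedAfter)); // true
        }
    }
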
---
 .../apache/paimon/flink/DataTypeToLogicalType.java |  3 +-
 .../org/apache/paimon/flink/BlobTableITCase.java   | 47 ++++++++++++++++++++++
 2 files changed, 49 insertions(+), 1 deletion(-)

diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/DataTypeToLogicalType.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/DataTypeToLogicalType.java
index 16a270587d..92ae714ca5 100644
--- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/DataTypeToLogicalType.java
+++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/DataTypeToLogicalType.java
@@ -151,7 +151,8 @@ public class DataTypeToLogicalType implements DataTypeVisitor<LogicalType> {
     @Override
     public LogicalType visit(BlobType blobType) {
         // TODO introduce blob type in Flink SQL?
-        return new org.apache.flink.table.types.logical.VarBinaryType(BlobType.DEFAULT_SIZE);
+        return new org.apache.flink.table.types.logical.VarBinaryType(
+                org.apache.flink.table.types.logical.VarBinaryType.MAX_LENGTH);
     }
 
     @Override
diff --git a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/BlobTableITCase.java b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/BlobTableITCase.java
index 24b411ed7d..c698b70b77 100644
--- a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/BlobTableITCase.java
+++ b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/BlobTableITCase.java
@@ -24,6 +24,8 @@ import org.apache.paimon.data.BlobDescriptor;
 import org.apache.paimon.fs.FileIO;
 import org.apache.paimon.fs.local.LocalFileIO;
 import org.apache.paimon.options.Options;
+import org.apache.paimon.table.FileStoreTable;
+import org.apache.paimon.types.RowType;
 import org.apache.paimon.utils.UriReaderFactory;
 
 import org.apache.flink.types.Row;
@@ -39,6 +41,7 @@ import java.util.List;
 import java.util.Random;
 import java.util.stream.Stream;
 
+import static org.apache.paimon.flink.LogicalTypeConversion.toLogicalType;
 import static org.assertj.core.api.Assertions.assertThat;
 
 /** Test write and read table with blob type. */
@@ -251,6 +254,50 @@ public class BlobTableITCase extends CatalogITCaseBase {
         }
     }
 
+    @Test
+    public void testBlobTypeSchemaEquals() throws Exception {
+        // Step 1: Create a Paimon table with blob field via Flink SQL
+        tEnv.executeSql(
+                "CREATE TABLE blob_schema_test ("
+                        + "id INT, "
+                        + "name STRING, "
+                        + "picture BYTES"
+                        + ") WITH ("
+                        + "'row-tracking.enabled'='true',"
+                        + "'data-evolution.enabled'='true',"
+                        + "'blob-field'='picture'"
+                        + ")");
+
+        // Step 2: Get the Paimon FileStoreTable and its RowType
+        FileStoreTable paimonTable = paimonTable("blob_schema_test");
+        RowType paimonRowType = paimonTable.rowType();
+
+        // Step 3: Create a Flink temporary table with the same schema (BYTES column)
+        tEnv.executeSql(
+                "CREATE TEMPORARY TABLE flink_temp_table ("
+                        + "id INT, "
+                        + "name STRING, "
+                        + "picture BYTES"
+                        + ") WITH ("
+                        + "'row-tracking.enabled'='true',"
+                        + "'data-evolution.enabled'='true',"
+                        + "'connector'='blackhole'"
+                        + ")");
+        org.apache.flink.table.types.logical.RowType flinkRowType =
+                (org.apache.flink.table.types.logical.RowType)
+                        tEnv.from("flink_temp_table")
+                                .getResolvedSchema()
+                                .toPhysicalRowDataType()
+                                .getLogicalType();
+
+        // Step 4: Convert Paimon RowType to Flink RowType via LogicalTypeConversion
+        org.apache.flink.table.types.logical.RowType convertedRowType =
+                toLogicalType(paimonRowType);
+
+        // Step 5: Assert that schemaEquals considers them equal
+        assertThat(AbstractFlinkTableFactory.schemaEquals(convertedRowType, flinkRowType)).isTrue();
+    }
+
     private static final char[] HEX_ARRAY = "0123456789ABCDEF".toCharArray();
 
     public static String bytesToHex(byte[] bytes) {

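For context, the new test's core conversion can be sketched standalone. A
hedged sketch, assuming Paimon exposes a RowType.of(DataType[], String[])
factory and a no-arg BlobType constructor (check the actual API before
relying on either):

    import static org.apache.paimon.flink.LogicalTypeConversion.toLogicalType;

    import org.apache.paimon.types.BlobType;
    import org.apache.paimon.types.DataType;
    import org.apache.paimon.types.DataTypes;
    import org.apache.paimon.types.RowType;

    public class BlobRoundTripSketch {
        public static void main(String[] args) {
            // Hypothetical schema with a blob field; the factory and
            // constructor used here are assumptions, not taken from the diff.
            RowType paimonRowType =
                    RowType.of(
                            new DataType[] {DataTypes.INT(), new BlobType()},
                            new String[] {"id", "picture"});

            // With this fix, the blob field converts to VARBINARY(MAX_LENGTH),
            // matching the BYTES column a Flink DDL would declare.
            org.apache.flink.table.types.logical.RowType flinkRowType =
                    toLogicalType(paimonRowType);
            System.out.println(flinkRowType);
            // expected to contain: `picture` VARBINARY(2147483647)
        }
    }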