This is an automated email from the ASF dual-hosted git repository.

danny0405 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git


The following commit(s) were added to refs/heads/master by this push:
     new 3ada03d744fd feat(flink): Support writing out-of-line BLOB columns 
(#18958)
3ada03d744fd is described below

commit 3ada03d744fd3f0f8d2c5b88c2589ebcdac0b3ce
Author: Krishen <[email protected]>
AuthorDate: Tue Jun 16 19:29:07 2026 -0700

    feat(flink): Support writing out-of-line BLOB columns (#18958)
    
    * feat(flink): Support writing out-of-line BLOB columns
    
    ---------
    
    Co-authored-by: Krishen Bhan <“[email protected]”>
    Co-authored-by: Cursor <[email protected]>
---
 .../apache/hudi/util/HoodieSchemaConverter.java    |  44 ++--
 .../apache/hudi/util/RowDataToAvroConverters.java  |   8 +
 .../hudi/util/TestHoodieSchemaConverter.java       |  17 ++
 .../org/apache/hudi/table/ITTestBlobWrite.java     | 226 +++++++++++++++++++++
 .../hudi/utils/TestRowDataToAvroConverters.java    | 120 +++++++++++
 5 files changed, 394 insertions(+), 21 deletions(-)

diff --git 
a/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/util/HoodieSchemaConverter.java
 
b/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/util/HoodieSchemaConverter.java
index c413101dde46..c859675488ad 100644
--- 
a/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/util/HoodieSchemaConverter.java
+++ 
b/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/util/HoodieSchemaConverter.java
@@ -272,6 +272,16 @@ public class HoodieSchemaConverter {
 
   /**
    * Detects if a Flink RowType represents a BLOB structure by validating it 
matches the schema defined in {@link HoodieSchema.Blob}.
+   *
+   * <p>Detection intentionally keys off the stable structural signals (field 
names and base type
+   * roots) and does <b>not</b> assert nested-field nullability. Flink SQL 
{@code CREATE TABLE} does
+   * not reliably preserve {@code NOT NULL} constraints on nested {@code ROW} 
fields, so requiring an
+   * exact nullability match would silently demote a user's BLOB column to a 
generic record when the
+   * column is declared through DDL. The canonical nullability is restored by 
{@link HoodieSchema#createBlob()}.
+   *
+   * <p>TODO: This heuristic is a workaround for the lack of a native 
Flink/Parquet BLOB logical
+   * type. See <a 
href="https://github.com/apache/hudi/issues/18711">apache/hudi#18711</a> for
+   * the tracked work to remove this structural inference.
    */
   private static boolean isBlobStructure(RowType rowType) {
     // Validate: 3 fields with exact names
@@ -286,24 +296,20 @@ public class HoodieSchemaConverter {
       return false;
     }
 
-    // Validate 'type' field: non-null STRING
-    LogicalType typeField = rowType.getTypeAt(0);
-    if (!isFamily(typeField, LogicalTypeFamily.CHARACTER_STRING) || 
typeField.isNullable()) {
+    // Validate 'type' field: STRING
+    if (!isFamily(rowType.getTypeAt(0), LogicalTypeFamily.CHARACTER_STRING)) {
       return false;
     }
 
-    // Validate 'data' field: nullable BYTES/VARBINARY
+    // Validate 'data' field: BYTES/VARBINARY
     LogicalType dataField = rowType.getTypeAt(1);
     if (dataField.getTypeRoot() != LogicalTypeRoot.BINARY && 
dataField.getTypeRoot() != LogicalTypeRoot.VARBINARY) {
       return false;
     }
-    if (!dataField.isNullable()) {
-      return false;
-    }
 
-    // Validate 'reference' field: nullable ROW
+    // Validate 'reference' field: ROW
     LogicalType referenceField = rowType.getTypeAt(2);
-    if (!referenceField.isNullable() || referenceField.getTypeRoot() != 
LogicalTypeRoot.ROW) {
+    if (referenceField.getTypeRoot() != LogicalTypeRoot.ROW) {
       return false;
     }
 
@@ -322,24 +328,20 @@ public class HoodieSchemaConverter {
     }
 
     // Validate reference sub-field types
-    // external_path: non-null STRING
-    if (!isFamily(referenceRow.getTypeAt(0), 
LogicalTypeFamily.CHARACTER_STRING)
-        || referenceRow.getTypeAt(0).isNullable()) {
+    // external_path: STRING
+    if (!isFamily(referenceRow.getTypeAt(0), 
LogicalTypeFamily.CHARACTER_STRING)) {
       return false;
     }
-    // offset: nullable BIGINT
-    if (referenceRow.getTypeAt(1).getTypeRoot() != LogicalTypeRoot.BIGINT
-        || !referenceRow.getTypeAt(1).isNullable()) {
+    // offset: BIGINT
+    if (referenceRow.getTypeAt(1).getTypeRoot() != LogicalTypeRoot.BIGINT) {
       return false;
     }
-    // length: nullable BIGINT
-    if (referenceRow.getTypeAt(2).getTypeRoot() != LogicalTypeRoot.BIGINT
-        || !referenceRow.getTypeAt(2).isNullable()) {
+    // length: BIGINT
+    if (referenceRow.getTypeAt(2).getTypeRoot() != LogicalTypeRoot.BIGINT) {
       return false;
     }
-    // managed: non-null BOOLEAN
-    if (referenceRow.getTypeAt(3).getTypeRoot() != LogicalTypeRoot.BOOLEAN
-        || referenceRow.getTypeAt(3).isNullable()) {
+    // managed: BOOLEAN
+    if (referenceRow.getTypeAt(3).getTypeRoot() != LogicalTypeRoot.BOOLEAN) {
       return false;
     }
 
diff --git 
a/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/util/RowDataToAvroConverters.java
 
b/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/util/RowDataToAvroConverters.java
index 6bf94b527074..050ad483de10 100644
--- 
a/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/util/RowDataToAvroConverters.java
+++ 
b/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/util/RowDataToAvroConverters.java
@@ -148,6 +148,14 @@ public class RowDataToAvroConverters {
 
               @Override
               public Object convert(HoodieSchema schema, Object object) {
+                // The BLOB `type` discriminator is a STRING in Flink but an 
ENUM in Avro.
+                // Detect that at call time from the HoodieSchema so the 
converter stays
+                // reusable across any row shape — not hard-wired by Flink row 
structure alone.
+                HoodieSchema nonNullSchema = schema.getNonNullType();
+                if (nonNullSchema.getType() == HoodieSchemaType.ENUM) {
+                  return new GenericData.EnumSymbol(
+                      nonNullSchema.toAvroSchema(), object.toString());
+                }
                 return new Utf8(((BinaryStringData) object).toBytes());
               }
             };
diff --git 
a/hudi-client/hudi-flink-client/src/test/java/org/apache/hudi/util/TestHoodieSchemaConverter.java
 
b/hudi-client/hudi-flink-client/src/test/java/org/apache/hudi/util/TestHoodieSchemaConverter.java
index 76b2216b4a6a..25f05eda627c 100644
--- 
a/hudi-client/hudi-flink-client/src/test/java/org/apache/hudi/util/TestHoodieSchemaConverter.java
+++ 
b/hudi-client/hudi-flink-client/src/test/java/org/apache/hudi/util/TestHoodieSchemaConverter.java
@@ -771,6 +771,23 @@ public class TestHoodieSchemaConverter {
     HoodieSchema convertedSchema = 
HoodieSchemaConverter.convertToSchema(blobLikeRowType);
     assertEquals(HoodieSchemaType.BLOB, convertedSchema.getType());
 
+    // Positive case: same structure but every nested field nullable. Flink 
SQL CREATE TABLE does not
+    // preserve NOT NULL on nested ROW fields, so detection must not depend on 
nested nullability.
+    DataType allNullableBlobRow = DataTypes.ROW(
+        DataTypes.FIELD(HoodieSchema.Blob.TYPE, DataTypes.STRING().nullable()),
+        DataTypes.FIELD(HoodieSchema.Blob.INLINE_DATA_FIELD, 
DataTypes.BYTES().nullable()),
+        DataTypes.FIELD(HoodieSchema.Blob.EXTERNAL_REFERENCE, DataTypes.ROW(
+            DataTypes.FIELD(HoodieSchema.Blob.EXTERNAL_REFERENCE_PATH, 
DataTypes.STRING().nullable()),
+            DataTypes.FIELD(HoodieSchema.Blob.EXTERNAL_REFERENCE_OFFSET, 
DataTypes.BIGINT().nullable()),
+            DataTypes.FIELD(HoodieSchema.Blob.EXTERNAL_REFERENCE_LENGTH, 
DataTypes.BIGINT().nullable()),
+            DataTypes.FIELD(HoodieSchema.Blob.EXTERNAL_REFERENCE_IS_MANAGED, 
DataTypes.BOOLEAN().nullable())
+        ).nullable())
+    ).notNull();
+
+    RowType allNullableBlobRowType = (RowType) 
allNullableBlobRow.getLogicalType();
+    HoodieSchema allNullableConverted = 
HoodieSchemaConverter.convertToSchema(allNullableBlobRowType);
+    assertEquals(HoodieSchemaType.BLOB, allNullableConverted.getType());
+
     // Negative case 1: Different field names
     DataType differentNames = DataTypes.ROW(
         DataTypes.FIELD("wrong_name", DataTypes.STRING().notNull()),
diff --git 
a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/table/ITTestBlobWrite.java
 
b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/table/ITTestBlobWrite.java
new file mode 100644
index 000000000000..a66c9f034765
--- /dev/null
+++ 
b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/table/ITTestBlobWrite.java
@@ -0,0 +1,226 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.table;
+
+import org.apache.hudi.common.model.HoodieTableType;
+import org.apache.hudi.common.schema.HoodieSchema;
+import org.apache.hudi.common.schema.HoodieSchemaField;
+import org.apache.hudi.common.schema.HoodieSchemaUtils;
+import org.apache.hudi.common.table.HoodieTableMetaClient;
+import org.apache.hudi.common.table.TableSchemaResolver;
+import org.apache.hudi.util.HoodieSchemaConverter;
+import org.apache.hudi.util.StreamerUtil;
+import org.apache.hudi.utils.FlinkMiniCluster;
+
+import org.apache.flink.table.api.TableEnvironment;
+import org.apache.flink.table.api.TableResult;
+import org.apache.flink.table.api.config.ExecutionConfigOptions;
+import org.apache.flink.table.catalog.ResolvedSchema;
+import org.apache.flink.table.types.logical.RowType;
+import org.apache.flink.types.Row;
+import org.apache.flink.util.CollectionUtil;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.extension.ExtendWith;
+import org.junit.jupiter.api.io.TempDir;
+import org.junit.jupiter.params.ParameterizedTest;
+import org.junit.jupiter.params.provider.EnumSource;
+
+import java.io.File;
+import java.util.List;
+import java.util.concurrent.TimeUnit;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertNotNull;
+import static org.junit.jupiter.api.Assertions.assertNull;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+
+/**
+ * IT case for writing out-of-line (OOL) BLOB columns through the Flink writer 
and reading them back.
+ *
+ * <p>Verifies that the Hudi {@link HoodieSchema.Blob} structure round-trips 
through the Flink
+ * write/read pipeline (both COW and MOR), that updates land through the MOR 
Avro log path, and that
+ * the stored table schema keeps the BLOB logical type instead of degrading 
the column to a generic
+ * Flink/Avro record.
+ */
+@ExtendWith(FlinkMiniCluster.class)
+public class ITTestBlobWrite {
+
+  @TempDir
+  File tempFile;
+
+  private static final String BLOB_COLUMN = "blob_col";
+
+  private static final String BLOB_COLUMN_DDL =
+      "  " + BLOB_COLUMN + " ROW<\n"
+          + "    `type` STRING NOT NULL,\n"
+          + "    `data` BYTES,\n"
+          + "    `reference` ROW<\n"
+          + "      external_path STRING NOT NULL,\n"
+          + "      `offset` BIGINT,\n"
+          + "      `length` BIGINT,\n"
+          + "      managed BOOLEAN NOT NULL\n"
+          + "    >\n"
+          + "  >,\n";
+
+  /**
+   * Regression for {@code HoodieSchemaConverter#isBlobStructure}: Flink SQL 
{@code CREATE TABLE}
+   * may declare nested {@code ROW} fields as {@code NOT NULL}, but
+   * {@link ResolvedSchema#toPhysicalRowDataType()} does not always preserve 
those constraints in
+   * the {@link RowType}. Schema inference must still recognize the BLOB shape 
(same path as
+   * {@code HoodieTableFactory#inferAvroSchema}), otherwise {@code blob_col} 
would degrade to a
+   * generic RECORD in the committed Hoodie schema.
+   */
+  @Test
+  public void testFlinkSqlDdlPhysicalRowTypeStillMapsToHoodieBlob() {
+    TableEnvironment tableEnv = batchEnv();
+    final String probeTable = "flink_blob_ddl_probe";
+    String createProbeDdl =
+        "CREATE TABLE "
+            + probeTable
+            + " (\n"
+            + "  id BIGINT,\n"
+            + "  name STRING,\n"
+            + BLOB_COLUMN_DDL
+            + "  ts BIGINT\n"
+            + ") WITH ('connector'='blackhole')";
+    tableEnv.executeSql(createProbeDdl);
+
+    ResolvedSchema resolved = tableEnv.from(probeTable).getResolvedSchema();
+    RowType physical = (RowType) 
resolved.toPhysicalRowDataType().getLogicalType();
+    // DDL uses NOT NULL on nested BLOB ROW fields where the canonical Hoodie 
BLOB shape is
+    // stricter. Flink's physical RowType from 
ResolvedSchema#toPhysicalRowDataType() may or may
+    // not preserve those flags across versions (see Flink table config / 
release notes linked in
+    // the PR). We do not assert which fields widen — only that Hudi still 
recognizes the column as
+    // a BLOB (regression for HoodieSchemaConverter#isBlobStructure).
+
+    HoodieSchema recordSchema =
+        HoodieSchemaConverter.convertToSchema(
+            physical, HoodieSchemaUtils.getRecordQualifiedName(probeTable));
+    HoodieSchemaField blobField =
+        recordSchema
+            .getField(BLOB_COLUMN)
+            .orElseThrow(() -> new AssertionError("blob_col missing from 
converted HoodieSchema"));
+    assertTrue(
+        blobField.schema().isBlobField(),
+        "Physical RowType from Flink SQL DDL must still map to Hoodie BLOB, 
got: "
+            + blobField.schema());
+
+    tableEnv.executeSql("DROP TABLE " + probeTable);
+  }
+
+  private void createTable(TableEnvironment tableEnv, String tablePath, 
HoodieTableType tableType) {
+    String createTableDdl = String.format(
+        "CREATE TABLE blob_table (\n"
+            + "  id BIGINT,\n"
+            + "  name STRING,\n"
+            + BLOB_COLUMN_DDL
+            + "  ts BIGINT,\n"
+            + "  PRIMARY KEY (id) NOT ENFORCED\n"
+            + ") WITH (\n"
+            + "  'connector' = 'hudi',\n"
+            + "  'path' = '%s',\n"
+            + "  'table.type' = '%s',\n"
+            + "  'ordering.fields' = 'ts'\n"
+            + ");",
+        tablePath, tableType.name());
+    tableEnv.executeSql(createTableDdl);
+  }
+
+  @ParameterizedTest
+  @EnumSource(value = HoodieTableType.class)
+  public void testWriteAndReadOutOfLineBlob(HoodieTableType tableType) throws 
Exception {
+    TableEnvironment tableEnv = batchEnv();
+    String tablePath = new File(tempFile, "blob_table").getAbsolutePath();
+    createTable(tableEnv, tablePath, tableType);
+
+    // First batch: insert two OOL blob references.
+    execInsert(tableEnv,
+        "INSERT INTO blob_table VALUES\n"
+            + "(1, 'doc-1', ROW('OUT_OF_LINE', CAST(NULL AS BYTES), "
+            + "ROW('file1.bin', CAST(0 AS BIGINT), CAST(100 AS BIGINT), 
false)), 1000),\n"
+            + "(2, 'doc-2', ROW('OUT_OF_LINE', CAST(NULL AS BYTES), "
+            + "ROW('file1.bin', CAST(100 AS BIGINT), CAST(200 AS BIGINT), 
false)), 2000)");
+
+    List<Row> rows = readOrdered(tableEnv);
+    assertEquals(2, rows.size());
+    assertOutOfLineRow(rows.get(0), 1L, "doc-1", "file1.bin", 0L, 100L, 1000L);
+    assertOutOfLineRow(rows.get(1), 2L, "doc-2", "file1.bin", 100L, 200L, 
2000L);
+
+    // Second batch: upsert the same keys with new references. For MOR this 
exercises the Avro
+    // log write path (RowData -> Avro), including the BLOB enum `type` field.
+    execInsert(tableEnv,
+        "INSERT INTO blob_table VALUES\n"
+            + "(1, 'doc-1', ROW('OUT_OF_LINE', CAST(NULL AS BYTES), "
+            + "ROW('file2.bin', CAST(500 AS BIGINT), CAST(300 AS BIGINT), 
false)), 3000),\n"
+            + "(2, 'doc-2', ROW('OUT_OF_LINE', CAST(NULL AS BYTES), "
+            + "ROW('file2.bin', CAST(800 AS BIGINT), CAST(400 AS BIGINT), 
false)), 4000)");
+
+    List<Row> updated = readOrdered(tableEnv);
+    assertEquals(2, updated.size());
+    assertOutOfLineRow(updated.get(0), 1L, "doc-1", "file2.bin", 500L, 300L, 
3000L);
+    assertOutOfLineRow(updated.get(1), 2L, "doc-2", "file2.bin", 800L, 400L, 
4000L);
+
+    // The stored table schema must keep the BLOB logical type, not a generic 
record.
+    assertBlobTypePreserved(tablePath);
+  }
+
+  private static List<Row> readOrdered(TableEnvironment tableEnv) {
+    return CollectionUtil.iteratorToList(
+        tableEnv.executeSql("select id, name, blob_col, ts from blob_table 
order by id").collect());
+  }
+
+  private static void assertOutOfLineRow(Row row, long id, String name, String 
path,
+                                         long offset, long length, long ts) {
+    assertEquals(id, row.getField(0));
+    assertEquals(name, row.getField(1));
+    Row blob = (Row) row.getField(2);
+    assertNotNull(blob, "blob struct must be populated");
+    assertEquals("OUT_OF_LINE", blob.getField(0));
+    assertNull(blob.getField(1), "inline data must be null for OUT_OF_LINE 
blob");
+    Row reference = (Row) blob.getField(2);
+    assertNotNull(reference, "reference must be populated for OUT_OF_LINE 
blob");
+    assertEquals(path, reference.getField(0));
+    assertEquals(offset, reference.getField(1));
+    assertEquals(length, reference.getField(2));
+    assertEquals(false, reference.getField(3));
+    assertEquals(ts, row.getField(3));
+  }
+
+  private static void assertBlobTypePreserved(String tablePath) throws 
Exception {
+    HoodieTableMetaClient metaClient =
+        StreamerUtil.createMetaClient(tablePath, new 
org.apache.hadoop.conf.Configuration());
+    HoodieSchema tableSchema = new 
TableSchemaResolver(metaClient).getTableSchema();
+    HoodieSchemaField blobField = tableSchema.getField(BLOB_COLUMN)
+        .orElseThrow(() -> new AssertionError("blob_col field missing from 
table schema"));
+    assertTrue(blobField.schema().isBlobField(),
+        "blob_col must keep the BLOB logical type, found: " + 
blobField.schema());
+  }
+
+  private static TableEnvironment batchEnv() {
+    TableEnvironment tableEnv = 
org.apache.hudi.utils.TestTableEnvs.getBatchTableEnv();
+    tableEnv.getConfig().getConfiguration()
+        .set(ExecutionConfigOptions.TABLE_EXEC_RESOURCE_DEFAULT_PARALLELISM, 
1);
+    return tableEnv;
+  }
+
+  private static void execInsert(TableEnvironment tableEnv, String insertSql) 
throws Exception {
+    TableResult result = tableEnv.executeSql(insertSql);
+    result.getJobClient().get().getJobExecutionResult().get(120, 
TimeUnit.SECONDS);
+  }
+}
diff --git 
a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/utils/TestRowDataToAvroConverters.java
 
b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/utils/TestRowDataToAvroConverters.java
index 185c5830616c..d7c68b7e318d 100644
--- 
a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/utils/TestRowDataToAvroConverters.java
+++ 
b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/utils/TestRowDataToAvroConverters.java
@@ -18,15 +18,22 @@
 
 package org.apache.hudi.utils;
 
+import org.apache.hudi.common.schema.HoodieSchema;
+import org.apache.hudi.common.schema.HoodieSchemaField;
+import org.apache.hudi.common.schema.HoodieSchemaType;
 import org.apache.hudi.util.HoodieSchemaConverter;
 import org.apache.hudi.util.RowDataToAvroConverters;
 
+import org.apache.avro.generic.GenericData;
 import org.apache.avro.generic.GenericRecord;
+import org.apache.avro.util.Utf8;
 import org.apache.flink.formats.common.TimestampFormat;
 import org.apache.flink.formats.json.JsonToRowDataConverters;
 import 
org.apache.flink.shaded.jackson2.com.fasterxml.jackson.core.JsonProcessingException;
 import 
org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.ObjectMapper;
 import org.apache.flink.table.api.DataTypes;
+import org.apache.flink.table.data.GenericRowData;
+import org.apache.flink.table.data.StringData;
 import org.apache.flink.table.types.DataType;
 import org.apache.flink.table.types.logical.RowType;
 import org.junit.jupiter.api.Assertions;
@@ -36,6 +43,7 @@ import java.time.Instant;
 import java.time.LocalDateTime;
 import java.time.ZoneId;
 import java.time.format.DateTimeFormatter;
+import java.util.Arrays;
 
 import static org.apache.flink.table.api.DataTypes.FIELD;
 import static org.apache.flink.table.api.DataTypes.ROW;
@@ -81,4 +89,116 @@ class TestRowDataToAvroConverters {
     Assertions.assertEquals("2021-03-30 08:44:29", 
formatter.format(LocalDateTime.ofInstant(Instant.ofEpochMilli((Long) 
avroRecord.get(0)), ZoneId.of("UTC+1"))));
     Assertions.assertEquals("2021-03-30 15:44:29", 
formatter.format(LocalDateTime.ofInstant(Instant.ofEpochMilli((Long) 
avroRecord.get(0)), ZoneId.of("Asia/Shanghai"))));
   }
+
+  @Test
+  void testRowDataToAvroBlobTypeFieldWritesEnumSymbol() {
+    // Flink models the BLOB `type` discriminator as STRING, but its Avro 
encoding is an ENUM
+    // (blob_storage_type). The converter must emit a GenericData.EnumSymbol, 
not a plain Utf8,
+    // otherwise Avro log-block writes (MOR) fail with "value OUT_OF_LINE (a 
Utf8) is not a
+    // blob_storage_type".
+    DataType blobRow = DataTypes.ROW(
+        DataTypes.FIELD(HoodieSchema.Blob.TYPE, DataTypes.STRING().notNull()),
+        DataTypes.FIELD(HoodieSchema.Blob.INLINE_DATA_FIELD, 
DataTypes.BYTES().nullable()),
+        DataTypes.FIELD(HoodieSchema.Blob.EXTERNAL_REFERENCE, DataTypes.ROW(
+            DataTypes.FIELD(HoodieSchema.Blob.EXTERNAL_REFERENCE_PATH, 
DataTypes.STRING().notNull()),
+            DataTypes.FIELD(HoodieSchema.Blob.EXTERNAL_REFERENCE_OFFSET, 
DataTypes.BIGINT().nullable()),
+            DataTypes.FIELD(HoodieSchema.Blob.EXTERNAL_REFERENCE_LENGTH, 
DataTypes.BIGINT().nullable()),
+            DataTypes.FIELD(HoodieSchema.Blob.EXTERNAL_REFERENCE_IS_MANAGED, 
DataTypes.BOOLEAN().notNull())
+        ).nullable()));
+    RowType rowType = (RowType) DataTypes.ROW(DataTypes.FIELD("blob_col", 
blobRow)).getLogicalType();
+
+    GenericRowData reference = new GenericRowData(4);
+    reference.setField(0, StringData.fromString("file1.bin"));
+    reference.setField(1, 0L);
+    reference.setField(2, 100L);
+    reference.setField(3, false);
+
+    GenericRowData blob = new GenericRowData(3);
+    blob.setField(0, StringData.fromString(HoodieSchema.Blob.OUT_OF_LINE));
+    blob.setField(1, null);
+    blob.setField(2, reference);
+
+    GenericRowData top = new GenericRowData(1);
+    top.setField(0, blob);
+
+    RowDataToAvroConverters.RowDataToAvroConverter converter =
+        RowDataToAvroConverters.createConverter(rowType);
+    GenericRecord avroRecord =
+        (GenericRecord) 
converter.convert(HoodieSchemaConverter.convertToSchema(rowType), top);
+
+    GenericRecord blobRecord = (GenericRecord) avroRecord.get(0);
+    Object typeValue = blobRecord.get(HoodieSchema.Blob.TYPE);
+    Assertions.assertInstanceOf(GenericData.EnumSymbol.class, typeValue,
+        "BLOB `type` must be written as an Avro EnumSymbol, found: "
+            + (typeValue == null ? "null" : typeValue.getClass().getName()));
+    Assertions.assertEquals(HoodieSchema.Blob.OUT_OF_LINE, 
typeValue.toString());
+  }
+
+  /**
+   * A ROW whose field names match the BLOB structure but whose {@link 
HoodieSchema} carries a
+   * plain {@code STRING} (not {@code ENUM}) for the {@code type} field must 
write a plain
+   * {@link Utf8}, not a {@link GenericData.EnumSymbol}.
+   */
+  @Test
+  void testBlobShapedRowWithPlainStringSchemaWritesUtf8() {
+    DataType blobShapedRow = DataTypes.ROW(
+        DataTypes.FIELD(HoodieSchema.Blob.TYPE, DataTypes.STRING().notNull()),
+        DataTypes.FIELD(HoodieSchema.Blob.INLINE_DATA_FIELD, 
DataTypes.BYTES().nullable()),
+        DataTypes.FIELD(HoodieSchema.Blob.EXTERNAL_REFERENCE, DataTypes.ROW(
+            DataTypes.FIELD(HoodieSchema.Blob.EXTERNAL_REFERENCE_PATH, 
DataTypes.STRING().notNull()),
+            DataTypes.FIELD(HoodieSchema.Blob.EXTERNAL_REFERENCE_OFFSET, 
DataTypes.BIGINT().nullable()),
+            DataTypes.FIELD(HoodieSchema.Blob.EXTERNAL_REFERENCE_LENGTH, 
DataTypes.BIGINT().nullable()),
+            DataTypes.FIELD(HoodieSchema.Blob.EXTERNAL_REFERENCE_IS_MANAGED, 
DataTypes.BOOLEAN().notNull())
+        ).nullable()));
+    RowType outerRowType = (RowType) DataTypes.ROW(
+        DataTypes.FIELD("blob_col", blobShapedRow)).getLogicalType();
+
+    // Plain RECORD schema: field[0] is STRING (not ENUM) — mimics a non-BLOB 
record whose
+    // shape happens to match the BLOB structure.
+    HoodieSchema refSchema = HoodieSchema.createRecord("reference", null, 
null, Arrays.asList(
+        HoodieSchemaField.of(HoodieSchema.Blob.EXTERNAL_REFERENCE_PATH,
+            HoodieSchema.create(HoodieSchemaType.STRING)),
+        HoodieSchemaField.of(HoodieSchema.Blob.EXTERNAL_REFERENCE_OFFSET,
+            HoodieSchema.createNullable(HoodieSchemaType.LONG)),
+        HoodieSchemaField.of(HoodieSchema.Blob.EXTERNAL_REFERENCE_LENGTH,
+            HoodieSchema.createNullable(HoodieSchemaType.LONG)),
+        HoodieSchemaField.of(HoodieSchema.Blob.EXTERNAL_REFERENCE_IS_MANAGED,
+            HoodieSchema.create(HoodieSchemaType.BOOLEAN))
+    ));
+    HoodieSchema plainBlobShapedSchema = HoodieSchema.createRecord("blob_col", 
null, null, Arrays.asList(
+        HoodieSchemaField.of(HoodieSchema.Blob.TYPE, 
HoodieSchema.create(HoodieSchemaType.STRING)),
+        HoodieSchemaField.of(HoodieSchema.Blob.INLINE_DATA_FIELD,
+            HoodieSchema.createNullable(HoodieSchemaType.BYTES)),
+        HoodieSchemaField.of(HoodieSchema.Blob.EXTERNAL_REFERENCE,
+            HoodieSchema.createNullable(refSchema))
+    ));
+    HoodieSchema outerSchema = HoodieSchema.createRecord("outer", null, null, 
Arrays.asList(
+        HoodieSchemaField.of("blob_col", plainBlobShapedSchema)
+    ));
+
+    GenericRowData reference = new GenericRowData(4);
+    reference.setField(0, StringData.fromString("file1.bin"));
+    reference.setField(1, 0L);
+    reference.setField(2, 100L);
+    reference.setField(3, false);
+
+    GenericRowData blobRow = new GenericRowData(3);
+    blobRow.setField(0, StringData.fromString("OUT_OF_LINE"));
+    blobRow.setField(1, null);
+    blobRow.setField(2, reference);
+
+    GenericRowData top = new GenericRowData(1);
+    top.setField(0, blobRow);
+
+    RowDataToAvroConverters.RowDataToAvroConverter converter =
+        RowDataToAvroConverters.createConverter(outerRowType);
+    GenericRecord avroRecord = (GenericRecord) converter.convert(outerSchema, 
top);
+
+    GenericRecord blobRecord = (GenericRecord) avroRecord.get(0);
+    Object typeValue = blobRecord.get(HoodieSchema.Blob.TYPE);
+    Assertions.assertInstanceOf(Utf8.class, typeValue,
+        "STRING field must write as Utf8 (not EnumSymbol) when HoodieSchema is 
not ENUM; found: "
+            + (typeValue == null ? "null" : typeValue.getClass().getName()));
+    Assertions.assertEquals("OUT_OF_LINE", typeValue.toString());
+  }
 }
\ No newline at end of file

Reply via email to