This is an automated email from the ASF dual-hosted git repository.

danny0405 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git


The following commit(s) were added to refs/heads/master by this push:
     new 314ffaf06e59 feat: add lance format support for Flink COW table (#18862)
314ffaf06e59 is described below

commit 314ffaf06e5916d0f548d130ca230c3baf2cf2ba
Author: Danny Chan <[email protected]>
AuthorDate: Mon Jun 1 17:02:53 2026 +0800

    feat: add lance format support for Flink COW table (#18862)
---
 .../io/storage/row/HoodieRowDataLanceWriter.java   |  2 +-
 .../org/apache/hudi/table/HoodieTableFactory.java  | 25 ++++++++--------------
 .../table/format/HoodieRowDataLanceReader.java     | 19 +++++++++++++++-
 .../apache/hudi/table/ITTestHoodieDataSource.java  | 21 ++++++++++++++++++
 .../apache/hudi/table/TestHoodieTableFactory.java  | 25 ++++++++--------------
 5 files changed, 58 insertions(+), 34 deletions(-)

diff --git a/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/io/storage/row/HoodieRowDataLanceWriter.java b/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/io/storage/row/HoodieRowDataLanceWriter.java
index f4388c23fc43..6381dba3005f 100644
--- a/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/io/storage/row/HoodieRowDataLanceWriter.java
+++ b/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/io/storage/row/HoodieRowDataLanceWriter.java
@@ -37,7 +37,7 @@ import java.io.IOException;
 import java.util.function.Function;
 
 /**
- * Lance writer for Flink {@link RowData} append-only base files.
+ * Lance writer for Flink {@link RowData} base files.
  */
 public class HoodieRowDataLanceWriter extends HoodieBaseLanceWriter<RowData, 
String>
     implements HoodieRowDataFileWriter {
diff --git a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/HoodieTableFactory.java b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/HoodieTableFactory.java
index ae8dbb1e963b..6241551e79a2 100644
--- a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/HoodieTableFactory.java
+++ b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/HoodieTableFactory.java
@@ -89,7 +89,7 @@ public class HoodieTableFactory implements 
DynamicTableSourceFactory, DynamicTab
     setupTableOptions(conf.get(FlinkOptions.PATH), conf);
     ResolvedSchema schema = context.getCatalogTable().getResolvedSchema();
     setupConfOptions(conf, context.getObjectIdentifier(), 
context.getCatalogTable(), schema);
-    checkBaseFileFormatForRead(conf, schema);
+    checkBaseFileFormatForRead(conf);
     return new HoodieTableSource(
         SerializableSchema.create(schema),
         path,
@@ -174,7 +174,7 @@ public class HoodieTableFactory implements 
DynamicTableSourceFactory, DynamicTab
    */
   private void sanityCheck(Configuration conf, ResolvedSchema schema) {
     checkTableType(conf);
-    checkBaseFileFormatForWrite(conf, schema);
+    checkBaseFileFormatForWrite(conf);
     checkIndexType(conf);
 
     if (!OptionsResolver.isAppendMode(conf)) {
@@ -216,29 +216,22 @@ public class HoodieTableFactory implements 
DynamicTableSourceFactory, DynamicTab
   }
 
   /**
-   * Validate the base file format. Flink Lance support is scoped to 
append-only COW tables.
+   * Validate the base file format. Flink Lance support is scoped to COW 
tables.
    */
-  private void checkBaseFileFormatForRead(Configuration conf, ResolvedSchema 
schema) {
-    checkLanceBaseFileFormat(conf, schema);
+  private void checkBaseFileFormatForRead(Configuration conf) {
+    checkLanceBaseFileFormat(conf);
   }
 
-  private void checkBaseFileFormatForWrite(Configuration conf, ResolvedSchema 
schema) {
-    checkLanceBaseFileFormat(conf, schema);
-    if (isLanceBaseFileFormat(conf) && !OptionsResolver.isAppendMode(conf)) {
-      throw new HoodieValidationException("Flink Lance base-file writes 
require append-only INSERT mode. Set '"
-          + FlinkOptions.OPERATION.key() + "' = 'insert'.");
-    }
+  private void checkBaseFileFormatForWrite(Configuration conf) {
+    checkLanceBaseFileFormat(conf);
   }
 
-  private void checkLanceBaseFileFormat(Configuration conf, ResolvedSchema 
schema) {
+  private void checkLanceBaseFileFormat(Configuration conf) {
     if (!isLanceBaseFileFormat(conf)) {
       return;
     }
-    if (conf.containsKey(FlinkOptions.RECORD_KEY_FIELD.key()) || 
schema.getPrimaryKey().isPresent()) {
-      throw new HoodieValidationException("Flink Lance base-file support is 
only available for append-only tables without primary keys.");
-    }
     if (OptionsResolver.isMorTable(conf)) {
-      throw new HoodieValidationException("Flink Lance base-file support is 
only available for COPY_ON_WRITE append-only tables.");
+      throw new HoodieValidationException("Flink Lance base-file support is 
only available for COPY_ON_WRITE tables.");
     }
     if (OptionsResolver.isSchemaEvolutionEnabled(conf)) {
       throw new HoodieValidationException("Flink Lance base-file support does 
not support schema evolution. Set '"
diff --git a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/format/HoodieRowDataLanceReader.java b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/format/HoodieRowDataLanceReader.java
index 8c7564af6730..eba397093ec5 100644
--- a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/format/HoodieRowDataLanceReader.java
+++ b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/format/HoodieRowDataLanceReader.java
@@ -52,6 +52,7 @@ import org.lance.file.LanceFileReader;
 import java.io.IOException;
 import java.util.ArrayList;
 import java.util.HashMap;
+import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
@@ -119,7 +120,23 @@ public class HoodieRowDataLanceReader implements 
HoodieFileReader<RowData> {
 
   @Override
   public Set<Pair<String, Long>> filterRowKeys(Set<String> candidateRowKeys) {
-    throw new HoodieException("Filtering row keys from Lance files is not 
supported for Flink append-only tables without primary keys: " + path);
+    Set<Pair<String, Long>> result = new HashSet<>();
+    long position = 0;
+    boolean includeAllKeys = candidateRowKeys == null || 
candidateRowKeys.isEmpty();
+
+    try (ClosableIterator<String> keyIterator = getRecordKeyIterator()) {
+      while (keyIterator.hasNext()) {
+        String recordKey = keyIterator.next();
+        if (includeAllKeys || candidateRowKeys.contains(recordKey)) {
+          result.add(Pair.of(recordKey, position));
+        }
+        position++;
+      }
+    } catch (IOException e) {
+      throw new HoodieIOException("Failed to filter row keys from Lance file: 
" + path, e);
+    }
+
+    return result;
   }
 
   @Override
diff --git a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/table/ITTestHoodieDataSource.java b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/table/ITTestHoodieDataSource.java
index 1cc5beaad418..168c1b893beb 100644
--- a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/table/ITTestHoodieDataSource.java
+++ b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/table/ITTestHoodieDataSource.java
@@ -1381,6 +1381,27 @@ public class ITTestHoodieDataSource {
     assertRowsEquals(projectedRows, "[+I[Alice, id1], +I[Bob, id2]]");
   }
 
+  @Test
+  void testLanceFormatCopyOnWriteUpsertWriteAndRead() {
+    String createHoodieTable = sql("t1")
+        .option(FlinkOptions.PATH, tempFile.getAbsolutePath())
+        .options(getDefaultKeys())
+        .option(FlinkOptions.TABLE_TYPE, FlinkOptions.TABLE_TYPE_COPY_ON_WRITE)
+        .option("hoodie.table.base.file.format", "LANCE")
+        .end();
+    batchTableEnv.executeSql(createHoodieTable);
+
+    execInsertSql(batchTableEnv, TestSQL.INSERT_T1);
+    List<Row> rows = CollectionUtil.iteratorToList(
+        batchTableEnv.executeSql("select * from t1").collect());
+    assertRowsEquals(rows, TestData.DATA_SET_SOURCE_INSERT);
+
+    execInsertSql(batchTableEnv, TestSQL.UPDATE_INSERT_T1);
+    List<Row> updatedRows = CollectionUtil.iteratorToList(
+        batchTableEnv.executeSql("select * from t1").collect());
+    assertRowsEquals(updatedRows, TestData.DATA_SET_SOURCE_MERGED);
+  }
+
   @ParameterizedTest
   @EnumSource(value = ExecMode.class)
   void testWriteAndReadDebeziumJson(ExecMode execMode) throws Exception {
diff --git a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/table/TestHoodieTableFactory.java b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/table/TestHoodieTableFactory.java
index 6aa6f20581c3..73a032bf4313 100644
--- a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/table/TestHoodieTableFactory.java
+++ b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/table/TestHoodieTableFactory.java
@@ -788,7 +788,7 @@ public class TestHoodieTableFactory {
   }
 
   @Test
-  void testLanceFormatSupportedForAppendOnlyTables() {
+  void testLanceFormatSupportedForCopyOnWriteTables() {
     Configuration lanceConf = new Configuration();
     lanceConf.set(FlinkOptions.PATH, new File(tempFile, 
"lance").getAbsolutePath());
     lanceConf.set(FlinkOptions.TABLE_NAME, "lance_t1");
@@ -809,15 +809,7 @@ public class TestHoodieTableFactory {
     final MockContext morContext = MockContext.getInstance(morConf, 
appendOnlySchema, "f2");
     HoodieValidationException morEx = 
assertThrows(HoodieValidationException.class,
         () -> new HoodieTableFactory().createDynamicTableSink(morContext));
-    assertThat(morEx.getMessage(), is("Flink Lance base-file support is only 
available for COPY_ON_WRITE append-only tables."));
-
-    Configuration upsertConf = new Configuration(lanceConf);
-    upsertConf.set(FlinkOptions.OPERATION, "upsert");
-    final MockContext upsertContext = MockContext.getInstance(upsertConf, 
appendOnlySchema, "f2");
-    HoodieValidationException operationEx = 
assertThrows(HoodieValidationException.class,
-        () -> new HoodieTableFactory().createDynamicTableSink(upsertContext));
-    assertThat(operationEx.getMessage(), is("Flink Lance base-file writes 
require append-only INSERT mode. Set '"
-        + FlinkOptions.OPERATION.key() + "' = 'insert'."));
+    assertThat(morEx.getMessage(), is("Flink Lance base-file support is only 
available for COPY_ON_WRITE tables."));
 
     Configuration schemaEvolutionConf = new Configuration(lanceConf);
     
schemaEvolutionConf.setString(HoodieCommonConfig.SCHEMA_EVOLUTION_ENABLE.key(), 
"true");
@@ -833,15 +825,16 @@ public class TestHoodieTableFactory {
         .primaryKey("f0")
         .build();
     final MockContext primaryKeyContext = MockContext.getInstance(lanceConf, 
primaryKeySchema, "f1");
-    HoodieValidationException primaryKeyEx = 
assertThrows(HoodieValidationException.class,
-        () -> new 
HoodieTableFactory().createDynamicTableSink(primaryKeyContext));
-    assertThat(primaryKeyEx.getMessage(), is("Flink Lance base-file support is 
only available for append-only tables without primary keys."));
+    assertDoesNotThrow(() -> new 
HoodieTableFactory().createDynamicTableSink(primaryKeyContext));
+
+    Configuration upsertConf = new Configuration(lanceConf);
+    upsertConf.set(FlinkOptions.OPERATION, "upsert");
+    final MockContext upsertContext = MockContext.getInstance(upsertConf, 
primaryKeySchema, "f1");
+    assertDoesNotThrow(() -> new 
HoodieTableFactory().createDynamicTableSink(upsertContext));
 
     lanceConf.set(FlinkOptions.RECORD_KEY_FIELD, "f0");
     final MockContext keyedContext = MockContext.getInstance(lanceConf, 
appendOnlySchema, "f2");
-    HoodieValidationException sinkEx = 
assertThrows(HoodieValidationException.class,
-        () -> new HoodieTableFactory().createDynamicTableSink(keyedContext));
-    assertThat(sinkEx.getMessage(), is("Flink Lance base-file support is only 
available for append-only tables without primary keys."));
+    assertDoesNotThrow(() -> new 
HoodieTableFactory().createDynamicTableSink(keyedContext));
   }
 
   // -------------------------------------------------------------------------

Reply via email to