This is an automated email from the ASF dual-hosted git repository.

timbrown pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git


The following commit(s) were added to refs/heads/master by this push:
     new 2a5244814b2d feat: Support COW bulk-insert, insert, upsert, delete 
works with spark datasource and lance (#17731)
2a5244814b2d is described below

commit 2a5244814b2da5a201b16dd889561a8b0dbf8005
Author: Rahil C <[email protected]>
AuthorDate: Wed Dec 31 10:00:01 2025 -0500

    feat: Support COW bulk-insert, insert, upsert, delete works with spark 
datasource and lance (#17731)
    
    * feat: Add HoodieBaseLanceFileWriter and implementation for SparkFileWriter
    
    * feat: Add HoodieSparkLanceReader for reading lance files to internal row
    
    * migrate to hoodie schema and address tim prev comments
    
    * Implement SparkColumnarFileReader for Datasource integration with Lance
    
    * fix usages to hoodie schema
    
    * fix iterator for reuse
    
    * minor fixes
    
    * add DisabledIfSystemProperty
    
    * try spark 4.0
    
    * scala style plugin property change
    
    * initial minor comments
    
    * add spark 3.4
    
    * address comments
    
    * address ethan tim comments
    
    * scalastyle
    
    * Support COW bulk-insert, insert, upsert, delete works with spark 
datasource and lance
    
    * get tests passing
    
    * remove HoodieInternalRowLanceWriter
    
    * fix compilation, address comments
    
    * address copy()
    
    * address tim comments
    
    * remove older comment
    
    * retrigger ci
    
    * address tim comment
---
 .../client/common/SparkReaderContextFactory.java   |   3 +
 .../io/storage/HoodieSparkFileReaderFactory.java   |   5 +
 .../io/storage/HoodieSparkFileWriterFactory.java   |   1 -
 .../hudi/io/storage/HoodieSparkLanceReader.java    |   5 -
 .../hudi/io/storage/HoodieSparkLanceWriter.java    |  35 +-
 .../hudi/io/storage/LanceRecordIterator.java       |   2 +-
 .../row/HoodieInternalRowFileWriterFactory.java    |  16 +
 .../org/apache/hudi/common/util/LanceUtils.java    | 204 +++++++
 .../hudi/io/storage/HoodieFileReaderFactory.java   |  12 +
 .../apache/hudi/io/storage/HoodieIOFactory.java    |   2 +
 .../io/storage/hadoop/HoodieHadoopIOFactory.java   |   3 +
 .../hudi/functional/TestLanceDataSource.scala      | 620 ++++++++++++++++++++-
 12 files changed, 897 insertions(+), 11 deletions(-)

diff --git 
a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/common/SparkReaderContextFactory.java
 
b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/common/SparkReaderContextFactory.java
index d2f5eb81458f..a4e88b06915f 100644
--- 
a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/common/SparkReaderContextFactory.java
+++ 
b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/common/SparkReaderContextFactory.java
@@ -100,6 +100,9 @@ public class SparkReaderContextFactory implements 
ReaderContextFactory<InternalR
     } else if (metaClient.getTableConfig().getBaseFileFormat() == 
HoodieFileFormat.ORC) {
       SparkColumnarFileReader orcFileReader = getOrcFileReader(resolver, 
sqlConf, options, configs, sparkAdapter);
       baseFileReaderBroadcast = jsc.broadcast(orcFileReader);
+    } else if (metaClient.getTableConfig().getBaseFileFormat() == 
HoodieFileFormat.LANCE) {
+      baseFileReaderBroadcast = jsc.broadcast(
+          sparkAdapter.createLanceFileReader(false, sqlConf, options, 
configs));
     } else {
       baseFileReaderBroadcast = jsc.broadcast(
           sparkAdapter.createParquetFileReader(false, sqlConf, options, 
configs));
diff --git 
a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/io/storage/HoodieSparkFileReaderFactory.java
 
b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/io/storage/HoodieSparkFileReaderFactory.java
index 1a68c3d0edbc..ec71460ea313 100644
--- 
a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/io/storage/HoodieSparkFileReaderFactory.java
+++ 
b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/io/storage/HoodieSparkFileReaderFactory.java
@@ -48,6 +48,11 @@ public class HoodieSparkFileReaderFactory extends 
HoodieFileReaderFactory {
     return new HoodieSparkParquetReader(storage, path);
   }
 
+  @Override
+  protected HoodieFileReader newLanceFileReader(StoragePath path) {
+    return new HoodieSparkLanceReader(path);
+  }
+
   @Override
   protected HoodieFileReader newHFileFileReader(HoodieConfig hoodieConfig,
                                                 StoragePath path,
diff --git 
a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/io/storage/HoodieSparkFileWriterFactory.java
 
b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/io/storage/HoodieSparkFileWriterFactory.java
index c251eae33e2e..bd4ba955d04a 100644
--- 
a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/io/storage/HoodieSparkFileWriterFactory.java
+++ 
b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/io/storage/HoodieSparkFileWriterFactory.java
@@ -57,7 +57,6 @@ public class HoodieSparkFileWriterFactory extends 
HoodieFileWriterFactory {
     if (compressionCodecName.isEmpty()) {
       compressionCodecName = null;
     }
-    //TODO boundary to revisit in follow up to use HoodieSchema directly
     HoodieRowParquetWriteSupport writeSupport = 
getHoodieRowParquetWriteSupport(storage.getConf(), schema,
         config, enableBloomFilter(populateMetaFields, config));
     HoodieRowParquetConfig parquetConfig = new 
HoodieRowParquetConfig(writeSupport,
diff --git 
a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/io/storage/HoodieSparkLanceReader.java
 
b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/io/storage/HoodieSparkLanceReader.java
index 90e4996e44a4..b7794f3aef9f 100644
--- 
a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/io/storage/HoodieSparkLanceReader.java
+++ 
b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/io/storage/HoodieSparkLanceReader.java
@@ -184,9 +184,4 @@ public class HoodieSparkLanceReader implements 
HoodieSparkFileReader {
       throw new HoodieException("Failed to get row count from Lance file: " + 
path, e);
     }
   }
-
-  /**
-   * Iterator implementation that reads Lance file batches and converts to 
UnsafeRows.
-   * Keeps ColumnarBatch alive while iterating to avoid unnecessary data 
copying.
-   */
 }
diff --git 
a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/io/storage/HoodieSparkLanceWriter.java
 
b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/io/storage/HoodieSparkLanceWriter.java
index e7476f4473fa..67e07d162ec1 100644
--- 
a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/io/storage/HoodieSparkLanceWriter.java
+++ 
b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/io/storage/HoodieSparkLanceWriter.java
@@ -24,6 +24,7 @@ import org.apache.hudi.common.engine.TaskContextSupplier;
 import org.apache.hudi.common.model.HoodieKey;
 import org.apache.hudi.common.model.HoodieRecord;
 import org.apache.hudi.io.lance.HoodieBaseLanceWriter;
+import org.apache.hudi.io.storage.row.HoodieInternalRowFileWriter;
 import org.apache.hudi.storage.HoodieStorage;
 import org.apache.hudi.storage.StoragePath;
 import org.apache.spark.sql.catalyst.InternalRow;
@@ -42,14 +43,15 @@ import static 
org.apache.hudi.common.model.HoodieRecord.HoodieMetadataField.PART
 import static 
org.apache.hudi.common.model.HoodieRecord.HoodieMetadataField.RECORD_KEY_METADATA_FIELD;
 
 /**
- * Spark Lance file writer implementing {@link HoodieSparkFileWriter}.
+ * Spark Lance file writer implementing {@link HoodieSparkFileWriter} and 
{@link HoodieInternalRowFileWriter}.
  *
  * This writer integrates with Hudi's storage I/O layer and supports:
  * - Hudi metadata field population
  * - Record key tracking (for bloom filters - TODO 
https://github.com/apache/hudi/issues/17664)
  * - Sequence ID generation
  */
-public class HoodieSparkLanceWriter extends HoodieBaseLanceWriter<InternalRow> 
implements HoodieSparkFileWriter {
+public class HoodieSparkLanceWriter extends HoodieBaseLanceWriter<InternalRow>
+    implements HoodieSparkFileWriter, HoodieInternalRowFileWriter {
 
   private static final String DEFAULT_TIMEZONE = "UTC";
 
@@ -90,6 +92,22 @@ public class HoodieSparkLanceWriter extends 
HoodieBaseLanceWriter<InternalRow> i
     };
   }
 
+  /**
+   * Constructor for Spark Lance writer used for internal row writing with 
pre-embedded metadata.
+   *
+   * @param file Path where Lance file will be written
+   * @param sparkSchema Spark schema for the data
+   * @param taskContextSupplier Task context supplier for partition ID
+   * @param storage HoodieStorage instance
+   * @throws IOException if writer initialization fails
+   */
+  public HoodieSparkLanceWriter(StoragePath file,
+                                StructType sparkSchema,
+                                TaskContextSupplier taskContextSupplier,
+                                HoodieStorage storage) throws IOException {
+    this(file, sparkSchema, null, taskContextSupplier, storage, false);
+  }
+
   @Override
   public void writeRowWithMetadata(HoodieKey key, InternalRow row) throws 
IOException {
     if (populateMetaFields) {
@@ -105,6 +123,17 @@ public class HoodieSparkLanceWriter extends 
HoodieBaseLanceWriter<InternalRow> i
   public void writeRow(String recordKey, InternalRow row) throws IOException {
     super.write(row);
   }
+  
+  @Override
+  public void writeRow(UTF8String key, InternalRow row) throws IOException {
+    // Key reserved for future bloom filter support 
(https://github.com/apache/hudi/issues/17664)
+    super.write(row);
+  }
+  
+  @Override
+  public void writeRow(InternalRow row) throws IOException {
+    super.write(row);
+  }
 
   @Override
   protected void populateVectorSchemaRoot(List<InternalRow> records) {
@@ -154,4 +183,4 @@ public class HoodieSparkLanceWriter extends 
HoodieBaseLanceWriter<InternalRow> i
     row.update(PARTITION_PATH_METADATA_FIELD.ordinal(), 
UTF8String.fromString(partitionPath));
     row.update(FILENAME_METADATA_FIELD.ordinal(), fileName);
   }
-}
\ No newline at end of file
+}
diff --git 
a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/io/storage/LanceRecordIterator.java
 
b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/io/storage/LanceRecordIterator.java
index 88d5645879a6..d27668eb2b81 100644
--- 
a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/io/storage/LanceRecordIterator.java
+++ 
b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/io/storage/LanceRecordIterator.java
@@ -129,7 +129,7 @@ public class LanceRecordIterator implements 
ClosableIterator<UnsafeRow> {
     }
     InternalRow row = rowIterator.next();
     // Convert to UnsafeRow immediately while batch is still open
-    return projection.apply(row);
+    return projection.apply(row).copy();
   }
 
   @Override
diff --git 
a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/io/storage/row/HoodieInternalRowFileWriterFactory.java
 
b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/io/storage/row/HoodieInternalRowFileWriterFactory.java
index d51b48f3cd55..b1431ae70d80 100644
--- 
a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/io/storage/row/HoodieInternalRowFileWriterFactory.java
+++ 
b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/io/storage/row/HoodieInternalRowFileWriterFactory.java
@@ -21,9 +21,11 @@ package org.apache.hudi.io.storage.row;
 import org.apache.hudi.common.bloom.BloomFilter;
 import org.apache.hudi.common.bloom.BloomFilterFactory;
 import org.apache.hudi.common.config.HoodieParquetConfig;
+import org.apache.hudi.common.engine.LocalTaskContextSupplier;
 import org.apache.hudi.common.fs.FSUtils;
 import org.apache.hudi.common.util.Option;
 import org.apache.hudi.config.HoodieWriteConfig;
+import org.apache.hudi.io.storage.HoodieSparkLanceWriter;
 import org.apache.hudi.storage.StoragePath;
 import org.apache.hudi.storage.hadoop.HadoopStorageConfiguration;
 import org.apache.hudi.table.HoodieTable;
@@ -33,6 +35,7 @@ import org.apache.spark.sql.types.StructType;
 
 import java.io.IOException;
 
+import static org.apache.hudi.common.model.HoodieFileFormat.LANCE;
 import static org.apache.hudi.common.model.HoodieFileFormat.PARQUET;
 import static org.apache.hudi.common.util.ParquetUtils.getCompressionCodecName;
 
@@ -59,6 +62,8 @@ public class HoodieInternalRowFileWriterFactory {
     final String extension = FSUtils.getFileExtension(path.getName());
     if (PARQUET.getFileExtension().equals(extension)) {
       return newParquetInternalRowFileWriter(path, hoodieTable, writeConfig, 
schema, tryInstantiateBloomFilter(writeConfig));
+    } else if (LANCE.getFileExtension().equals(extension)) {
+      return newLanceInternalRowFileWriter(path, hoodieTable, schema);
     }
     throw new UnsupportedOperationException(extension + " format not supported 
yet.");
   }
@@ -87,6 +92,17 @@ public class HoodieInternalRowFileWriterFactory {
         ));
   }
 
+  private static HoodieInternalRowFileWriter 
newLanceInternalRowFileWriter(StoragePath path,
+                                                                           
HoodieTable table,
+                                                                           
StructType structType)
+      throws IOException {
+    return new HoodieSparkLanceWriter(
+        path,
+        structType,
+        new LocalTaskContextSupplier(),
+        table.getStorage());
+  }
+
   private static Option<BloomFilter> 
tryInstantiateBloomFilter(HoodieWriteConfig writeConfig) {
     // NOTE: Currently Bloom Filter is only going to be populated if 
meta-fields are populated
     if (writeConfig.populateMetaFields()) {
diff --git 
a/hudi-common/src/main/java/org/apache/hudi/common/util/LanceUtils.java 
b/hudi-common/src/main/java/org/apache/hudi/common/util/LanceUtils.java
new file mode 100644
index 000000000000..c0bf94f07489
--- /dev/null
+++ b/hudi-common/src/main/java/org/apache/hudi/common/util/LanceUtils.java
@@ -0,0 +1,204 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.hudi.common.util;
+
+import org.apache.hudi.common.config.HoodieReaderConfig;
+import org.apache.hudi.common.model.HoodieFileFormat;
+import org.apache.hudi.common.model.HoodieKey;
+import org.apache.hudi.common.model.HoodieRecord;
+import org.apache.hudi.common.schema.HoodieSchema;
+import org.apache.hudi.common.util.collection.ClosableIterator;
+import org.apache.hudi.common.util.collection.CloseableMappingIterator;
+import org.apache.hudi.common.util.collection.Pair;
+import org.apache.hudi.exception.HoodieIOException;
+import org.apache.hudi.io.storage.HoodieFileReader;
+import org.apache.hudi.io.storage.HoodieIOFactory;
+import org.apache.hudi.keygen.BaseKeyGenerator;
+import org.apache.hudi.metadata.HoodieIndexVersion;
+import org.apache.hudi.stats.HoodieColumnRangeMetadata;
+import org.apache.hudi.storage.HoodieStorage;
+import org.apache.hudi.storage.StoragePath;
+
+import org.apache.avro.Schema;
+import org.apache.avro.generic.GenericRecord;
+
+import java.io.ByteArrayOutputStream;
+import java.io.IOException;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.Properties;
+import java.util.Set;
+import java.util.concurrent.atomic.AtomicLong;
+
+public class LanceUtils extends FileFormatUtils {
+
+  @Override
+  public ClosableIterator<Pair<HoodieKey, Long>> 
fetchRecordKeysWithPositions(HoodieStorage storage, StoragePath filePath) {
+    return fetchRecordKeysWithPositions(storage, filePath, Option.empty(), 
Option.empty());
+  }
+
+  @Override
+  public ClosableIterator<Pair<HoodieKey, Long>> 
fetchRecordKeysWithPositions(HoodieStorage storage,
+                                                                               
StoragePath filePath,
+                                                                               
Option<BaseKeyGenerator> keyGeneratorOpt,
+                                                                               
Option<String> partitionPath) {
+    AtomicLong position = new AtomicLong(0);
+    return new CloseableMappingIterator<>(
+        getHoodieKeyIterator(storage, filePath, keyGeneratorOpt, 
partitionPath),
+        key -> Pair.of(key, position.getAndIncrement()));
+  }
+
+  @Override
+  public ClosableIterator<HoodieKey> getHoodieKeyIterator(HoodieStorage 
storage, StoragePath filePath) {
+    return getHoodieKeyIterator(storage, filePath, Option.empty(), 
Option.empty());
+  }
+
+  @Override
+  public ClosableIterator<HoodieKey> getHoodieKeyIterator(HoodieStorage 
storage,
+                                                           StoragePath 
filePath,
+                                                           
Option<BaseKeyGenerator> keyGeneratorOpt,
+                                                           Option<String> 
partitionPath) {
+    try {
+      HoodieFileReader reader = HoodieIOFactory.getIOFactory(storage)
+          .getReaderFactory(HoodieRecord.HoodieRecordType.SPARK)
+          .getFileReader(new HoodieReaderConfig(), filePath, 
HoodieFileFormat.LANCE);
+      ClosableIterator<String> keyIterator = reader.getRecordKeyIterator();
+      return new ClosableIterator<HoodieKey>() {
+        @Override
+        public void close() {
+          keyIterator.close();
+        }
+
+        @Override
+        public boolean hasNext() {
+          return keyIterator.hasNext();
+        }
+
+        @Override
+        public HoodieKey next() {
+          String key = keyIterator.next();
+          return new HoodieKey(key, partitionPath.orElse(null));
+        }
+      };
+    } catch (IOException e) {
+      throw new HoodieIOException("Failed to read from Lance file" + filePath, 
e);
+    }
+  }
+
+  @Override
+  public Schema readAvroSchema(HoodieStorage storage, StoragePath filePath) {
+    try (HoodieFileReader fileReader =
+                 HoodieIOFactory.getIOFactory(storage)
+                         .getReaderFactory(HoodieRecord.HoodieRecordType.SPARK)
+                         .getFileReader(
+                                 ConfigUtils.DEFAULT_HUDI_CONFIG_FOR_READER,
+                                 filePath,
+                                 HoodieFileFormat.LANCE)) {
+      return fileReader.getSchema().getAvroSchema();
+    } catch (IOException e) {
+      throw new HoodieIOException("Failed to read schema from Lance file", e);
+    }
+  }
+
+  @Override
+  public HoodieFileFormat getFormat() {
+    return HoodieFileFormat.LANCE;
+  }
+
+  @Override
+  public List<GenericRecord> readAvroRecords(HoodieStorage storage, 
StoragePath filePath) {
+    throw new UnsupportedOperationException("readAvroRecords is not yet 
supported for Lance format");
+  }
+
+  @Override
+  public List<GenericRecord> readAvroRecords(HoodieStorage storage, 
StoragePath filePath, Schema schema) {
+    throw new UnsupportedOperationException("readAvroRecords with schema is 
not yet supported for Lance format");
+  }
+
+  @Override
+  public Map<String, String> readFooter(HoodieStorage storage, boolean 
required, StoragePath filePath, String... footerNames) {
+    throw new UnsupportedOperationException("readFooter is not yet supported 
for Lance format");
+  }
+
+  @Override
+  public long getRowCount(HoodieStorage storage, StoragePath filePath) {
+    try (HoodieFileReader fileReader =
+                 HoodieIOFactory.getIOFactory(storage)
+                         .getReaderFactory(HoodieRecord.HoodieRecordType.SPARK)
+                         .getFileReader(
+                                 ConfigUtils.DEFAULT_HUDI_CONFIG_FOR_READER,
+                                 filePath,
+                                 HoodieFileFormat.LANCE)) {
+      return fileReader.getTotalRecords();
+    } catch (IOException e) {
+      throw new HoodieIOException("Failed to read schema from Lance file", e);
+    }
+  }
+
+  @Override
+  public Set<Pair<String, Long>> filterRowKeys(HoodieStorage storage, 
StoragePath filePath, Set<String> filterKeys) {
+    try (HoodieFileReader fileReader =
+                 HoodieIOFactory.getIOFactory(storage)
+                         .getReaderFactory(HoodieRecord.HoodieRecordType.SPARK)
+                         .getFileReader(
+                                 ConfigUtils.DEFAULT_HUDI_CONFIG_FOR_READER,
+                                 filePath,
+                                 HoodieFileFormat.LANCE)) {
+      return fileReader.filterRowKeys(filterKeys);
+    } catch (IOException e) {
+      throw new HoodieIOException("Failed to read filter keys from Lance 
file", e);
+    }
+  }
+
+  @Override
+  public List<HoodieColumnRangeMetadata<Comparable>> 
readColumnStatsFromMetadata(HoodieStorage storage,
+                                                                               
   StoragePath filePath,
+                                                                               
   List<String> columnList,
+                                                                               
   HoodieIndexVersion indexVersion) {
+    throw new UnsupportedOperationException("readColumnStatsFromMetadata is 
not yet supported for Lance format");
+  }
+
+  @Override
+  public void writeMetaFile(HoodieStorage storage, StoragePath filePath, 
Properties props) throws IOException {
+    throw new UnsupportedOperationException("writeMetaFile is not yet 
supported for Lance format");
+  }
+
+  @Override
+  public ByteArrayOutputStream serializeRecordsToLogBlock(HoodieStorage 
storage,
+                                                          List<HoodieRecord> 
records,
+                                                          HoodieSchema 
writerSchema,
+                                                          HoodieSchema 
readerSchema,
+                                                          String keyFieldName,
+                                                          Map<String, String> 
paramsMap) throws IOException {
+    throw new UnsupportedOperationException("serializeRecordsToLogBlock is not 
yet supported for Lance format");
+  }
+
+  @Override
+  public Pair<ByteArrayOutputStream, Object> 
serializeRecordsToLogBlock(HoodieStorage storage,
+                                                                        
Iterator<HoodieRecord> records,
+                                                                        
HoodieRecord.HoodieRecordType recordType,
+                                                                        Schema 
writerSchema,
+                                                                        Schema 
readerSchema,
+                                                                        String 
keyFieldName,
+                                                                        
Map<String, String> paramsMap) throws IOException {
+    throw new UnsupportedOperationException("serializeRecordsToLogBlock with 
iterator is not yet supported for Lance format");
+  }
+}
diff --git 
a/hudi-common/src/main/java/org/apache/hudi/io/storage/HoodieFileReaderFactory.java
 
b/hudi-common/src/main/java/org/apache/hudi/io/storage/HoodieFileReaderFactory.java
index 341f20d7d700..4f0d1d2adf16 100644
--- 
a/hudi-common/src/main/java/org/apache/hudi/io/storage/HoodieFileReaderFactory.java
+++ 
b/hudi-common/src/main/java/org/apache/hudi/io/storage/HoodieFileReaderFactory.java
@@ -30,6 +30,7 @@ import org.apache.hudi.storage.StoragePathInfo;
 import java.io.IOException;
 
 import static org.apache.hudi.common.model.HoodieFileFormat.HFILE;
+import static org.apache.hudi.common.model.HoodieFileFormat.LANCE;
 import static org.apache.hudi.common.model.HoodieFileFormat.ORC;
 import static org.apache.hudi.common.model.HoodieFileFormat.PARQUET;
 
@@ -54,6 +55,9 @@ public class HoodieFileReaderFactory {
     if (ORC.getFileExtension().equals(extension)) {
       return getFileReader(hoodieConfig, path, ORC, Option.empty());
     }
+    if (LANCE.getFileExtension().equals(extension)) {
+      return getFileReader(hoodieConfig, path, LANCE, Option.empty());
+    }
     throw new UnsupportedOperationException(extension + " format not supported 
yet.");
   }
 
@@ -71,6 +75,8 @@ public class HoodieFileReaderFactory {
         return newHFileFileReader(hoodieConfig, path, schemaOption);
       case ORC:
         return newOrcFileReader(path);
+      case LANCE:
+        return newLanceFileReader(path);
       default:
         throw new UnsupportedOperationException(format + " format not 
supported yet.");
     }
@@ -85,6 +91,8 @@ public class HoodieFileReaderFactory {
         return newHFileFileReader(hoodieConfig, pathInfo, schemaOption);
       case ORC:
         return newOrcFileReader(pathInfo.getPath());
+      case LANCE:
+        return newLanceFileReader(pathInfo.getPath());
       default:
         throw new UnsupportedOperationException(format + " format not 
supported yet.");
     }
@@ -125,6 +133,10 @@ public class HoodieFileReaderFactory {
     throw new UnsupportedOperationException();
   }
 
+  protected HoodieFileReader newLanceFileReader(StoragePath path) {
+    throw new UnsupportedOperationException();
+  }
+
   public HoodieFileReader newBootstrapFileReader(HoodieFileReader 
skeletonFileReader,
                                                  HoodieFileReader 
dataFileReader,
                                                  Option<String[]> 
partitionFields,
diff --git 
a/hudi-common/src/main/java/org/apache/hudi/io/storage/HoodieIOFactory.java 
b/hudi-common/src/main/java/org/apache/hudi/io/storage/HoodieIOFactory.java
index e1cff2a0424e..d21151bf639c 100644
--- a/hudi-common/src/main/java/org/apache/hudi/io/storage/HoodieIOFactory.java
+++ b/hudi-common/src/main/java/org/apache/hudi/io/storage/HoodieIOFactory.java
@@ -112,6 +112,8 @@ public abstract class HoodieIOFactory {
       return getFileFormatUtils(HoodieFileFormat.ORC);
     } else if 
(path.getFileExtension().equals(HoodieFileFormat.HFILE.getFileExtension())) {
       return getFileFormatUtils(HoodieFileFormat.HFILE);
+    } else if 
(path.getFileExtension().equals(HoodieFileFormat.LANCE.getFileExtension())) {
+      return getFileFormatUtils(HoodieFileFormat.LANCE);
     }
     throw new UnsupportedOperationException("The format for file " + path + " 
is not supported yet.");
   }
diff --git 
a/hudi-hadoop-common/src/main/java/org/apache/hudi/io/storage/hadoop/HoodieHadoopIOFactory.java
 
b/hudi-hadoop-common/src/main/java/org/apache/hudi/io/storage/hadoop/HoodieHadoopIOFactory.java
index 3d8e797682e9..6cf94f16fe14 100644
--- 
a/hudi-hadoop-common/src/main/java/org/apache/hudi/io/storage/hadoop/HoodieHadoopIOFactory.java
+++ 
b/hudi-hadoop-common/src/main/java/org/apache/hudi/io/storage/hadoop/HoodieHadoopIOFactory.java
@@ -24,6 +24,7 @@ import org.apache.hudi.common.model.HoodieFileFormat;
 import org.apache.hudi.common.model.HoodieRecord;
 import org.apache.hudi.common.util.FileFormatUtils;
 import org.apache.hudi.common.util.HFileUtils;
+import org.apache.hudi.common.util.LanceUtils;
 import org.apache.hudi.common.util.OrcUtils;
 import org.apache.hudi.common.util.ParquetUtils;
 import org.apache.hudi.common.util.ReflectionUtils;
@@ -111,6 +112,8 @@ public class HoodieHadoopIOFactory extends HoodieIOFactory {
         return new OrcUtils();
       case HFILE:
         return new HFileUtils();
+      case LANCE:
+        return new LanceUtils();
       default:
         throw new UnsupportedOperationException(fileFormat.name() + " format 
not supported yet.");
     }
diff --git 
a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestLanceDataSource.scala
 
b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestLanceDataSource.scala
index 32e7d869c6e4..9a634b8eec6d 100644
--- 
a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestLanceDataSource.scala
+++ 
b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestLanceDataSource.scala
@@ -19,7 +19,8 @@ package org.apache.hudi.functional
 
 import org.apache.hudi.DataSourceWriteOptions._
 import org.apache.hudi.DefaultSparkRecordMerger
-import org.apache.hudi.common.table.HoodieTableConfig
+import org.apache.hudi.common.table.{HoodieTableConfig, HoodieTableMetaClient}
+import org.apache.hudi.common.testutils.HoodieTestUtils
 import org.apache.hudi.config.HoodieWriteConfig
 import org.apache.hudi.testutils.HoodieSparkClientTestBase
 
@@ -28,6 +29,8 @@ import org.junit.jupiter.api.{AfterEach, BeforeEach, Test}
 import org.junit.jupiter.api.Assertions.{assertEquals, assertTrue}
 import org.junit.jupiter.api.condition.DisabledIfSystemProperty
 
+import scala.collection.JavaConverters._
+
 /**
  * Basic functional tests for Lance file format with Hudi Spark datasource.
  */
@@ -129,4 +132,619 @@ class TestLanceDataSource extends 
HoodieSparkClientTestBase {
     assertTrue(expectedDf.except(actual).isEmpty)
     assertTrue(actual.except(expectedDf).isEmpty)
   }
+
+  /**
+   * Writes five (id, name, age, score) rows to a Hudi table whose base file
+   * format option is "LANCE", then reads the table back under three filters:
+   * a numeric predicate, a string equality predicate and a two-column
+   * conjunction.
+   *
+   * No write operation is set on this write, so whichever operation the
+   * datasource defaults to is the one exercised. Every result is compared
+   * against a hand-built DataFrame with `except` in both directions, i.e. as
+   * a set of distinct rows.
+   */
+  @Test
+  def testWhereClauseFiltering(): Unit = {
+    val tableName = "test_lance_where"
+    val tablePath = s"$basePath/$tableName"
+
+    // Create test data: five rows with distinct ids, ages and scores
+    val records = Seq(
+      (1, "Alice", 30, 95.5),
+      (2, "Bob", 25, 87.3),
+      (3, "Charlie", 35, 92.1),
+      (4, "David", 28, 88.9),
+      (5, "Eve", 32, 91.4)
+    )
+    val df = spark.createDataFrame(records).toDF("id", "name", "age", "score")
+
+    // Write to Hudi table with Lance format ("id" is the record key, "age" the precombine field)
+    df.write
+      .format("hudi")
+      .option(HoodieTableConfig.BASE_FILE_FORMAT.key(), "LANCE")
+      .option(RECORDKEY_FIELD.key(), "id")
+      .option(PRECOMBINE_FIELD.key(), "age")
+      .option(TABLE_NAME.key(), tableName)
+      .option(HoodieWriteConfig.TBL_NAME.key(), tableName)
+      .option(HoodieWriteConfig.RECORD_MERGE_IMPL_CLASSES.key(), 
classOf[DefaultSparkRecordMerger].getName)
+      .mode(SaveMode.Overwrite)
+      .save(tablePath)
+
+    // Test 1: Simple WHERE clause on numeric column
+    // Only the four user columns are projected, so the comparison below sees nothing else the read returns.
+    val filteredByAge = spark.read
+      .format("hudi")
+      .load(tablePath)
+      .where("age > 30")
+      .select("id", "name", "age", "score")
+
+    // Rows with age > 30 in the input: Charlie (35) and Eve (32)
+    val expectedFilteredByAge = spark.createDataFrame(Seq(
+      (3, "Charlie", 35, 92.1),
+      (5, "Eve", 32, 91.4)
+    )).toDF("id", "name", "age", "score")
+
+    // Two-way except: nothing expected is missing, and nothing unexpected is present
+    assertTrue(expectedFilteredByAge.except(filteredByAge).isEmpty)
+    assertTrue(filteredByAge.except(expectedFilteredByAge).isEmpty)
+
+    // Test 2: WHERE clause on string column
+    val filteredByName = spark.read
+      .format("hudi")
+      .load(tablePath)
+      .where("name = 'Bob'")
+      .select("id", "name", "age", "score")
+
+    val expectedFilteredByName = spark.createDataFrame(Seq(
+      (2, "Bob", 25, 87.3)
+    )).toDF("id", "name", "age", "score")
+
+    assertTrue(expectedFilteredByName.except(filteredByName).isEmpty)
+    assertTrue(filteredByName.except(expectedFilteredByName).isEmpty)
+
+    // Test 3: Complex WHERE with multiple conditions
+    val filteredComplex = spark.read
+      .format("hudi")
+      .load(tablePath)
+      .where("age >= 28 AND score > 90")
+      .select("id", "name", "age", "score")
+
+    // David (age 28) passes the age bound but is excluded by score 88.9; Bob fails the age bound
+    val expectedFilteredComplex = spark.createDataFrame(Seq(
+      (1, "Alice", 30, 95.5),
+      (3, "Charlie", 35, 92.1),
+      (5, "Eve", 32, 91.4)
+    )).toDF("id", "name", "age", "score")
+
+    assertTrue(expectedFilteredComplex.except(filteredComplex).isEmpty)
+    assertTrue(filteredComplex.except(expectedFilteredComplex).isEmpty)
+  }
+
+  /**
+   * Performs three "bulk_insert" writes of three rows each into one
+   * Lance-format table (the first with SaveMode.Overwrite, the next two with
+   * SaveMode.Append) and then checks two things:
+   *  - the completed commits timeline holds exactly three instants, each with
+   *    action "commit";
+   *  - a plain read returns all nine rows, compared as a set of distinct rows
+   *    via `except` in both directions.
+   */
+  @Test
+  def testMultipleBulkInsertsWithCommitValidation(): Unit = {
+    val tableName = "test_lance_multiple_inserts"
+    val tablePath = s"$basePath/$tableName"
+
+    // First insert - records 1-3
+    val records1 = Seq(
+      (1, "Alice", 30, 95.5),
+      (2, "Bob", 25, 87.3),
+      (3, "Charlie", 35, 92.1)
+    )
+    val df1 = spark.createDataFrame(records1).toDF("id", "name", "age", 
"score")
+
+    // Overwrite mode on the first write; later writes append to the same path
+    df1.write
+      .format("hudi")
+      .option(HoodieTableConfig.BASE_FILE_FORMAT.key(), "LANCE")
+      .option(RECORDKEY_FIELD.key(), "id")
+      .option(PRECOMBINE_FIELD.key(), "age")
+      .option(TABLE_NAME.key(), tableName)
+      .option(HoodieWriteConfig.TBL_NAME.key(), tableName)
+      .option(HoodieWriteConfig.RECORD_MERGE_IMPL_CLASSES.key(), 
classOf[DefaultSparkRecordMerger].getName)
+      .option(OPERATION.key(), "bulk_insert")
+      .mode(SaveMode.Overwrite)
+      .save(tablePath)
+
+    // Second insert - records 4-6
+    val records2 = Seq(
+      (4, "David", 28, 88.9),
+      (5, "Eve", 32, 91.4),
+      (6, "Frank", 27, 85.7)
+    )
+    val df2 = spark.createDataFrame(records2).toDF("id", "name", "age", 
"score")
+
+    df2.write
+      .format("hudi")
+      .option(HoodieTableConfig.BASE_FILE_FORMAT.key(), "LANCE")
+      .option(RECORDKEY_FIELD.key(), "id")
+      .option(PRECOMBINE_FIELD.key(), "age")
+      .option(TABLE_NAME.key(), tableName)
+      .option(HoodieWriteConfig.TBL_NAME.key(), tableName)
+      .option(HoodieWriteConfig.RECORD_MERGE_IMPL_CLASSES.key(), 
classOf[DefaultSparkRecordMerger].getName)
+      .option(OPERATION.key(), "bulk_insert")
+      .mode(SaveMode.Append)
+      .save(tablePath)
+
+    // Third insert - records 7-9
+    val records3 = Seq(
+      (7, "Grace", 29, 93.2),
+      (8, "Henry", 31, 89.6),
+      (9, "Iris", 26, 94.8)
+    )
+    val df3 = spark.createDataFrame(records3).toDF("id", "name", "age", 
"score")
+
+    df3.write
+      .format("hudi")
+      .option(HoodieTableConfig.BASE_FILE_FORMAT.key(), "LANCE")
+      .option(RECORDKEY_FIELD.key(), "id")
+      .option(PRECOMBINE_FIELD.key(), "age")
+      .option(TABLE_NAME.key(), tableName)
+      .option(HoodieWriteConfig.TBL_NAME.key(), tableName)
+      .option(HoodieWriteConfig.RECORD_MERGE_IMPL_CLASSES.key(), 
classOf[DefaultSparkRecordMerger].getName)
+      .option(OPERATION.key(), "bulk_insert")
+      .mode(SaveMode.Append)
+      .save(tablePath)
+
+    // Validate number of commits matches number of inserts
+    // The meta client is built only after all three writes, so no timeline reload is needed here.
+    val metaClient = HoodieTableMetaClient.builder()
+      .setConf(HoodieTestUtils.getDefaultStorageConf)
+      .setBasePath(tablePath)
+      .build()
+
+    val commitCount = 
metaClient.getCommitsTimeline.filterCompletedInstants().countInstants()
+    assertEquals(3, commitCount, "Should have 3 completed commits (one per 
insert)")
+
+    // Verify that all commits are bulk_insert commits
+    // (this materializes the same filtered timeline as a Scala list, so the size check repeats the count above)
+    val commits = 
metaClient.getCommitsTimeline.filterCompletedInstants().getInstants.asScala.toList
+    assertEquals(3, commits.size, "Should have exactly 3 commits")
+
+    // Check that each commit is a COMMIT action (bulk_insert creates COMMIT 
actions)
+    commits.foreach { instant =>
+      assertEquals("commit", instant.getAction, s"Instant 
${instant.requestedTime()} should be a commit action")
+    }
+
+    // Read back all data and verify the complete set of rows (contents, not just the count)
+    val readDf = spark.read
+      .format("hudi")
+      .load(tablePath)
+
+    val actual = readDf.select("id", "name", "age", "score")
+
+    val expectedDf = spark.createDataFrame(Seq(
+      (1, "Alice", 30, 95.5),
+      (2, "Bob", 25, 87.3),
+      (3, "Charlie", 35, 92.1),
+      (4, "David", 28, 88.9),
+      (5, "Eve", 32, 91.4),
+      (6, "Frank", 27, 85.7),
+      (7, "Grace", 29, 93.2),
+      (8, "Henry", 31, 89.6),
+      (9, "Iris", 26, 94.8)
+    )).toDF("id", "name", "age", "score")
+
+    // Two-way except: nothing expected is missing, and nothing unexpected is present
+    assertTrue(expectedDf.except(actual).isEmpty)
+    assertTrue(actual.except(expectedDf).isEmpty)
+  }
+
+  /**
+   * Checks a time-travel read on a Lance-format table.
+   *
+   * Two "bulk_insert" writes are made, the requested time of the second
+   * completed commit is captured, and a third "bulk_insert" follows. A read
+   * with the "as.of.instant" option set to the captured time is then expected
+   * to return only the six rows from the first two writes, compared as a set
+   * of distinct rows via `except` in both directions.
+   */
+  @Test
+  def testTimeTravel(): Unit = {
+    val tableName = "test_lance_time_travel"
+    val tablePath = s"$basePath/$tableName"
+
+    // First insert - records 1-3
+    val records1 = Seq(
+      (1, "Alice", 30, 95.5),
+      (2, "Bob", 25, 87.3),
+      (3, "Charlie", 35, 92.1)
+    )
+    val df1 = spark.createDataFrame(records1).toDF("id", "name", "age", 
"score")
+
+    df1.write
+      .format("hudi")
+      .option(HoodieTableConfig.BASE_FILE_FORMAT.key(), "LANCE")
+      .option(RECORDKEY_FIELD.key(), "id")
+      .option(PRECOMBINE_FIELD.key(), "age")
+      .option(TABLE_NAME.key(), tableName)
+      .option(HoodieWriteConfig.TBL_NAME.key(), tableName)
+      .option(HoodieWriteConfig.RECORD_MERGE_IMPL_CLASSES.key(), 
classOf[DefaultSparkRecordMerger].getName)
+      .option(OPERATION.key(), "bulk_insert")
+      .mode(SaveMode.Overwrite)
+      .save(tablePath)
+
+    // Second insert - records 4-6
+    val records2 = Seq(
+      (4, "David", 28, 88.9),
+      (5, "Eve", 32, 91.4),
+      (6, "Frank", 27, 85.7)
+    )
+    val df2 = spark.createDataFrame(records2).toDF("id", "name", "age", 
"score")
+
+    df2.write
+      .format("hudi")
+      .option(HoodieTableConfig.BASE_FILE_FORMAT.key(), "LANCE")
+      .option(RECORDKEY_FIELD.key(), "id")
+      .option(PRECOMBINE_FIELD.key(), "age")
+      .option(TABLE_NAME.key(), tableName)
+      .option(HoodieWriteConfig.TBL_NAME.key(), tableName)
+      .option(HoodieWriteConfig.RECORD_MERGE_IMPL_CLASSES.key(), 
classOf[DefaultSparkRecordMerger].getName)
+      .option(OPERATION.key(), "bulk_insert")
+      .mode(SaveMode.Append)
+      .save(tablePath)
+
+    // Get the commit timestamp after second insert
+    // The meta client is built before the third write, so the timeline it reads holds only two commits.
+    val metaClient = HoodieTableMetaClient.builder()
+      .setConf(HoodieTestUtils.getDefaultStorageConf)
+      .setBasePath(tablePath)
+      .build()
+
+    val commits = 
metaClient.getCommitsTimeline.filterCompletedInstants().getInstants.asScala.toList
+    assertEquals(2, commits.size, "Should have 2 commits after second insert")
+    // Index 1 is the second entry of the list; its requested time is the time-travel target used below
+    val secondCommitTime = commits(1).requestedTime()
+
+    // Third insert - records 7-9
+    val records3 = Seq(
+      (7, "Grace", 29, 93.2),
+      (8, "Henry", 31, 89.6),
+      (9, "Iris", 26, 94.8)
+    )
+    val df3 = spark.createDataFrame(records3).toDF("id", "name", "age", 
"score")
+
+    df3.write
+      .format("hudi")
+      .option(HoodieTableConfig.BASE_FILE_FORMAT.key(), "LANCE")
+      .option(RECORDKEY_FIELD.key(), "id")
+      .option(PRECOMBINE_FIELD.key(), "age")
+      .option(TABLE_NAME.key(), tableName)
+      .option(HoodieWriteConfig.TBL_NAME.key(), tableName)
+      .option(HoodieWriteConfig.RECORD_MERGE_IMPL_CLASSES.key(), 
classOf[DefaultSparkRecordMerger].getName)
+      .option(OPERATION.key(), "bulk_insert")
+      .mode(SaveMode.Append)
+      .save(tablePath)
+
+    // Time travel query to second commit (should see data from c1 + c2 only)
+    val timeTravelDf = spark.read
+      .format("hudi")
+      .option("as.of.instant", secondCommitTime)
+      .load(tablePath)
+
+    val actual = timeTravelDf.select("id", "name", "age", "score")
+
+    // Rows 7-9 from the third write must not appear
+    val expectedDf = spark.createDataFrame(Seq(
+      (1, "Alice", 30, 95.5),
+      (2, "Bob", 25, 87.3),
+      (3, "Charlie", 35, 92.1),
+      (4, "David", 28, 88.9),
+      (5, "Eve", 32, 91.4),
+      (6, "Frank", 27, 85.7)
+    )).toDF("id", "name", "age", "score")
+
+    // Two-way except: nothing expected is missing, and nothing unexpected is present
+    assertTrue(expectedDf.except(actual).isEmpty)
+    assertTrue(actual.except(expectedDf).isEmpty)
+  }
+
+  /**
+   * Same shape as a multi-write commit validation, but using the "insert"
+   * operation: two writes of three rows each (Overwrite, then Append) into a
+   * Lance-format table.
+   *
+   * Asserts that the completed commits timeline holds exactly two instants,
+   * each with action "commit", and that a plain read returns all six rows,
+   * compared as a set of distinct rows via `except` in both directions.
+   */
+  @Test
+  def testMultipleRegularInsertsWithCommitValidation(): Unit = {
+    val tableName = "test_lance_regular_inserts"
+    val tablePath = s"$basePath/$tableName"
+
+    // First insert - records 1-3 using regular insert
+    val records1 = Seq(
+      (1, "Alice", 30, 95.5),
+      (2, "Bob", 25, 87.3),
+      (3, "Charlie", 35, 92.1)
+    )
+    val df1 = spark.createDataFrame(records1).toDF("id", "name", "age", 
"score")
+
+    df1.write
+      .format("hudi")
+      .option(HoodieTableConfig.BASE_FILE_FORMAT.key(), "LANCE")
+      .option(RECORDKEY_FIELD.key(), "id")
+      .option(PRECOMBINE_FIELD.key(), "age")
+      .option(TABLE_NAME.key(), tableName)
+      .option(HoodieWriteConfig.TBL_NAME.key(), tableName)
+      .option(HoodieWriteConfig.RECORD_MERGE_IMPL_CLASSES.key(), 
classOf[DefaultSparkRecordMerger].getName)
+      .option(OPERATION.key(), "insert")
+      .mode(SaveMode.Overwrite)
+      .save(tablePath)
+
+    // Second insert - records 4-6 using regular insert
+    val records2 = Seq(
+      (4, "David", 28, 88.9),
+      (5, "Eve", 32, 91.4),
+      (6, "Frank", 27, 85.7)
+    )
+    val df2 = spark.createDataFrame(records2).toDF("id", "name", "age", 
"score")
+
+    df2.write
+      .format("hudi")
+      .option(HoodieTableConfig.BASE_FILE_FORMAT.key(), "LANCE")
+      .option(RECORDKEY_FIELD.key(), "id")
+      .option(PRECOMBINE_FIELD.key(), "age")
+      .option(TABLE_NAME.key(), tableName)
+      .option(HoodieWriteConfig.TBL_NAME.key(), tableName)
+      .option(HoodieWriteConfig.RECORD_MERGE_IMPL_CLASSES.key(), 
classOf[DefaultSparkRecordMerger].getName)
+      .option(OPERATION.key(), "insert")
+      .mode(SaveMode.Append)
+      .save(tablePath)
+
+    // Validate number of commits matches number of inserts
+    val metaClient = HoodieTableMetaClient.builder()
+      .setConf(HoodieTestUtils.getDefaultStorageConf)
+      .setBasePath(tablePath)
+      .build()
+
+    val commitCount = 
metaClient.getCommitsTimeline.filterCompletedInstants().countInstants()
+    assertEquals(2, commitCount, "Should have 2 completed commits (one per 
insert)")
+
+    // Verify that all commits are insert commits
+    // (this materializes the same filtered timeline as a Scala list, so the size check repeats the count above)
+    val commits = 
metaClient.getCommitsTimeline.filterCompletedInstants().getInstants.asScala.toList
+    assertEquals(2, commits.size, "Should have exactly 2 commits")
+
+    // Check that each commit is a COMMIT action (insert creates COMMIT 
actions)
+    commits.foreach { instant =>
+      assertEquals("commit", instant.getAction, s"Instant 
${instant.requestedTime()} should be a commit action")
+    }
+
+    // Read back all data and verify the complete set of rows (contents, not just the count)
+    val readDf = spark.read
+      .format("hudi")
+      .load(tablePath)
+
+    val actual = readDf.select("id", "name", "age", "score")
+
+    val expectedDf = spark.createDataFrame(Seq(
+      (1, "Alice", 30, 95.5),
+      (2, "Bob", 25, 87.3),
+      (3, "Charlie", 35, 92.1),
+      (4, "David", 28, 88.9),
+      (5, "Eve", 32, 91.4),
+      (6, "Frank", 27, 85.7)
+    )).toDF("id", "name", "age", "score")
+
+    // Two-way except: nothing expected is missing, and nothing unexpected is present
+    assertTrue(expectedDf.except(actual).isEmpty)
+    assertTrue(actual.except(expectedDf).isEmpty)
+  }
+
+  /**
+   * Checks that "upsert" both updates existing keys and adds new ones on a
+   * Lance-format table.
+   *
+   * Sequence: an "insert" of three rows, an "upsert" that changes the row
+   * with id=2, then an "upsert" that changes the row with id=1 and adds
+   * id=4. The test asserts three completed commits and that a plain read
+   * returns four rows carrying the updated values, compared as a set of
+   * distinct rows via `except` in both directions.
+   *
+   * NOTE(review): "age" is the precombine field and both updates carry a
+   * larger age than the stored row; presumably that ordering is what lets
+   * the incoming row win the merge - confirm against the merger in use.
+   */
+  @Test
+  def testBasicUpsertModifyExistingRow(): Unit = {
+    val tableName = "test_lance_upsert"
+    val tablePath = s"$basePath/$tableName"
+
+    // Initial insert - 3 records
+    val records1 = Seq(
+      (1, "Alice", 30, 95.5),
+      (2, "Bob", 25, 87.3),
+      (3, "Charlie", 35, 92.1)
+    )
+    val df1 = spark.createDataFrame(records1).toDF("id", "name", "age", 
"score")
+
+    df1.write
+      .format("hudi")
+      .option(HoodieTableConfig.BASE_FILE_FORMAT.key(), "LANCE")
+      .option(RECORDKEY_FIELD.key(), "id")
+      .option(PRECOMBINE_FIELD.key(), "age")
+      .option(TABLE_NAME.key(), tableName)
+      .option(HoodieWriteConfig.TBL_NAME.key(), tableName)
+      .option(HoodieWriteConfig.RECORD_MERGE_IMPL_CLASSES.key(), 
classOf[DefaultSparkRecordMerger].getName)
+      .option(OPERATION.key(), "insert")
+      .mode(SaveMode.Overwrite)
+      .save(tablePath)
+
+    // Upsert - modify Bob's record (id=2)
+    val records2 = Seq(
+      (2, "Bob", 40, 95.0)  // Update Bob: age 25->40, score 87.3->95.0
+    )
+    val df2 = spark.createDataFrame(records2).toDF("id", "name", "age", 
"score")
+
+    df2.write
+      .format("hudi")
+      .option(HoodieTableConfig.BASE_FILE_FORMAT.key(), "LANCE")
+      .option(RECORDKEY_FIELD.key(), "id")
+      .option(PRECOMBINE_FIELD.key(), "age")
+      .option(TABLE_NAME.key(), tableName)
+      .option(HoodieWriteConfig.TBL_NAME.key(), tableName)
+      .option(HoodieWriteConfig.RECORD_MERGE_IMPL_CLASSES.key(), 
classOf[DefaultSparkRecordMerger].getName)
+      .option(OPERATION.key(), "upsert")
+      .mode(SaveMode.Append)
+      .save(tablePath)
+
+    // Second upsert - modify Alice (id=1) and insert David (id=4)
+    val records3 = Seq(
+      (1, "Alice", 45, 98.5),  // Update Alice: age 30->45, score 95.5->98.5
+      (4, "David", 28, 88.0)   // Insert new record
+    )
+    val df3 = spark.createDataFrame(records3).toDF("id", "name", "age", 
"score")
+
+    df3.write
+      .format("hudi")
+      .option(HoodieTableConfig.BASE_FILE_FORMAT.key(), "LANCE")
+      .option(RECORDKEY_FIELD.key(), "id")
+      .option(PRECOMBINE_FIELD.key(), "age")
+      .option(TABLE_NAME.key(), tableName)
+      .option(HoodieWriteConfig.TBL_NAME.key(), tableName)
+      .option(HoodieWriteConfig.RECORD_MERGE_IMPL_CLASSES.key(), 
classOf[DefaultSparkRecordMerger].getName)
+      .option(OPERATION.key(), "upsert")
+      .mode(SaveMode.Append)
+      .save(tablePath)
+
+    // Validate commits: one completed instant per write is expected
+    val metaClient = HoodieTableMetaClient.builder()
+      .setConf(HoodieTestUtils.getDefaultStorageConf)
+      .setBasePath(tablePath)
+      .build()
+
+    val commitCount = 
metaClient.getCommitsTimeline.filterCompletedInstants().countInstants()
+    assertEquals(3, commitCount, "Should have 3 completed commits (insert + 2 
upserts)")
+
+    // Read and verify data
+    val readDf = spark.read.format("hudi").load(tablePath)
+    val actual = readDf.select("id", "name", "age", "score")
+
+    // Alice and Bob carry their upserted values, Charlie is untouched, David is new
+    val expectedDf = spark.createDataFrame(Seq(
+      (1, "Alice", 45, 98.5),
+      (2, "Bob", 40, 95.0),
+      (3, "Charlie", 35, 92.1),
+      (4, "David", 28, 88.0)
+    )).toDF("id", "name", "age", "score")
+
+    // Two-way except: the old versions of Alice and Bob would show up in the second check if still present
+    assertTrue(expectedDf.except(actual).isEmpty)
+    assertTrue(actual.except(expectedDf).isEmpty)
+  }
+
+  /**
+   * Checks the "delete" operation on a Lance-format table.
+   *
+   * Five rows are written with "insert"; a second write with the "delete"
+   * operation then passes three rows, two whose ids exist in the table (2
+   * and 4) and one whose id does not (99). The test asserts two completed
+   * commits and that a plain read returns only ids 1, 3 and 5, compared as a
+   * set of distinct rows via `except` in both directions.
+   */
+  @Test
+  def testBasicDeleteOperation(): Unit = {
+    val tableName = "test_lance_delete"
+    val tablePath = s"$basePath/$tableName"
+
+    // Initial insert - 5 records
+    val records1 = Seq(
+      (1, "Alice", 30, 95.5),
+      (2, "Bob", 25, 87.3),
+      (3, "Charlie", 35, 92.1),
+      (4, "David", 28, 88.0),
+      (5, "Eve", 32, 91.4)
+    )
+    val df1 = spark.createDataFrame(records1).toDF("id", "name", "age", 
"score")
+
+    df1.write
+      .format("hudi")
+      .option(HoodieTableConfig.BASE_FILE_FORMAT.key(), "LANCE")
+      .option(RECORDKEY_FIELD.key(), "id")
+      .option(PRECOMBINE_FIELD.key(), "age")
+      .option(TABLE_NAME.key(), tableName)
+      .option(HoodieWriteConfig.TBL_NAME.key(), tableName)
+      .option(HoodieWriteConfig.RECORD_MERGE_IMPL_CLASSES.key(), 
classOf[DefaultSparkRecordMerger].getName)
+      .option(OPERATION.key(), "insert")
+      .mode(SaveMode.Overwrite)
+      .save(tablePath)
+
+    // Delete operation - delete Bob (id=2), David (id=4), and a non-existent 
key (id=99)
+    val recordsToDelete = Seq(
+      (2, "Bob", 25, 87.3),        // Delete Bob (exists)
+      (4, "David", 28, 88.0),      // Delete David (exists)
+      (99, "NonExistent", 50, 0.0) // Delete non-existent record (should be 
no-op)
+    )
+    val deleteDF = spark.createDataFrame(recordsToDelete).toDF("id", "name", 
"age", "score")
+
+    // Same write options as the insert; only the operation ("delete") and the save mode (Append) differ
+    deleteDF.write
+      .format("hudi")
+      .option(HoodieTableConfig.BASE_FILE_FORMAT.key(), "LANCE")
+      .option(RECORDKEY_FIELD.key(), "id")
+      .option(PRECOMBINE_FIELD.key(), "age")
+      .option(TABLE_NAME.key(), tableName)
+      .option(HoodieWriteConfig.TBL_NAME.key(), tableName)
+      .option(HoodieWriteConfig.RECORD_MERGE_IMPL_CLASSES.key(), 
classOf[DefaultSparkRecordMerger].getName)
+      .option(OPERATION.key(), "delete")
+      .mode(SaveMode.Append)
+      .save(tablePath)
+
+    // Validate commits: the delete write is expected to add its own completed instant
+    val metaClient = HoodieTableMetaClient.builder()
+      .setConf(HoodieTestUtils.getDefaultStorageConf)
+      .setBasePath(tablePath)
+      .build()
+
+    val commitCount = 
metaClient.getCommitsTimeline.filterCompletedInstants().countInstants()
+    assertEquals(2, commitCount, "Should have 2 completed commits (insert + 
delete)")
+
+    // Read and verify data
+    val readDf = spark.read.format("hudi").load(tablePath)
+    val actual = readDf.select("id", "name", "age", "score")
+
+    // Only the rows whose ids were not in the delete batch remain
+    val expectedDf = spark.createDataFrame(Seq(
+      (1, "Alice", 30, 95.5),
+      (3, "Charlie", 35, 92.1),
+      (5, "Eve", 32, 91.4)
+    )).toDF("id", "name", "age", "score")
+
+    // Two-way except: a surviving Bob or David row would make the second check fail
+    assertTrue(expectedDf.except(actual).isEmpty)
+    assertTrue(actual.except(expectedDf).isEmpty)
+  }
+
+  /**
+   * Checks an incremental read on a Lance-format table.
+   *
+   * Three "bulk_insert" writes of three rows each are made. The completion
+   * time of the second commit is captured before the third write, and the
+   * completion time of the third commit after it. An incremental query
+   * bounded by those two times is then expected to return only the rows of
+   * the third write, compared as a set of distinct rows via `except` in both
+   * directions.
+   *
+   * NOTE(review): the expected result relies on the begin bound excluding the
+   * commit that completed exactly at the begin time - confirm against the
+   * range semantics of Hudi's incremental query for the version under test.
+   */
+  @Test
+  def testIncrementalQuery(): Unit = {
+    val tableName = "test_lance_incremental"
+    val tablePath = s"$basePath/$tableName"
+
+    // First insert - records 1-3
+    val records1 = Seq(
+      (1, "Alice", 30, 95.5),
+      (2, "Bob", 25, 87.3),
+      (3, "Charlie", 35, 92.1)
+    )
+    val df1 = spark.createDataFrame(records1).toDF("id", "name", "age", 
"score")
+
+    df1.write
+      .format("hudi")
+      .option(HoodieTableConfig.BASE_FILE_FORMAT.key(), "LANCE")
+      .option(RECORDKEY_FIELD.key(), "id")
+      .option(PRECOMBINE_FIELD.key(), "age")
+      .option(TABLE_NAME.key(), tableName)
+      .option(HoodieWriteConfig.TBL_NAME.key(), tableName)
+      .option(HoodieWriteConfig.RECORD_MERGE_IMPL_CLASSES.key(), 
classOf[DefaultSparkRecordMerger].getName)
+      .option(OPERATION.key(), "bulk_insert")
+      .mode(SaveMode.Overwrite)
+      .save(tablePath)
+
+    // Second insert - records 4-6
+    val records2 = Seq(
+      (4, "David", 28, 88.9),
+      (5, "Eve", 32, 91.4),
+      (6, "Frank", 27, 85.7)
+    )
+    val df2 = spark.createDataFrame(records2).toDF("id", "name", "age", 
"score")
+
+    df2.write
+      .format("hudi")
+      .option(HoodieTableConfig.BASE_FILE_FORMAT.key(), "LANCE")
+      .option(RECORDKEY_FIELD.key(), "id")
+      .option(PRECOMBINE_FIELD.key(), "age")
+      .option(TABLE_NAME.key(), tableName)
+      .option(HoodieWriteConfig.TBL_NAME.key(), tableName)
+      .option(HoodieWriteConfig.RECORD_MERGE_IMPL_CLASSES.key(), 
classOf[DefaultSparkRecordMerger].getName)
+      .option(OPERATION.key(), "bulk_insert")
+      .mode(SaveMode.Append)
+      .save(tablePath)
+
+    // Get commit timestamps
+    // The meta client is built after the second write and reused (after a reload) once the third has run.
+    val metaClient = HoodieTableMetaClient.builder()
+      .setConf(HoodieTestUtils.getDefaultStorageConf)
+      .setBasePath(tablePath)
+      .build()
+
+    val commitsAfterSecond = 
metaClient.getCommitsTimeline.filterCompletedInstants().getInstants.asScala.toList
+    assertEquals(2, commitsAfterSecond.size, "Should have 2 commits after 
second insert")
+    // Completion time (not requested time) of the second commit; used as the begin bound below
+    val secondCommitTime = commitsAfterSecond(1).getCompletionTime
+
+    // Third insert - records 7-9
+    val records3 = Seq(
+      (7, "Grace", 29, 93.2),
+      (8, "Henry", 31, 89.6),
+      (9, "Iris", 26, 94.8)
+    )
+    val df3 = spark.createDataFrame(records3).toDF("id", "name", "age", 
"score")
+
+    df3.write
+      .format("hudi")
+      .option(HoodieTableConfig.BASE_FILE_FORMAT.key(), "LANCE")
+      .option(RECORDKEY_FIELD.key(), "id")
+      .option(PRECOMBINE_FIELD.key(), "age")
+      .option(TABLE_NAME.key(), tableName)
+      .option(HoodieWriteConfig.TBL_NAME.key(), tableName)
+      .option(HoodieWriteConfig.RECORD_MERGE_IMPL_CLASSES.key(), 
classOf[DefaultSparkRecordMerger].getName)
+      .option(OPERATION.key(), "bulk_insert")
+      .mode(SaveMode.Append)
+      .save(tablePath)
+
+    // Reload metaClient to get latest commits
+    metaClient.reloadActiveTimeline()
+    val allCommits = 
metaClient.getCommitsTimeline.filterCompletedInstants().getInstants.asScala.toList
+    assertEquals(3, allCommits.size, "Should have 3 commits after third 
insert")
+    // Completion time of the third commit; used as the end bound below
+    val thirdCommitTime = allCommits(2).getCompletionTime
+
+    // Incremental query from c2 to c3 (should see only data from c3)
+    val incrementalDf = spark.read
+      .format("hudi")
+      .option("hoodie.datasource.query.type", "incremental")
+      .option("hoodie.datasource.read.begin.instanttime", secondCommitTime)
+      .option("hoodie.datasource.read.end.instanttime", thirdCommitTime)
+      .load(tablePath)
+
+    val actual = incrementalDf.select("id", "name", "age", "score")
+
+    // Only the rows written by the third bulk_insert are expected
+    val expectedDf = spark.createDataFrame(Seq(
+      (7, "Grace", 29, 93.2),
+      (8, "Henry", 31, 89.6),
+      (9, "Iris", 26, 94.8)
+    )).toDF("id", "name", "age", "score")
+
+    // Two-way except: any row from the first two writes would make the second check fail
+    assertTrue(expectedDf.except(actual).isEmpty)
+    assertTrue(actual.except(expectedDf).isEmpty)
+  }
 }

Reply via email to