This is an automated email from the ASF dual-hosted git repository.

yihua pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git


The following commit(s) were added to refs/heads/master by this push:
     new 117f8da56fd8 feat: Add HoodieBaseLanceFileWriter and implementation 
for SparkFileWriter (#17629)
117f8da56fd8 is described below

commit 117f8da56fd8b3b135acd2fc5faf00563eff8f9c
Author: Rahil C <[email protected]>
AuthorDate: Tue Dec 23 13:36:00 2025 -0500

    feat: Add HoodieBaseLanceFileWriter and implementation for SparkFileWriter 
(#17629)
---
 hudi-client/hudi-spark-client/pom.xml              |   6 +
 .../hudi/io/storage/HoodieSparkLanceWriter.java    | 157 +++++++
 hudi-hadoop-common/pom.xml                         |  14 +
 .../hudi/io/lance/HoodieBaseLanceWriter.java       | 191 +++++++++
 .../io/storage/TestHoodieSparkLanceWriter.java     | 462 +++++++++++++++++++++
 pom.xml                                            |  41 +-
 6 files changed, 870 insertions(+), 1 deletion(-)

diff --git a/hudi-client/hudi-spark-client/pom.xml 
b/hudi-client/hudi-spark-client/pom.xml
index 473d2a718e94..9b557f85b137 100644
--- a/hudi-client/hudi-spark-client/pom.xml
+++ b/hudi-client/hudi-spark-client/pom.xml
@@ -100,6 +100,12 @@
       <artifactId>parquet-avro</artifactId>
     </dependency>
 
+    <!-- Lance Spark for Arrow conversion -->
+    <dependency>
+      <groupId>com.lancedb</groupId>
+      <artifactId>${lance.spark.artifact}</artifactId>
+    </dependency>
+
     <!-- Used for adding kryo serializers for protobuf -->
     <dependency>
       <groupId>com.twitter</groupId>
diff --git 
a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/io/storage/HoodieSparkLanceWriter.java
 
b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/io/storage/HoodieSparkLanceWriter.java
new file mode 100644
index 000000000000..e7476f4473fa
--- /dev/null
+++ 
b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/io/storage/HoodieSparkLanceWriter.java
@@ -0,0 +1,157 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.io.storage;
+
+import com.lancedb.lance.spark.arrow.LanceArrowWriter;
+import org.apache.arrow.vector.types.pojo.Schema;
+import org.apache.hudi.common.engine.TaskContextSupplier;
+import org.apache.hudi.common.model.HoodieKey;
+import org.apache.hudi.common.model.HoodieRecord;
+import org.apache.hudi.io.lance.HoodieBaseLanceWriter;
+import org.apache.hudi.storage.HoodieStorage;
+import org.apache.hudi.storage.StoragePath;
+import org.apache.spark.sql.catalyst.InternalRow;
+import org.apache.spark.sql.types.StructType;
+import org.apache.spark.sql.util.LanceArrowUtils;
+import org.apache.spark.unsafe.types.UTF8String;
+
+import java.io.IOException;
+import java.util.List;
+import java.util.function.Function;
+
+import static 
org.apache.hudi.common.model.HoodieRecord.HoodieMetadataField.COMMIT_SEQNO_METADATA_FIELD;
+import static 
org.apache.hudi.common.model.HoodieRecord.HoodieMetadataField.COMMIT_TIME_METADATA_FIELD;
+import static 
org.apache.hudi.common.model.HoodieRecord.HoodieMetadataField.FILENAME_METADATA_FIELD;
+import static 
org.apache.hudi.common.model.HoodieRecord.HoodieMetadataField.PARTITION_PATH_METADATA_FIELD;
+import static 
org.apache.hudi.common.model.HoodieRecord.HoodieMetadataField.RECORD_KEY_METADATA_FIELD;
+
+/**
+ * Spark Lance file writer implementing {@link HoodieSparkFileWriter}.
+ *
+ * This writer integrates with Hudi's storage I/O layer and supports:
+ * - Hudi metadata field population
+ * - Record key tracking (for bloom filters - TODO 
https://github.com/apache/hudi/issues/17664)
+ * - Sequence ID generation
+ */
+public class HoodieSparkLanceWriter extends HoodieBaseLanceWriter<InternalRow> 
implements HoodieSparkFileWriter {
+
+  private static final String DEFAULT_TIMEZONE = "UTC";
+
+  private final StructType sparkSchema;
+  private final Schema arrowSchema;
+  private final UTF8String fileName;
+  private final UTF8String instantTime;
+  private final boolean populateMetaFields;
+  private final Function<Long, String> seqIdGenerator;
+  private LanceArrowWriter writer;
+
+  /**
+   * Constructor for Spark Lance writer.
+   *
+   * @param file Path where Lance file will be written
+   * @param sparkSchema Spark schema for the data
+   * @param instantTime Instant time for the commit
+   * @param taskContextSupplier Task context supplier for partition ID
+   * @param storage HoodieStorage instance
+   * @param populateMetaFields Whether to populate Hudi metadata fields
+   * @throws IOException if writer initialization fails
+   */
+  public HoodieSparkLanceWriter(StoragePath file,
+                                StructType sparkSchema,
+                                String instantTime,
+                                TaskContextSupplier taskContextSupplier,
+                                HoodieStorage storage,
+                                boolean populateMetaFields) throws IOException 
{
+    super(storage, file, DEFAULT_BATCH_SIZE);
+    this.sparkSchema = sparkSchema;
+    this.arrowSchema = LanceArrowUtils.toArrowSchema(sparkSchema, 
DEFAULT_TIMEZONE, true, false);
+    this.fileName = UTF8String.fromString(file.getName());
+    this.instantTime = UTF8String.fromString(instantTime);
+    this.populateMetaFields = populateMetaFields;
+    this.seqIdGenerator = recordIndex -> {
+      Integer partitionId = taskContextSupplier.getPartitionIdSupplier().get();
+      return HoodieRecord.generateSequenceId(instantTime, partitionId, 
recordIndex);
+    };
+  }
+
+  @Override
+  public void writeRowWithMetadata(HoodieKey key, InternalRow row) throws 
IOException {
+    if (populateMetaFields) {
+      UTF8String recordKey = UTF8String.fromString(key.getRecordKey());
+      updateRecordMetadata(row, recordKey, key.getPartitionPath(), 
getWrittenRecordCount());
+      super.write(row);
+    } else {
+      super.write(row);
+    }
+  }
+
+  @Override
+  public void writeRow(String recordKey, InternalRow row) throws IOException {
+    super.write(row);
+  }
+
+  @Override
+  protected void populateVectorSchemaRoot(List<InternalRow> records) {
+    if (writer == null) {
+      writer = LanceArrowWriter.create(this.root, sparkSchema);
+    }
+    // Reset writer state from previous batch
+    writer.reset();
+    for (InternalRow record : records) {
+      writer.write(record);
+    }
+    // Finalize the writer (sets row count)
+    writer.finish();
+  }
+
+  /**
+   * Check if writer can accept more records based on file size.
+   * Uses filesystem-based size checking (similar to ORC/HFile approach).
+   *
+   * @return true if writer can accept more records, false if file size limit 
reached
+   */
+  public boolean canWrite() {
+    //TODO https://github.com/apache/hudi/issues/17684
+    return true;
+  }
+
+  @Override
+  protected Schema getArrowSchema() {
+    return arrowSchema;
+  }
+
+  /**
+   * Update Hudi metadata fields in the InternalRow.
+   *
+   * @param row InternalRow to update
+   * @param recordKey Record key
+   * @param partitionPath Partition path
+   * @param recordCount Current record count for sequence ID generation
+   */
+  protected void updateRecordMetadata(InternalRow row,
+                                      UTF8String recordKey,
+                                      String partitionPath,
+                                      long recordCount) {
+    row.update(COMMIT_TIME_METADATA_FIELD.ordinal(), instantTime);
+    row.update(COMMIT_SEQNO_METADATA_FIELD.ordinal(), 
UTF8String.fromString(seqIdGenerator.apply(recordCount)));
+    row.update(RECORD_KEY_METADATA_FIELD.ordinal(), recordKey);
+    row.update(PARTITION_PATH_METADATA_FIELD.ordinal(), 
UTF8String.fromString(partitionPath));
+    row.update(FILENAME_METADATA_FIELD.ordinal(), fileName);
+  }
+}
\ No newline at end of file
diff --git a/hudi-hadoop-common/pom.xml b/hudi-hadoop-common/pom.xml
index 55ddf796036f..53c56caf4ba4 100644
--- a/hudi-hadoop-common/pom.xml
+++ b/hudi-hadoop-common/pom.xml
@@ -158,5 +158,19 @@
       <version>1.17.2</version>
       <scope>test</scope>
     </dependency>
+    <!-- Lance Core SDK -->
+    <dependency>
+      <groupId>com.lancedb</groupId>
+      <artifactId>lance-core</artifactId>
+    </dependency>
+    <!-- Apache Arrow for Lance data representation -->
+    <dependency>
+      <groupId>org.apache.arrow</groupId>
+      <artifactId>arrow-vector</artifactId>
+    </dependency>
+    <dependency>
+      <groupId>org.apache.arrow</groupId>
+      <artifactId>arrow-memory-netty</artifactId>
+    </dependency>
   </dependencies>
 </project>
diff --git 
a/hudi-hadoop-common/src/main/java/org/apache/hudi/io/lance/HoodieBaseLanceWriter.java
 
b/hudi-hadoop-common/src/main/java/org/apache/hudi/io/lance/HoodieBaseLanceWriter.java
new file mode 100644
index 000000000000..d4fad38e7a80
--- /dev/null
+++ 
b/hudi-hadoop-common/src/main/java/org/apache/hudi/io/lance/HoodieBaseLanceWriter.java
@@ -0,0 +1,191 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.hudi.io.lance;
+
+import com.lancedb.lance.file.LanceFileWriter;
+import org.apache.arrow.memory.BufferAllocator;
+import org.apache.arrow.memory.RootAllocator;
+import org.apache.arrow.vector.VectorSchemaRoot;
+import org.apache.arrow.vector.types.pojo.Schema;
+import org.apache.hudi.exception.HoodieException;
+import org.apache.hudi.storage.HoodieStorage;
+import org.apache.hudi.storage.StoragePath;
+
+import javax.annotation.concurrent.NotThreadSafe;
+import java.io.Closeable;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+
+/**
+ * Base class for Hudi Lance file writers supporting different record types.
+ *
+ * This class handles common Lance file operations including:
+ * - LanceFileWriter lifecycle management
+ * - BufferAllocator management
+ * - Record buffering and batch flushing
+ * - File size checks
+ *
+ * Subclasses must implement type-specific conversion to Arrow format.
+ *
+ * @param <R> The record type (e.g., GenericRecord, InternalRow)
+ */
+@NotThreadSafe
+public abstract class HoodieBaseLanceWriter<R> implements Closeable {
+  protected static final int DEFAULT_BATCH_SIZE = 1000;
+  protected final HoodieStorage storage;
+  protected final StoragePath path;
+  protected final BufferAllocator allocator;
+  protected final List<R> bufferedRecords;
+  protected final int batchSize;
+  protected long writtenRecordCount = 0;
+  protected VectorSchemaRoot root;
+
+  private LanceFileWriter writer;
+
+  /**
+   * Constructor for base Lance writer.
+   *
+   * @param storage HoodieStorage instance
+   * @param path Path where Lance file will be written
+   * @param batchSize Number of records to buffer before flushing to Lance
+   */
+  protected HoodieBaseLanceWriter(HoodieStorage storage, StoragePath path, int 
batchSize) {
+    this.storage = storage;
+    this.path = path;
+    this.allocator = new RootAllocator(Long.MAX_VALUE);
+    this.bufferedRecords = new ArrayList<>(batchSize);
+    this.batchSize = batchSize;
+  }
+
+  /**
+   * Populate the VectorSchemaRoot with buffered records.
+   * Subclasses must implement type-specific conversion logic.
+   * The VectorSchemaRoot field is reused across batches and managed by this 
base class.
+   *
+   * @param records List of records to convert
+   */
+  protected abstract void populateVectorSchemaRoot(List<R> records);
+
+  /**
+   * Get the Arrow schema for this writer.
+   * Subclasses must provide the Arrow schema corresponding to their record 
type.
+   *
+   * @return Arrow schema
+   */
+  protected abstract Schema getArrowSchema();
+
+  /**
+   * Write a single record. Records are buffered and flushed in batches.
+   *
+   * @param record Record to write
+   * @throws IOException if write fails
+   */
+  public void write(R record) throws IOException {
+    bufferedRecords.add(record);
+    writtenRecordCount++;
+
+    if (bufferedRecords.size() >= batchSize) {
+      flushBatch();
+    }
+  }
+
+  /**
+   * Get the total number of records written so far.
+   *
+   * @return Number of records written
+   */
+  public long getWrittenRecordCount() {
+    return writtenRecordCount;
+  }
+
+  /**
+   * Close the writer, flushing any remaining buffered records.
+   *
+   * @throws IOException if close fails
+   */
+  @Override
+  public void close() throws IOException {
+    try {
+      // Flush any remaining buffered records
+      if (!bufferedRecords.isEmpty()) {
+        flushBatch();
+      }
+
+      // Ensure writer is initialized even if no data was written
+      // This creates an empty Lance file with just schema metadata
+      if (writer == null && root == null) {
+        initializeWriter();
+        root = VectorSchemaRoot.create(getArrowSchema(), allocator);
+        root.setRowCount(0);
+        writer.write(root);
+      }
+
+      // Close Lance writer
+      if (writer != null) {
+        writer.close();
+      }
+
+      // Close VectorSchemaRoot
+      if (root != null) {
+        root.close();
+      }
+    } catch (Exception e) {
+      throw new HoodieException("Failed to close Lance writer: " + path, e);
+    } finally {
+      // Always close allocator
+      allocator.close();
+    }
+  }
+
+  /**
+   * Flush buffered records to Lance file.
+   */
+  private void flushBatch() throws IOException {
+    if (bufferedRecords.isEmpty()) {
+      return;
+    }
+
+    // Lazy initialization of writer and root
+    if (writer == null) {
+      initializeWriter();
+    }
+    if (root == null) {
+      root = VectorSchemaRoot.create(getArrowSchema(), allocator);
+    }
+
+    // Reset root state for new batch
+    root.setRowCount(0);
+
+    // Populate root with records and write to Lance
+    populateVectorSchemaRoot(bufferedRecords);
+    writer.write(root);
+
+    // Clear buffer
+    bufferedRecords.clear();
+  }
+
+  /**
+   * Initialize LanceFileWriter (lazy initialization).
+   */
+  private void initializeWriter() throws IOException {
+    writer = LanceFileWriter.open(path.toString(), allocator, null);
+  }
+}
diff --git 
a/hudi-spark-datasource/hudi-spark/src/test/java/org/apache/hudi/io/storage/TestHoodieSparkLanceWriter.java
 
b/hudi-spark-datasource/hudi-spark/src/test/java/org/apache/hudi/io/storage/TestHoodieSparkLanceWriter.java
new file mode 100644
index 000000000000..d18833a825a7
--- /dev/null
+++ 
b/hudi-spark-datasource/hudi-spark/src/test/java/org/apache/hudi/io/storage/TestHoodieSparkLanceWriter.java
@@ -0,0 +1,462 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.io.storage;
+
+import com.lancedb.lance.file.LanceFileReader;
+import org.apache.arrow.memory.BufferAllocator;
+import org.apache.arrow.memory.RootAllocator;
+import org.apache.arrow.vector.BigIntVector;
+import org.apache.arrow.vector.BitVector;
+import org.apache.arrow.vector.Float4Vector;
+import org.apache.arrow.vector.Float8Vector;
+import org.apache.arrow.vector.IntVector;
+import org.apache.arrow.vector.VarBinaryVector;
+import org.apache.arrow.vector.VarCharVector;
+import org.apache.arrow.vector.VectorSchemaRoot;
+import org.apache.arrow.vector.ipc.ArrowReader;
+import org.apache.hudi.client.SparkTaskContextSupplier;
+import org.apache.hudi.common.model.HoodieKey;
+import org.apache.hudi.common.testutils.HoodieTestUtils;
+import org.apache.hudi.storage.HoodieStorage;
+import org.apache.hudi.storage.StoragePath;
+import org.apache.spark.sql.catalyst.InternalRow;
+import org.apache.spark.sql.catalyst.expressions.GenericInternalRow;
+import org.apache.spark.sql.types.DataTypes;
+import org.apache.spark.sql.types.StructType;
+import org.apache.spark.unsafe.types.UTF8String;
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.condition.DisabledIfSystemProperty;
+import org.junit.jupiter.api.io.TempDir;
+
+import java.io.File;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+
+import static 
org.apache.hudi.common.model.HoodieRecord.HoodieMetadataField.COMMIT_SEQNO_METADATA_FIELD;
+import static 
org.apache.hudi.common.model.HoodieRecord.HoodieMetadataField.COMMIT_TIME_METADATA_FIELD;
+import static 
org.apache.hudi.common.model.HoodieRecord.HoodieMetadataField.FILENAME_METADATA_FIELD;
+import static 
org.apache.hudi.common.model.HoodieRecord.HoodieMetadataField.PARTITION_PATH_METADATA_FIELD;
+import static 
org.apache.hudi.common.model.HoodieRecord.HoodieMetadataField.RECORD_KEY_METADATA_FIELD;
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertFalse;
+import static org.junit.jupiter.api.Assertions.assertNotNull;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+
+/**
+ * Tests for {@link HoodieSparkLanceWriter}.
+ */
+@DisabledIfSystemProperty(named = "lance.skip.tests", matches = "true")
+public class TestHoodieSparkLanceWriter {
+
+  @TempDir
+  File tempDir;
+
+  private HoodieStorage storage;
+  private SparkTaskContextSupplier taskContextSupplier;
+  private String instantTime;
+
+  @BeforeEach
+  public void setUp() throws IOException {
+    storage = HoodieTestUtils.getStorage(tempDir.getAbsolutePath());
+    taskContextSupplier = new SparkTaskContextSupplier();
+    instantTime = "20251201120000000";
+  }
+
+  @AfterEach
+  public void tearDown() throws Exception {
+    if (storage != null) {
+      storage.close();
+    }
+  }
+
+  @Test
+  public void testWriteRowWithMetadataPopulation() throws Exception {
+    // Schema WITH meta fields (writer expects this when 
populateMetaFields=true)
+    StructType schema = createSchemaWithMetaFields();
+
+    StoragePath path = new StoragePath(tempDir.getAbsolutePath() + 
"/test_with_metadata.lance");
+    try (HoodieSparkLanceWriter writer = new HoodieSparkLanceWriter(
+        path, schema, instantTime, taskContextSupplier, storage, true)) {
+      // Write multiple records to test metadata population and sequence ID 
generation
+      for (int i = 0; i < 3; i++) {
+        InternalRow row = createRowWithMetaFields(i, "User" + i, 20L + i);
+        HoodieKey key = new HoodieKey("key" + i, "partition1");
+        writer.writeRowWithMetadata(key, row);
+      }
+    }
+
+    // Verify using LanceFileReader
+    assertTrue(storage.exists(path), "Lance file should exist");
+
+    try (BufferAllocator allocator = new RootAllocator();
+         LanceFileReader reader = LanceFileReader.open(path.toString(), 
allocator);
+         ArrowReader arrowReader = reader.readAll(null, null, 
Integer.MAX_VALUE)) {
+
+      assertEquals(3, reader.numRows(), "Should have 3 records");
+
+      // Schema should have 5 meta fields + 3 user fields = 8 fields
+      assertEquals(8, reader.schema().getFields().size(), "Should have 8 
fields");
+
+      // Read and verify data
+      assertTrue(arrowReader.loadNextBatch(), "Should load batch");
+      VectorSchemaRoot root = arrowReader.getVectorSchemaRoot();
+
+      // Verify meta fields were populated for first record
+      VarCharVector commitTimeVector = (VarCharVector) 
root.getVector(COMMIT_TIME_METADATA_FIELD.getFieldName());
+      assertNotNull(commitTimeVector);
+      assertEquals(instantTime, new String(commitTimeVector.get(0)), "Commit 
time should match");
+
+      VarCharVector recordKeyVector = (VarCharVector) 
root.getVector(RECORD_KEY_METADATA_FIELD.getFieldName());
+      assertEquals("key0", new String(recordKeyVector.get(0)), "Record key 
should match");
+
+      VarCharVector partitionPathVector = (VarCharVector) 
root.getVector(PARTITION_PATH_METADATA_FIELD.getFieldName());
+      assertEquals("partition1", new String(partitionPathVector.get(0)), 
"Partition path should match");
+
+      VarCharVector fileNameVector = (VarCharVector) 
root.getVector(FILENAME_METADATA_FIELD.getFieldName());
+      assertEquals(path.getName(), new String(fileNameVector.get(0)), "File 
name should match");
+
+      // Verify sequence IDs are unique and properly formatted
+      VarCharVector seqNoVector = (VarCharVector) 
root.getVector(COMMIT_SEQNO_METADATA_FIELD.getFieldName());
+      List<String> seqNos = new ArrayList<>();
+      for (int i = 0; i < 3; i++) {
+        String seqNo = new String(seqNoVector.get(i));
+        assertTrue(seqNo.startsWith(instantTime + "_"), "Sequence number 
should start with instant time");
+        seqNos.add(seqNo);
+      }
+      assertEquals(3, seqNos.stream().distinct().count(), "All sequence IDs 
should be unique");
+
+      // Verify user data fields for first record
+      IntVector idVector = (IntVector) root.getVector("id");
+      assertEquals(0, idVector.get(0), "ID should match");
+
+      VarCharVector nameVector = (VarCharVector) root.getVector("name");
+      assertEquals("User0", new String(nameVector.get(0)), "Name should 
match");
+
+      BigIntVector ageVector = (BigIntVector) root.getVector("age");
+      assertEquals(20L, ageVector.get(0), "Age should match");
+    }
+  }
+
+  @Test
+  public void testWriteRowWithoutMetadataPopulation() throws Exception {
+    // Schema WITHOUT meta fields
+    StructType schema = createSchemaWithoutMetaFields();
+
+    StoragePath path = new StoragePath(tempDir.getAbsolutePath() + 
"/test_without_metadata.lance");
+    try (HoodieSparkLanceWriter writer = new HoodieSparkLanceWriter(
+        path, schema, instantTime, taskContextSupplier, storage, false)) {
+      // Create row with just user data (no meta fields)
+      InternalRow row = createRow(1, "Bob", 25L);
+      HoodieKey key = new HoodieKey("key2", "partition2");
+
+      writer.writeRowWithMetadata(key, row);
+    }
+
+    // Verify using LanceFileReader
+    assertTrue(storage.exists(path));
+
+    try (BufferAllocator allocator = new RootAllocator();
+         LanceFileReader reader = LanceFileReader.open(path.toString(), 
allocator);
+         ArrowReader arrowReader = reader.readAll(null, null, 
Integer.MAX_VALUE)) {
+
+      assertEquals(1, reader.numRows());
+
+      // Schema should have ONLY 3 user fields (no meta fields)
+      assertEquals(3, reader.schema().getFields().size(), "Should have only 
user fields");
+
+      // Read and verify data
+      assertTrue(arrowReader.loadNextBatch());
+      VectorSchemaRoot root = arrowReader.getVectorSchemaRoot();
+
+      // Verify NO meta fields exist
+      assertFalse(hasField(root, COMMIT_TIME_METADATA_FIELD.getFieldName()), 
"Should not have commit time field");
+      assertFalse(hasField(root, RECORD_KEY_METADATA_FIELD.getFieldName()), 
"Should not have record key field");
+
+      // Verify user data
+      IntVector idVector = (IntVector) root.getVector("id");
+      assertEquals(1, idVector.get(0));
+
+      VarCharVector nameVector = (VarCharVector) root.getVector("name");
+      assertEquals("Bob", new String(nameVector.get(0)));
+    }
+  }
+
+  @Test
+  public void testWriteRowSimple() throws Exception {
+    StructType schema = createSchemaWithoutMetaFields();
+
+    StoragePath path = new StoragePath(tempDir.getAbsolutePath() + 
"/test_simple_write.lance");
+    try (HoodieSparkLanceWriter writer = new HoodieSparkLanceWriter(
+        path, schema, instantTime, taskContextSupplier, storage, false)) {
+      InternalRow row = createRow(1, "Charlie", 35L);
+      writer.writeRow("key3", row);
+    }
+
+    // Verify file exists and has correct record count
+    try (BufferAllocator allocator = new RootAllocator();
+         LanceFileReader reader = LanceFileReader.open(path.toString(), 
allocator)) {
+      assertEquals(1, reader.numRows());
+    }
+  }
+
+  @Test
+  public void testBatchFlushing() throws Exception {
+    StructType schema = createSchemaWithoutMetaFields();
+
+    StoragePath path = new StoragePath(tempDir.getAbsolutePath() + 
"/test_batch_flush.lance");
+    // Write more than DEFAULT_BATCH_SIZE (1000) records
+    int recordCount = 2500;
+    try (HoodieSparkLanceWriter writer = new HoodieSparkLanceWriter(
+        path, schema, instantTime, taskContextSupplier, storage, false)) {
+      for (int i = 0; i < recordCount; i++) {
+        InternalRow row = createRow(i, "User" + i, 20L + i);
+        writer.writeRow("key" + i, row);
+      }
+    }
+
+    // Verify all records were written
+    try (BufferAllocator allocator = new RootAllocator();
+         LanceFileReader reader = LanceFileReader.open(path.toString(), 
allocator)) {
+      assertEquals(recordCount, reader.numRows(), "All records should be 
written");
+    }
+  }
+
+  @Test
+  public void testPrimitiveTypes() throws Exception {
+    StructType schema = new StructType()
+        .add("int_field", DataTypes.IntegerType, false)
+        .add("long_field", DataTypes.LongType, false)
+        .add("float_field", DataTypes.FloatType, false)
+        .add("double_field", DataTypes.DoubleType, false)
+        .add("bool_field", DataTypes.BooleanType, false)
+        .add("string_field", DataTypes.StringType, false)
+        .add("binary_field", DataTypes.BinaryType, false);
+
+    StoragePath path = new StoragePath(tempDir.getAbsolutePath() + 
"/test_primitives.lance");
+    try (HoodieSparkLanceWriter writer = new HoodieSparkLanceWriter(
+        path, schema, instantTime, taskContextSupplier, storage, false)) {
+      GenericInternalRow row = new GenericInternalRow(new Object[]{
+          42,                                    // int
+          123456789L,                           // long
+          3.14f,                                // float
+          2.71828,                              // double
+          true,                                 // boolean
+          UTF8String.fromString("test"),        // string
+          new byte[]{1, 2, 3, 4}               // binary
+      });
+
+      writer.writeRow("key1", row);
+    }
+
+    // Verify all types were written correctly
+    try (BufferAllocator allocator = new RootAllocator();
+         LanceFileReader reader = LanceFileReader.open(path.toString(), 
allocator);
+         ArrowReader arrowReader = reader.readAll(null, null, 
Integer.MAX_VALUE)) {
+
+      assertEquals(1, reader.numRows());
+      assertEquals(7, reader.schema().getFields().size());
+
+      arrowReader.loadNextBatch();
+      VectorSchemaRoot root = arrowReader.getVectorSchemaRoot();
+
+      assertEquals(42, ((IntVector) root.getVector("int_field")).get(0));
+      assertEquals(123456789L, ((BigIntVector) 
root.getVector("long_field")).get(0));
+      assertEquals(3.14f, ((Float4Vector) 
root.getVector("float_field")).get(0), 0.001);
+      assertEquals(2.71828, ((Float8Vector) 
root.getVector("double_field")).get(0), 0.00001);
+      assertEquals(1, ((BitVector) root.getVector("bool_field")).get(0));
+      assertEquals("test", new String(((VarCharVector) 
root.getVector("string_field")).get(0)));
+
+      byte[] binary = ((VarBinaryVector) 
root.getVector("binary_field")).get(0);
+      assertEquals(4, binary.length);
+    }
+  }
+
+  @Test
+  public void testNullValues() throws Exception {
+    StructType schema = new StructType()
+        .add("id", DataTypes.IntegerType, false)
+        .add("name", DataTypes.StringType, true)
+        .add("age", DataTypes.LongType, true);
+
+    StoragePath path = new StoragePath(tempDir.getAbsolutePath() + 
"/test_nulls.lance");
+    try (HoodieSparkLanceWriter writer = new HoodieSparkLanceWriter(
+        path, schema, instantTime, taskContextSupplier, storage, false)) {
+      // Write rows with null values
+      writer.writeRow("key1", createRow(1, "Alice", 30L));
+      writer.writeRow("key2", createRow(2, null, 25L));  // null name
+      writer.writeRow("key3", createRow(3, "Charlie", null));  // null age
+    }
+
+    // Verify nulls are preserved
+    try (BufferAllocator allocator = new RootAllocator();
+         LanceFileReader reader = LanceFileReader.open(path.toString(), 
allocator);
+         ArrowReader arrowReader = reader.readAll(null, null, 
Integer.MAX_VALUE)) {
+
+      assertEquals(3, reader.numRows());
+
+      arrowReader.loadNextBatch();
+      VectorSchemaRoot root = arrowReader.getVectorSchemaRoot();
+
+      VarCharVector nameVector = (VarCharVector) root.getVector("name");
+      assertFalse(nameVector.isNull(0), "First name should not be null");
+      assertTrue(nameVector.isNull(1), "Second name should be null");
+      assertFalse(nameVector.isNull(2), "Third name should not be null");
+
+      BigIntVector ageVector = (BigIntVector) root.getVector("age");
+      assertFalse(ageVector.isNull(0), "First age should not be null");
+      assertFalse(ageVector.isNull(1), "Second age should not be null");
+      assertTrue(ageVector.isNull(2), "Third age should be null");
+    }
+  }
+
+  @Test
+  public void testWriteEmptyDataset() throws Exception {
+    StructType schema = new StructType()
+        .add("id", DataTypes.IntegerType, false);
+
+    StoragePath path = new StoragePath(tempDir.getAbsolutePath() + 
"/test_empty.lance");
+    try (HoodieSparkLanceWriter writer = new HoodieSparkLanceWriter(
+        path, schema, instantTime, taskContextSupplier, storage, false)) {
+      // Close without writing any rows
+    }
+
+    // Should create a empty lance file with just schema even if no data is 
written
+    assertTrue(storage.exists(path), "Lance file should exist even when no 
data is written");
+
+    // Verify the empty file has valid structure with correct schema
+    try (BufferAllocator allocator = new RootAllocator();
+         LanceFileReader reader = LanceFileReader.open(path.toString(), 
allocator)) {
+      assertEquals(0, reader.numRows(), "Empty file should have 0 rows");
+      assertEquals(1, reader.schema().getFields().size(), "Should have 1 
field");
+      assertEquals("id", reader.schema().getFields().get(0).getName(), "Field 
name should be 'id'");
+    }
+  }
+
+  @Test
+  public void testWriteStructType() throws Exception {
+    // Create schema with nested struct
+    StructType addressSchema = new StructType()
+        .add("street", DataTypes.StringType, true)
+        .add("city", DataTypes.StringType, true)
+        .add("zipcode", DataTypes.IntegerType, true);
+
+    StructType schema = new StructType()
+        .add("id", DataTypes.IntegerType, false)
+        .add("name", DataTypes.StringType, true)
+        .add("address", addressSchema, true);
+
+    // Create test data with nested struct
+    GenericInternalRow address1 = new GenericInternalRow(new Object[]{
+        UTF8String.fromString("123 Main St"),
+        UTF8String.fromString("New York"),
+        10001
+    });
+
+    GenericInternalRow address2 = new GenericInternalRow(new Object[]{
+        UTF8String.fromString("456 Oak Ave"),
+        UTF8String.fromString("Los Angeles"),
+        90001
+    });
+
+    List<InternalRow> rows = new ArrayList<>();
+    rows.add(new GenericInternalRow(new Object[]{1, 
UTF8String.fromString("Alice"), address1}));
+    rows.add(new GenericInternalRow(new Object[]{2, 
UTF8String.fromString("Bob"), address2}));
+
+    StoragePath path = new StoragePath(tempDir.getAbsolutePath() + 
"/test_struct.lance");
+    try (HoodieSparkLanceWriter writer = new HoodieSparkLanceWriter(
+        path, schema, instantTime, taskContextSupplier, storage, false)) {
+      for (InternalRow row : rows) {
+        writer.writeRow("key" + rows.indexOf(row), row);
+      }
+    }
+
+    assertTrue(storage.exists(path), "Lance file with struct type should 
exist");
+    try (BufferAllocator allocator = new RootAllocator();
+         LanceFileReader reader = LanceFileReader.open(path.toString(), 
allocator)) {
+      assertEquals(rows.size(), reader.numRows(), "Row count should match");
+      assertEquals(3, reader.schema().getFields().size(), "Should have 3 
top-level fields (id, name, address)");
+    }
+  }
+
+  // Helper methods
+
+  private StructType createSchemaWithMetaFields() {
+    return new StructType()
+        .add(COMMIT_TIME_METADATA_FIELD.getFieldName(), DataTypes.StringType, 
false)
+        .add(COMMIT_SEQNO_METADATA_FIELD.getFieldName(), DataTypes.StringType, 
false)
+        .add(RECORD_KEY_METADATA_FIELD.getFieldName(), DataTypes.StringType, 
false)
+        .add(PARTITION_PATH_METADATA_FIELD.getFieldName(), 
DataTypes.StringType, false)
+        .add(FILENAME_METADATA_FIELD.getFieldName(), DataTypes.StringType, 
false)
+        .add("id", DataTypes.IntegerType, false)
+        .add("name", DataTypes.StringType, true)
+        .add("age", DataTypes.LongType, true);
+  }
+
+  private StructType createSchemaWithoutMetaFields() {
+    return new StructType()
+        .add("id", DataTypes.IntegerType, false)
+        .add("name", DataTypes.StringType, true)
+        .add("age", DataTypes.LongType, true);
+  }
+
+  private InternalRow createRowWithMetaFields(Object... userValues) {
+    // Create row with PLACEHOLDER meta fields (will be updated by writer) + 
user data
+    Object[] allValues = new Object[5 + userValues.length];
+
+    // Meta fields - use empty strings as placeholders
+    allValues[0] = UTF8String.fromString(""); // commit_time
+    allValues[1] = UTF8String.fromString(""); // commit_seqno
+    allValues[2] = UTF8String.fromString(""); // record_key
+    allValues[3] = UTF8String.fromString(""); // partition_path
+    allValues[4] = UTF8String.fromString(""); // file_name
+
+    // Copy user values
+    for (int i = 0; i < userValues.length; i++) {
+      allValues[5 + i] = processValue(userValues[i]);
+    }
+
+    return new GenericInternalRow(allValues);
+  }
+
+  private InternalRow createRow(Object... values) {
+    Object[] processedValues = new Object[values.length];
+    for (int i = 0; i < values.length; i++) {
+      processedValues[i] = processValue(values[i]);
+    }
+    return new GenericInternalRow(processedValues);
+  }
+
+  private Object processValue(Object value) {
+    if (value instanceof String) {
+      return UTF8String.fromString((String) value);
+    }
+    return value;
+  }
+
+  private boolean hasField(VectorSchemaRoot root, String fieldName) {
+    try {
+      return root.getVector(fieldName) != null;
+    } catch (Exception e) {
+      return false;
+    }
+  }
+}
diff --git a/pom.xml b/pom.xml
index 873c85ee1258..cb659f4f38cc 100644
--- a/pom.xml
+++ b/pom.xml
@@ -240,6 +240,11 @@
     <springboot.version>2.7.10</springboot.version>
     <spring.shell.version>2.1.1</spring.shell.version>
     <snappy.version>1.1.10.7</snappy.version>
+    <arrow.version>18.3.0</arrow.version>
+    <lance.version>0.39.0</lance.version>
+    <lance.spark.connector.version>0.0.15</lance.spark.connector.version>
+    
<lance.spark.artifact>lance-spark-3.5_${scala.binary.version}</lance.spark.artifact>
+    <lance.skip.tests>false</lance.skip.tests>
     <!-- The following properties are only used for Jacoco coverage report 
aggregation -->
     <copy.files>false</copy.files>
     
<copy.files.target.dir>${maven.multiModuleProjectDirectory}</copy.files.target.dir>
@@ -922,7 +927,29 @@
         <version>${orc.spark.version}</version>
         <scope>compile</scope>
       </dependency>
-
+      <!-- Lance Core SDK -->
+      <dependency>
+        <groupId>com.lancedb</groupId>
+        <artifactId>lance-core</artifactId>
+        <version>${lance.version}</version>
+      </dependency>
+      <!-- Lance Spark for Arrow conversion -->
+      <dependency>
+        <groupId>com.lancedb</groupId>
+        <artifactId>${lance.spark.artifact}</artifactId>
+        <version>${lance.spark.connector.version}</version>
+      </dependency>
+      <!-- Apache Arrow -->
+      <dependency>
+        <groupId>org.apache.arrow</groupId>
+        <artifactId>arrow-vector</artifactId>
+        <version>${arrow.version}</version>
+      </dependency>
+      <dependency>
+        <groupId>org.apache.arrow</groupId>
+        <artifactId>arrow-memory-netty</artifactId>
+        <version>${arrow.version}</version>
+      </dependency>
       <!-- RoaringBitmap -->
       <dependency>
         <groupId>org.roaringbitmap</groupId>
@@ -2489,6 +2516,9 @@
         <!-- This glob has to include hudi-spark3-common, 
hudi-spark3.2plus-common -->
         <hudi.spark.common.module>hudi-spark3-common</hudi.spark.common.module>
         <kafka.version>2.8.1</kafka.version>
+        <!-- Lance: Skip tests for Spark 3.3 due to lack of support -->
+        
<lance.spark.artifact>lance-spark-base_${scala.binary.version}</lance.spark.artifact>
+        <lance.skip.tests>true</lance.skip.tests>
         <!-- NOTE: Some Hudi modules require standalone Parquet/Orc/etc 
file-format dependency (hudi-hive-sync,
                    hudi-hadoop-mr, for ex). Since these Hudi modules might be 
used from w/in the execution engine(s)
                    bringing these file-formats as dependencies as well, we 
need to make sure that versions are
@@ -2530,6 +2560,9 @@
         <hudi.spark.common.module>hudi-spark3-common</hudi.spark.common.module>
         <scalatest.version>${scalatest.spark3.version}</scalatest.version>
         <kafka.version>3.3.2</kafka.version>
+        <!-- Lance: Use Spark 3.4-specific artifact -->
+        
<lance.spark.artifact>lance-spark-3.4_${scala.binary.version}</lance.spark.artifact>
+        <lance.skip.tests>false</lance.skip.tests>
         <!-- NOTE: Some Hudi modules require standalone Parquet/Orc/etc 
file-format dependency (hudi-hive-sync,
                    hudi-hadoop-mr, for ex). Since these Hudi modules might be 
used from w/in the execution engine(s)
                    bringing these file-formats as dependencies as well, we 
need to make sure that versions are
@@ -2581,6 +2614,9 @@
         <scalatest.version>${scalatest.spark3.version}</scalatest.version>
         <kafka.version>3.4.1</kafka.version>
         <hive.storage.version>2.8.1</hive.storage.version>
+        <!-- Lance: Use Spark 3.5-specific artifact -->
+        
<lance.spark.artifact>lance-spark-3.5_${scala.binary.version}</lance.spark.artifact>
+        <lance.skip.tests>false</lance.skip.tests>
         <!-- NOTE: Some Hudi modules require standalone Parquet/Orc/etc 
file-format dependency (hudi-hive-sync,
                    hudi-hadoop-mr, for ex). Since these Hudi modules might be 
used from w/in the execution engine(s)
                    bringing these file-formats as dependencies as well, we 
need to make sure that versions are
@@ -2640,6 +2676,9 @@
         <hadoop.version>3.4.0</hadoop.version>
         <kafka.version>3.8.0</kafka.version>
         <hive.storage.version>2.8.1</hive.storage.version>
+        <!-- Lance: Use Spark 4.0-specific artifact (Scala 2.13 only) -->
+        <lance.spark.artifact>lance-spark-4.0_2.13</lance.spark.artifact>
+        <lance.skip.tests>false</lance.skip.tests>
         <!-- NOTE: Some Hudi modules require standalone Parquet/Orc/etc 
file-format dependency (hudi-hive-sync,
                    hudi-hadoop-mr, for ex). Since these Hudi modules might be 
used from w/in the execution engine(s)
                    bringing these file-formats as dependencies as well, we 
need to make sure that versions are


Reply via email to