This is an automated email from the ASF dual-hosted git repository.
yihua pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git
The following commit(s) were added to refs/heads/master by this push:
new 117f8da56fd8 feat: Add HoodieBaseLanceFileWriter and implementation
for SparkFileWriter (#17629)
117f8da56fd8 is described below
commit 117f8da56fd8b3b135acd2fc5faf00563eff8f9c
Author: Rahil C <[email protected]>
AuthorDate: Tue Dec 23 13:36:00 2025 -0500
feat: Add HoodieBaseLanceFileWriter and implementation for SparkFileWriter
(#17629)
---
hudi-client/hudi-spark-client/pom.xml | 6 +
.../hudi/io/storage/HoodieSparkLanceWriter.java | 157 +++++++
hudi-hadoop-common/pom.xml | 14 +
.../hudi/io/lance/HoodieBaseLanceWriter.java | 191 +++++++++
.../io/storage/TestHoodieSparkLanceWriter.java | 462 +++++++++++++++++++++
pom.xml | 41 +-
6 files changed, 870 insertions(+), 1 deletion(-)
diff --git a/hudi-client/hudi-spark-client/pom.xml
b/hudi-client/hudi-spark-client/pom.xml
index 473d2a718e94..9b557f85b137 100644
--- a/hudi-client/hudi-spark-client/pom.xml
+++ b/hudi-client/hudi-spark-client/pom.xml
@@ -100,6 +100,12 @@
<artifactId>parquet-avro</artifactId>
</dependency>
+ <!-- Lance Spark for Arrow conversion -->
+ <dependency>
+ <groupId>com.lancedb</groupId>
+ <artifactId>${lance.spark.artifact}</artifactId>
+ </dependency>
+
<!-- Used for adding kryo serializers for protobuf -->
<dependency>
<groupId>com.twitter</groupId>
diff --git
a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/io/storage/HoodieSparkLanceWriter.java
b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/io/storage/HoodieSparkLanceWriter.java
new file mode 100644
index 000000000000..e7476f4473fa
--- /dev/null
+++
b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/io/storage/HoodieSparkLanceWriter.java
@@ -0,0 +1,157 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.io.storage;
+
+import com.lancedb.lance.spark.arrow.LanceArrowWriter;
+import org.apache.arrow.vector.types.pojo.Schema;
+import org.apache.hudi.common.engine.TaskContextSupplier;
+import org.apache.hudi.common.model.HoodieKey;
+import org.apache.hudi.common.model.HoodieRecord;
+import org.apache.hudi.io.lance.HoodieBaseLanceWriter;
+import org.apache.hudi.storage.HoodieStorage;
+import org.apache.hudi.storage.StoragePath;
+import org.apache.spark.sql.catalyst.InternalRow;
+import org.apache.spark.sql.types.StructType;
+import org.apache.spark.sql.util.LanceArrowUtils;
+import org.apache.spark.unsafe.types.UTF8String;
+
+import java.io.IOException;
+import java.util.List;
+import java.util.function.Function;
+
+import static
org.apache.hudi.common.model.HoodieRecord.HoodieMetadataField.COMMIT_SEQNO_METADATA_FIELD;
+import static
org.apache.hudi.common.model.HoodieRecord.HoodieMetadataField.COMMIT_TIME_METADATA_FIELD;
+import static
org.apache.hudi.common.model.HoodieRecord.HoodieMetadataField.FILENAME_METADATA_FIELD;
+import static
org.apache.hudi.common.model.HoodieRecord.HoodieMetadataField.PARTITION_PATH_METADATA_FIELD;
+import static
org.apache.hudi.common.model.HoodieRecord.HoodieMetadataField.RECORD_KEY_METADATA_FIELD;
+
+/**
+ * Spark Lance file writer implementing {@link HoodieSparkFileWriter}.
+ *
+ * This writer integrates with Hudi's storage I/O layer and supports:
+ * - Hudi metadata field population
+ * - Record key tracking (for bloom filters - TODO
https://github.com/apache/hudi/issues/17664)
+ * - Sequence ID generation
+ */
+public class HoodieSparkLanceWriter extends HoodieBaseLanceWriter<InternalRow>
implements HoodieSparkFileWriter {
+
+ private static final String DEFAULT_TIMEZONE = "UTC";
+
+ private final StructType sparkSchema;
+ private final Schema arrowSchema;
+ private final UTF8String fileName;
+ private final UTF8String instantTime;
+ private final boolean populateMetaFields;
+ private final Function<Long, String> seqIdGenerator;
+ private LanceArrowWriter writer;
+
+ /**
+ * Constructor for Spark Lance writer.
+ *
+ * @param file Path where Lance file will be written
+ * @param sparkSchema Spark schema for the data
+ * @param instantTime Instant time for the commit
+ * @param taskContextSupplier Task context supplier for partition ID
+ * @param storage HoodieStorage instance
+ * @param populateMetaFields Whether to populate Hudi metadata fields
+ * @throws IOException if writer initialization fails
+ */
+ public HoodieSparkLanceWriter(StoragePath file,
+ StructType sparkSchema,
+ String instantTime,
+ TaskContextSupplier taskContextSupplier,
+ HoodieStorage storage,
+ boolean populateMetaFields) throws IOException
{
+ super(storage, file, DEFAULT_BATCH_SIZE);
+ this.sparkSchema = sparkSchema;
+ this.arrowSchema = LanceArrowUtils.toArrowSchema(sparkSchema,
DEFAULT_TIMEZONE, true, false);
+ this.fileName = UTF8String.fromString(file.getName());
+ this.instantTime = UTF8String.fromString(instantTime);
+ this.populateMetaFields = populateMetaFields;
+ this.seqIdGenerator = recordIndex -> {
+ Integer partitionId = taskContextSupplier.getPartitionIdSupplier().get();
+ return HoodieRecord.generateSequenceId(instantTime, partitionId,
recordIndex);
+ };
+ }
+
+ @Override
+ public void writeRowWithMetadata(HoodieKey key, InternalRow row) throws
IOException {
+ if (populateMetaFields) {
+ UTF8String recordKey = UTF8String.fromString(key.getRecordKey());
+ updateRecordMetadata(row, recordKey, key.getPartitionPath(),
getWrittenRecordCount());
+ super.write(row);
+ } else {
+ super.write(row);
+ }
+ }
+
+ @Override
+ public void writeRow(String recordKey, InternalRow row) throws IOException {
+ super.write(row);
+ }
+
+ @Override
+ protected void populateVectorSchemaRoot(List<InternalRow> records) {
+ if (writer == null) {
+ writer = LanceArrowWriter.create(this.root, sparkSchema);
+ }
+ // Reset writer state from previous batch
+ writer.reset();
+ for (InternalRow record : records) {
+ writer.write(record);
+ }
+ // Finalize the writer (sets row count)
+ writer.finish();
+ }
+
+ /**
+ * Check if writer can accept more records based on file size.
+ * Uses filesystem-based size checking (similar to ORC/HFile approach).
+ *
+ * @return true if writer can accept more records, false if file size limit
reached
+ */
+ public boolean canWrite() {
+ //TODO https://github.com/apache/hudi/issues/17684
+ return true;
+ }
+
+ @Override
+ protected Schema getArrowSchema() {
+ return arrowSchema;
+ }
+
+ /**
+ * Update Hudi metadata fields in the InternalRow.
+ *
+ * @param row InternalRow to update
+ * @param recordKey Record key
+ * @param partitionPath Partition path
+ * @param recordCount Current record count for sequence ID generation
+ */
+ protected void updateRecordMetadata(InternalRow row,
+ UTF8String recordKey,
+ String partitionPath,
+ long recordCount) {
+ row.update(COMMIT_TIME_METADATA_FIELD.ordinal(), instantTime);
+ row.update(COMMIT_SEQNO_METADATA_FIELD.ordinal(),
UTF8String.fromString(seqIdGenerator.apply(recordCount)));
+ row.update(RECORD_KEY_METADATA_FIELD.ordinal(), recordKey);
+ row.update(PARTITION_PATH_METADATA_FIELD.ordinal(),
UTF8String.fromString(partitionPath));
+ row.update(FILENAME_METADATA_FIELD.ordinal(), fileName);
+ }
+}
\ No newline at end of file
diff --git a/hudi-hadoop-common/pom.xml b/hudi-hadoop-common/pom.xml
index 55ddf796036f..53c56caf4ba4 100644
--- a/hudi-hadoop-common/pom.xml
+++ b/hudi-hadoop-common/pom.xml
@@ -158,5 +158,19 @@
<version>1.17.2</version>
<scope>test</scope>
</dependency>
+ <!-- Lance Core SDK -->
+ <dependency>
+ <groupId>com.lancedb</groupId>
+ <artifactId>lance-core</artifactId>
+ </dependency>
+ <!-- Apache Arrow for Lance data representation -->
+ <dependency>
+ <groupId>org.apache.arrow</groupId>
+ <artifactId>arrow-vector</artifactId>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.arrow</groupId>
+ <artifactId>arrow-memory-netty</artifactId>
+ </dependency>
</dependencies>
</project>
diff --git
a/hudi-hadoop-common/src/main/java/org/apache/hudi/io/lance/HoodieBaseLanceWriter.java
b/hudi-hadoop-common/src/main/java/org/apache/hudi/io/lance/HoodieBaseLanceWriter.java
new file mode 100644
index 000000000000..d4fad38e7a80
--- /dev/null
+++
b/hudi-hadoop-common/src/main/java/org/apache/hudi/io/lance/HoodieBaseLanceWriter.java
@@ -0,0 +1,191 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.hudi.io.lance;
+
+import com.lancedb.lance.file.LanceFileWriter;
+import org.apache.arrow.memory.BufferAllocator;
+import org.apache.arrow.memory.RootAllocator;
+import org.apache.arrow.vector.VectorSchemaRoot;
+import org.apache.arrow.vector.types.pojo.Schema;
+import org.apache.hudi.exception.HoodieException;
+import org.apache.hudi.storage.HoodieStorage;
+import org.apache.hudi.storage.StoragePath;
+
+import javax.annotation.concurrent.NotThreadSafe;
+import java.io.Closeable;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+
+/**
+ * Base class for Hudi Lance file writers supporting different record types.
+ *
+ * This class handles common Lance file operations including:
+ * - LanceFileWriter lifecycle management
+ * - BufferAllocator management
+ * - Record buffering and batch flushing
+ * - File size checks
+ *
+ * Subclasses must implement type-specific conversion to Arrow format.
+ *
+ * @param <R> The record type (e.g., GenericRecord, InternalRow)
+ */
+@NotThreadSafe
+public abstract class HoodieBaseLanceWriter<R> implements Closeable {
+ protected static final int DEFAULT_BATCH_SIZE = 1000;
+ protected final HoodieStorage storage;
+ protected final StoragePath path;
+ protected final BufferAllocator allocator;
+ protected final List<R> bufferedRecords;
+ protected final int batchSize;
+ protected long writtenRecordCount = 0;
+ protected VectorSchemaRoot root;
+
+ private LanceFileWriter writer;
+
+ /**
+ * Constructor for base Lance writer.
+ *
+ * @param storage HoodieStorage instance
+ * @param path Path where Lance file will be written
+ * @param batchSize Number of records to buffer before flushing to Lance
+ */
+ protected HoodieBaseLanceWriter(HoodieStorage storage, StoragePath path, int
batchSize) {
+ this.storage = storage;
+ this.path = path;
+ this.allocator = new RootAllocator(Long.MAX_VALUE);
+ this.bufferedRecords = new ArrayList<>(batchSize);
+ this.batchSize = batchSize;
+ }
+
+ /**
+ * Populate the VectorSchemaRoot with buffered records.
+ * Subclasses must implement type-specific conversion logic.
+ * The VectorSchemaRoot field is reused across batches and managed by this
base class.
+ *
+ * @param records List of records to convert
+ */
+ protected abstract void populateVectorSchemaRoot(List<R> records);
+
+ /**
+ * Get the Arrow schema for this writer.
+ * Subclasses must provide the Arrow schema corresponding to their record
type.
+ *
+ * @return Arrow schema
+ */
+ protected abstract Schema getArrowSchema();
+
+ /**
+ * Write a single record. Records are buffered and flushed in batches.
+ *
+ * @param record Record to write
+ * @throws IOException if write fails
+ */
+ public void write(R record) throws IOException {
+ bufferedRecords.add(record);
+ writtenRecordCount++;
+
+ if (bufferedRecords.size() >= batchSize) {
+ flushBatch();
+ }
+ }
+
+ /**
+ * Get the total number of records written so far.
+ *
+ * @return Number of records written
+ */
+ public long getWrittenRecordCount() {
+ return writtenRecordCount;
+ }
+
+ /**
+ * Close the writer, flushing any remaining buffered records.
+ *
+ * @throws IOException if close fails
+ */
+ @Override
+ public void close() throws IOException {
+ try {
+ // Flush any remaining buffered records
+ if (!bufferedRecords.isEmpty()) {
+ flushBatch();
+ }
+
+ // Ensure writer is initialized even if no data was written
+ // This creates an empty Lance file with just schema metadata
+ if (writer == null && root == null) {
+ initializeWriter();
+ root = VectorSchemaRoot.create(getArrowSchema(), allocator);
+ root.setRowCount(0);
+ writer.write(root);
+ }
+
+ // Close Lance writer
+ if (writer != null) {
+ writer.close();
+ }
+
+ // Close VectorSchemaRoot
+ if (root != null) {
+ root.close();
+ }
+ } catch (Exception e) {
+ throw new HoodieException("Failed to close Lance writer: " + path, e);
+ } finally {
+ // Always close allocator
+ allocator.close();
+ }
+ }
+
+ /**
+ * Flush buffered records to Lance file.
+ */
+ private void flushBatch() throws IOException {
+ if (bufferedRecords.isEmpty()) {
+ return;
+ }
+
+ // Lazy initialization of writer and root
+ if (writer == null) {
+ initializeWriter();
+ }
+ if (root == null) {
+ root = VectorSchemaRoot.create(getArrowSchema(), allocator);
+ }
+
+ // Reset root state for new batch
+ root.setRowCount(0);
+
+ // Populate root with records and write to Lance
+ populateVectorSchemaRoot(bufferedRecords);
+ writer.write(root);
+
+ // Clear buffer
+ bufferedRecords.clear();
+ }
+
+ /**
+ * Initialize LanceFileWriter (lazy initialization).
+ */
+ private void initializeWriter() throws IOException {
+ writer = LanceFileWriter.open(path.toString(), allocator, null);
+ }
+}
diff --git
a/hudi-spark-datasource/hudi-spark/src/test/java/org/apache/hudi/io/storage/TestHoodieSparkLanceWriter.java
b/hudi-spark-datasource/hudi-spark/src/test/java/org/apache/hudi/io/storage/TestHoodieSparkLanceWriter.java
new file mode 100644
index 000000000000..d18833a825a7
--- /dev/null
+++
b/hudi-spark-datasource/hudi-spark/src/test/java/org/apache/hudi/io/storage/TestHoodieSparkLanceWriter.java
@@ -0,0 +1,462 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.io.storage;
+
+import com.lancedb.lance.file.LanceFileReader;
+import org.apache.arrow.memory.BufferAllocator;
+import org.apache.arrow.memory.RootAllocator;
+import org.apache.arrow.vector.BigIntVector;
+import org.apache.arrow.vector.BitVector;
+import org.apache.arrow.vector.Float4Vector;
+import org.apache.arrow.vector.Float8Vector;
+import org.apache.arrow.vector.IntVector;
+import org.apache.arrow.vector.VarBinaryVector;
+import org.apache.arrow.vector.VarCharVector;
+import org.apache.arrow.vector.VectorSchemaRoot;
+import org.apache.arrow.vector.ipc.ArrowReader;
+import org.apache.hudi.client.SparkTaskContextSupplier;
+import org.apache.hudi.common.model.HoodieKey;
+import org.apache.hudi.common.testutils.HoodieTestUtils;
+import org.apache.hudi.storage.HoodieStorage;
+import org.apache.hudi.storage.StoragePath;
+import org.apache.spark.sql.catalyst.InternalRow;
+import org.apache.spark.sql.catalyst.expressions.GenericInternalRow;
+import org.apache.spark.sql.types.DataTypes;
+import org.apache.spark.sql.types.StructType;
+import org.apache.spark.unsafe.types.UTF8String;
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.condition.DisabledIfSystemProperty;
+import org.junit.jupiter.api.io.TempDir;
+
+import java.io.File;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+
+import static
org.apache.hudi.common.model.HoodieRecord.HoodieMetadataField.COMMIT_SEQNO_METADATA_FIELD;
+import static
org.apache.hudi.common.model.HoodieRecord.HoodieMetadataField.COMMIT_TIME_METADATA_FIELD;
+import static
org.apache.hudi.common.model.HoodieRecord.HoodieMetadataField.FILENAME_METADATA_FIELD;
+import static
org.apache.hudi.common.model.HoodieRecord.HoodieMetadataField.PARTITION_PATH_METADATA_FIELD;
+import static
org.apache.hudi.common.model.HoodieRecord.HoodieMetadataField.RECORD_KEY_METADATA_FIELD;
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertFalse;
+import static org.junit.jupiter.api.Assertions.assertNotNull;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+
+/**
+ * Tests for {@link HoodieSparkLanceWriter}.
+ */
+@DisabledIfSystemProperty(named = "lance.skip.tests", matches = "true")
+public class TestHoodieSparkLanceWriter {
+
+ @TempDir
+ File tempDir;
+
+ private HoodieStorage storage;
+ private SparkTaskContextSupplier taskContextSupplier;
+ private String instantTime;
+
+ @BeforeEach
+ public void setUp() throws IOException {
+ storage = HoodieTestUtils.getStorage(tempDir.getAbsolutePath());
+ taskContextSupplier = new SparkTaskContextSupplier();
+ instantTime = "20251201120000000";
+ }
+
+ @AfterEach
+ public void tearDown() throws Exception {
+ if (storage != null) {
+ storage.close();
+ }
+ }
+
+ @Test
+ public void testWriteRowWithMetadataPopulation() throws Exception {
+ // Schema WITH meta fields (writer expects this when
populateMetaFields=true)
+ StructType schema = createSchemaWithMetaFields();
+
+ StoragePath path = new StoragePath(tempDir.getAbsolutePath() +
"/test_with_metadata.lance");
+ try (HoodieSparkLanceWriter writer = new HoodieSparkLanceWriter(
+ path, schema, instantTime, taskContextSupplier, storage, true)) {
+ // Write multiple records to test metadata population and sequence ID
generation
+ for (int i = 0; i < 3; i++) {
+ InternalRow row = createRowWithMetaFields(i, "User" + i, 20L + i);
+ HoodieKey key = new HoodieKey("key" + i, "partition1");
+ writer.writeRowWithMetadata(key, row);
+ }
+ }
+
+ // Verify using LanceFileReader
+ assertTrue(storage.exists(path), "Lance file should exist");
+
+ try (BufferAllocator allocator = new RootAllocator();
+ LanceFileReader reader = LanceFileReader.open(path.toString(),
allocator);
+ ArrowReader arrowReader = reader.readAll(null, null,
Integer.MAX_VALUE)) {
+
+ assertEquals(3, reader.numRows(), "Should have 3 records");
+
+ // Schema should have 5 meta fields + 3 user fields = 8 fields
+ assertEquals(8, reader.schema().getFields().size(), "Should have 8
fields");
+
+ // Read and verify data
+ assertTrue(arrowReader.loadNextBatch(), "Should load batch");
+ VectorSchemaRoot root = arrowReader.getVectorSchemaRoot();
+
+ // Verify meta fields were populated for first record
+ VarCharVector commitTimeVector = (VarCharVector)
root.getVector(COMMIT_TIME_METADATA_FIELD.getFieldName());
+ assertNotNull(commitTimeVector);
+ assertEquals(instantTime, new String(commitTimeVector.get(0)), "Commit
time should match");
+
+ VarCharVector recordKeyVector = (VarCharVector)
root.getVector(RECORD_KEY_METADATA_FIELD.getFieldName());
+ assertEquals("key0", new String(recordKeyVector.get(0)), "Record key
should match");
+
+ VarCharVector partitionPathVector = (VarCharVector)
root.getVector(PARTITION_PATH_METADATA_FIELD.getFieldName());
+ assertEquals("partition1", new String(partitionPathVector.get(0)),
"Partition path should match");
+
+ VarCharVector fileNameVector = (VarCharVector)
root.getVector(FILENAME_METADATA_FIELD.getFieldName());
+ assertEquals(path.getName(), new String(fileNameVector.get(0)), "File
name should match");
+
+ // Verify sequence IDs are unique and properly formatted
+ VarCharVector seqNoVector = (VarCharVector)
root.getVector(COMMIT_SEQNO_METADATA_FIELD.getFieldName());
+ List<String> seqNos = new ArrayList<>();
+ for (int i = 0; i < 3; i++) {
+ String seqNo = new String(seqNoVector.get(i));
+ assertTrue(seqNo.startsWith(instantTime + "_"), "Sequence number
should start with instant time");
+ seqNos.add(seqNo);
+ }
+ assertEquals(3, seqNos.stream().distinct().count(), "All sequence IDs
should be unique");
+
+ // Verify user data fields for first record
+ IntVector idVector = (IntVector) root.getVector("id");
+ assertEquals(0, idVector.get(0), "ID should match");
+
+ VarCharVector nameVector = (VarCharVector) root.getVector("name");
+ assertEquals("User0", new String(nameVector.get(0)), "Name should
match");
+
+ BigIntVector ageVector = (BigIntVector) root.getVector("age");
+ assertEquals(20L, ageVector.get(0), "Age should match");
+ }
+ }
+
+ @Test
+ public void testWriteRowWithoutMetadataPopulation() throws Exception {
+ // Schema WITHOUT meta fields
+ StructType schema = createSchemaWithoutMetaFields();
+
+ StoragePath path = new StoragePath(tempDir.getAbsolutePath() +
"/test_without_metadata.lance");
+ try (HoodieSparkLanceWriter writer = new HoodieSparkLanceWriter(
+ path, schema, instantTime, taskContextSupplier, storage, false)) {
+ // Create row with just user data (no meta fields)
+ InternalRow row = createRow(1, "Bob", 25L);
+ HoodieKey key = new HoodieKey("key2", "partition2");
+
+ writer.writeRowWithMetadata(key, row);
+ }
+
+ // Verify using LanceFileReader
+ assertTrue(storage.exists(path));
+
+ try (BufferAllocator allocator = new RootAllocator();
+ LanceFileReader reader = LanceFileReader.open(path.toString(),
allocator);
+ ArrowReader arrowReader = reader.readAll(null, null,
Integer.MAX_VALUE)) {
+
+ assertEquals(1, reader.numRows());
+
+ // Schema should have ONLY 3 user fields (no meta fields)
+ assertEquals(3, reader.schema().getFields().size(), "Should have only
user fields");
+
+ // Read and verify data
+ assertTrue(arrowReader.loadNextBatch());
+ VectorSchemaRoot root = arrowReader.getVectorSchemaRoot();
+
+ // Verify NO meta fields exist
+ assertFalse(hasField(root, COMMIT_TIME_METADATA_FIELD.getFieldName()),
"Should not have commit time field");
+ assertFalse(hasField(root, RECORD_KEY_METADATA_FIELD.getFieldName()),
"Should not have record key field");
+
+ // Verify user data
+ IntVector idVector = (IntVector) root.getVector("id");
+ assertEquals(1, idVector.get(0));
+
+ VarCharVector nameVector = (VarCharVector) root.getVector("name");
+ assertEquals("Bob", new String(nameVector.get(0)));
+ }
+ }
+
+ @Test
+ public void testWriteRowSimple() throws Exception {
+ StructType schema = createSchemaWithoutMetaFields();
+
+ StoragePath path = new StoragePath(tempDir.getAbsolutePath() +
"/test_simple_write.lance");
+ try (HoodieSparkLanceWriter writer = new HoodieSparkLanceWriter(
+ path, schema, instantTime, taskContextSupplier, storage, false)) {
+ InternalRow row = createRow(1, "Charlie", 35L);
+ writer.writeRow("key3", row);
+ }
+
+ // Verify file exists and has correct record count
+ try (BufferAllocator allocator = new RootAllocator();
+ LanceFileReader reader = LanceFileReader.open(path.toString(),
allocator)) {
+ assertEquals(1, reader.numRows());
+ }
+ }
+
+ @Test
+ public void testBatchFlushing() throws Exception {
+ StructType schema = createSchemaWithoutMetaFields();
+
+ StoragePath path = new StoragePath(tempDir.getAbsolutePath() +
"/test_batch_flush.lance");
+ // Write more than DEFAULT_BATCH_SIZE (1000) records
+ int recordCount = 2500;
+ try (HoodieSparkLanceWriter writer = new HoodieSparkLanceWriter(
+ path, schema, instantTime, taskContextSupplier, storage, false)) {
+ for (int i = 0; i < recordCount; i++) {
+ InternalRow row = createRow(i, "User" + i, 20L + i);
+ writer.writeRow("key" + i, row);
+ }
+ }
+
+ // Verify all records were written
+ try (BufferAllocator allocator = new RootAllocator();
+ LanceFileReader reader = LanceFileReader.open(path.toString(),
allocator)) {
+ assertEquals(recordCount, reader.numRows(), "All records should be
written");
+ }
+ }
+
+ @Test
+ public void testPrimitiveTypes() throws Exception {
+ StructType schema = new StructType()
+ .add("int_field", DataTypes.IntegerType, false)
+ .add("long_field", DataTypes.LongType, false)
+ .add("float_field", DataTypes.FloatType, false)
+ .add("double_field", DataTypes.DoubleType, false)
+ .add("bool_field", DataTypes.BooleanType, false)
+ .add("string_field", DataTypes.StringType, false)
+ .add("binary_field", DataTypes.BinaryType, false);
+
+ StoragePath path = new StoragePath(tempDir.getAbsolutePath() +
"/test_primitives.lance");
+ try (HoodieSparkLanceWriter writer = new HoodieSparkLanceWriter(
+ path, schema, instantTime, taskContextSupplier, storage, false)) {
+ GenericInternalRow row = new GenericInternalRow(new Object[]{
+ 42, // int
+ 123456789L, // long
+ 3.14f, // float
+ 2.71828, // double
+ true, // boolean
+ UTF8String.fromString("test"), // string
+ new byte[]{1, 2, 3, 4} // binary
+ });
+
+ writer.writeRow("key1", row);
+ }
+
+ // Verify all types were written correctly
+ try (BufferAllocator allocator = new RootAllocator();
+ LanceFileReader reader = LanceFileReader.open(path.toString(),
allocator);
+ ArrowReader arrowReader = reader.readAll(null, null,
Integer.MAX_VALUE)) {
+
+ assertEquals(1, reader.numRows());
+ assertEquals(7, reader.schema().getFields().size());
+
+ arrowReader.loadNextBatch();
+ VectorSchemaRoot root = arrowReader.getVectorSchemaRoot();
+
+ assertEquals(42, ((IntVector) root.getVector("int_field")).get(0));
+ assertEquals(123456789L, ((BigIntVector)
root.getVector("long_field")).get(0));
+ assertEquals(3.14f, ((Float4Vector)
root.getVector("float_field")).get(0), 0.001);
+ assertEquals(2.71828, ((Float8Vector)
root.getVector("double_field")).get(0), 0.00001);
+ assertEquals(1, ((BitVector) root.getVector("bool_field")).get(0));
+ assertEquals("test", new String(((VarCharVector)
root.getVector("string_field")).get(0)));
+
+ byte[] binary = ((VarBinaryVector)
root.getVector("binary_field")).get(0);
+ assertEquals(4, binary.length);
+ }
+ }
+
+ @Test
+ public void testNullValues() throws Exception {
+ StructType schema = new StructType()
+ .add("id", DataTypes.IntegerType, false)
+ .add("name", DataTypes.StringType, true)
+ .add("age", DataTypes.LongType, true);
+
+ StoragePath path = new StoragePath(tempDir.getAbsolutePath() +
"/test_nulls.lance");
+ try (HoodieSparkLanceWriter writer = new HoodieSparkLanceWriter(
+ path, schema, instantTime, taskContextSupplier, storage, false)) {
+ // Write rows with null values
+ writer.writeRow("key1", createRow(1, "Alice", 30L));
+ writer.writeRow("key2", createRow(2, null, 25L)); // null name
+ writer.writeRow("key3", createRow(3, "Charlie", null)); // null age
+ }
+
+ // Verify nulls are preserved
+ try (BufferAllocator allocator = new RootAllocator();
+ LanceFileReader reader = LanceFileReader.open(path.toString(),
allocator);
+ ArrowReader arrowReader = reader.readAll(null, null,
Integer.MAX_VALUE)) {
+
+ assertEquals(3, reader.numRows());
+
+ arrowReader.loadNextBatch();
+ VectorSchemaRoot root = arrowReader.getVectorSchemaRoot();
+
+ VarCharVector nameVector = (VarCharVector) root.getVector("name");
+ assertFalse(nameVector.isNull(0), "First name should not be null");
+ assertTrue(nameVector.isNull(1), "Second name should be null");
+ assertFalse(nameVector.isNull(2), "Third name should not be null");
+
+ BigIntVector ageVector = (BigIntVector) root.getVector("age");
+ assertFalse(ageVector.isNull(0), "First age should not be null");
+ assertFalse(ageVector.isNull(1), "Second age should not be null");
+ assertTrue(ageVector.isNull(2), "Third age should be null");
+ }
+ }
+
+ @Test
+ public void testWriteEmptyDataset() throws Exception {
+ StructType schema = new StructType()
+ .add("id", DataTypes.IntegerType, false);
+
+ StoragePath path = new StoragePath(tempDir.getAbsolutePath() +
"/test_empty.lance");
+ try (HoodieSparkLanceWriter writer = new HoodieSparkLanceWriter(
+ path, schema, instantTime, taskContextSupplier, storage, false)) {
+ // Close without writing any rows
+ }
+
+ // Should create an empty Lance file with just schema even if no data is
written
+ assertTrue(storage.exists(path), "Lance file should exist even when no
data is written");
+
+ // Verify the empty file has valid structure with correct schema
+ try (BufferAllocator allocator = new RootAllocator();
+ LanceFileReader reader = LanceFileReader.open(path.toString(),
allocator)) {
+ assertEquals(0, reader.numRows(), "Empty file should have 0 rows");
+ assertEquals(1, reader.schema().getFields().size(), "Should have 1
field");
+ assertEquals("id", reader.schema().getFields().get(0).getName(), "Field
name should be 'id'");
+ }
+ }
+
+ @Test
+ public void testWriteStructType() throws Exception {
+ // Create schema with nested struct
+ StructType addressSchema = new StructType()
+ .add("street", DataTypes.StringType, true)
+ .add("city", DataTypes.StringType, true)
+ .add("zipcode", DataTypes.IntegerType, true);
+
+ StructType schema = new StructType()
+ .add("id", DataTypes.IntegerType, false)
+ .add("name", DataTypes.StringType, true)
+ .add("address", addressSchema, true);
+
+ // Create test data with nested struct
+ GenericInternalRow address1 = new GenericInternalRow(new Object[]{
+ UTF8String.fromString("123 Main St"),
+ UTF8String.fromString("New York"),
+ 10001
+ });
+
+ GenericInternalRow address2 = new GenericInternalRow(new Object[]{
+ UTF8String.fromString("456 Oak Ave"),
+ UTF8String.fromString("Los Angeles"),
+ 90001
+ });
+
+ List<InternalRow> rows = new ArrayList<>();
+ rows.add(new GenericInternalRow(new Object[]{1,
UTF8String.fromString("Alice"), address1}));
+ rows.add(new GenericInternalRow(new Object[]{2,
UTF8String.fromString("Bob"), address2}));
+
+ StoragePath path = new StoragePath(tempDir.getAbsolutePath() +
"/test_struct.lance");
+ try (HoodieSparkLanceWriter writer = new HoodieSparkLanceWriter(
+ path, schema, instantTime, taskContextSupplier, storage, false)) {
+ for (InternalRow row : rows) {
+ writer.writeRow("key" + rows.indexOf(row), row);
+ }
+ }
+
+ assertTrue(storage.exists(path), "Lance file with struct type should
exist");
+ try (BufferAllocator allocator = new RootAllocator();
+ LanceFileReader reader = LanceFileReader.open(path.toString(),
allocator)) {
+ assertEquals(rows.size(), reader.numRows(), "Row count should match");
+ assertEquals(3, reader.schema().getFields().size(), "Should have 3
top-level fields (id, name, address)");
+ }
+ }
+
+ // Helper methods
+
+ private StructType createSchemaWithMetaFields() {
+ return new StructType()
+ .add(COMMIT_TIME_METADATA_FIELD.getFieldName(), DataTypes.StringType,
false)
+ .add(COMMIT_SEQNO_METADATA_FIELD.getFieldName(), DataTypes.StringType,
false)
+ .add(RECORD_KEY_METADATA_FIELD.getFieldName(), DataTypes.StringType,
false)
+ .add(PARTITION_PATH_METADATA_FIELD.getFieldName(),
DataTypes.StringType, false)
+ .add(FILENAME_METADATA_FIELD.getFieldName(), DataTypes.StringType,
false)
+ .add("id", DataTypes.IntegerType, false)
+ .add("name", DataTypes.StringType, true)
+ .add("age", DataTypes.LongType, true);
+ }
+
+ private StructType createSchemaWithoutMetaFields() {
+ return new StructType()
+ .add("id", DataTypes.IntegerType, false)
+ .add("name", DataTypes.StringType, true)
+ .add("age", DataTypes.LongType, true);
+ }
+
+ private InternalRow createRowWithMetaFields(Object... userValues) {
+ // Create row with PLACEHOLDER meta fields (will be updated by writer) +
user data
+ Object[] allValues = new Object[5 + userValues.length];
+
+ // Meta fields - use empty strings as placeholders
+ allValues[0] = UTF8String.fromString(""); // commit_time
+ allValues[1] = UTF8String.fromString(""); // commit_seqno
+ allValues[2] = UTF8String.fromString(""); // record_key
+ allValues[3] = UTF8String.fromString(""); // partition_path
+ allValues[4] = UTF8String.fromString(""); // file_name
+
+ // Copy user values
+ for (int i = 0; i < userValues.length; i++) {
+ allValues[5 + i] = processValue(userValues[i]);
+ }
+
+ return new GenericInternalRow(allValues);
+ }
+
+ private InternalRow createRow(Object... values) {
+ Object[] processedValues = new Object[values.length];
+ for (int i = 0; i < values.length; i++) {
+ processedValues[i] = processValue(values[i]);
+ }
+ return new GenericInternalRow(processedValues);
+ }
+
+ private Object processValue(Object value) {
+ if (value instanceof String) {
+ return UTF8String.fromString((String) value);
+ }
+ return value;
+ }
+
+ private boolean hasField(VectorSchemaRoot root, String fieldName) {
+ try {
+ return root.getVector(fieldName) != null;
+ } catch (Exception e) {
+ return false;
+ }
+ }
+}
diff --git a/pom.xml b/pom.xml
index 873c85ee1258..cb659f4f38cc 100644
--- a/pom.xml
+++ b/pom.xml
@@ -240,6 +240,11 @@
<springboot.version>2.7.10</springboot.version>
<spring.shell.version>2.1.1</spring.shell.version>
<snappy.version>1.1.10.7</snappy.version>
+ <arrow.version>18.3.0</arrow.version>
+ <lance.version>0.39.0</lance.version>
+ <lance.spark.connector.version>0.0.15</lance.spark.connector.version>
+
<lance.spark.artifact>lance-spark-3.5_${scala.binary.version}</lance.spark.artifact>
+ <lance.skip.tests>false</lance.skip.tests>
<!-- The following properties are only used for Jacoco coverage report
aggregation -->
<copy.files>false</copy.files>
<copy.files.target.dir>${maven.multiModuleProjectDirectory}</copy.files.target.dir>
@@ -922,7 +927,29 @@
<version>${orc.spark.version}</version>
<scope>compile</scope>
</dependency>
-
+ <!-- Lance Core SDK -->
+ <dependency>
+ <groupId>com.lancedb</groupId>
+ <artifactId>lance-core</artifactId>
+ <version>${lance.version}</version>
+ </dependency>
+ <!-- Lance Spark for Arrow conversion -->
+ <dependency>
+ <groupId>com.lancedb</groupId>
+ <artifactId>${lance.spark.artifact}</artifactId>
+ <version>${lance.spark.connector.version}</version>
+ </dependency>
+ <!-- Apache Arrow -->
+ <dependency>
+ <groupId>org.apache.arrow</groupId>
+ <artifactId>arrow-vector</artifactId>
+ <version>${arrow.version}</version>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.arrow</groupId>
+ <artifactId>arrow-memory-netty</artifactId>
+ <version>${arrow.version}</version>
+ </dependency>
<!-- RoaringBitmap -->
<dependency>
<groupId>org.roaringbitmap</groupId>
@@ -2489,6 +2516,9 @@
<!-- This glob has to include hudi-spark3-common,
hudi-spark3.2plus-common -->
<hudi.spark.common.module>hudi-spark3-common</hudi.spark.common.module>
<kafka.version>2.8.1</kafka.version>
+ <!-- Lance: skip tests on Spark 3.3 because the Lance Spark connector does not support Spark 3.3 -->
+
<lance.spark.artifact>lance-spark-base_${scala.binary.version}</lance.spark.artifact>
+ <lance.skip.tests>true</lance.skip.tests>
<!-- NOTE: Some Hudi modules require standalone Parquet/Orc/etc
file-format dependency (hudi-hive-sync,
hudi-hadoop-mr, for ex). Since these Hudi modules might be
used from w/in the execution engine(s)
bringing these file-formats as dependencies as well, we
need to make sure that versions are
@@ -2530,6 +2560,9 @@
<hudi.spark.common.module>hudi-spark3-common</hudi.spark.common.module>
<scalatest.version>${scalatest.spark3.version}</scalatest.version>
<kafka.version>3.3.2</kafka.version>
+ <!-- Lance: Use Spark 3.4-specific artifact -->
+
<lance.spark.artifact>lance-spark-3.4_${scala.binary.version}</lance.spark.artifact>
+ <lance.skip.tests>false</lance.skip.tests>
<!-- NOTE: Some Hudi modules require standalone Parquet/Orc/etc
file-format dependency (hudi-hive-sync,
hudi-hadoop-mr, for ex). Since these Hudi modules might be
used from w/in the execution engine(s)
bringing these file-formats as dependencies as well, we
need to make sure that versions are
@@ -2581,6 +2614,9 @@
<scalatest.version>${scalatest.spark3.version}</scalatest.version>
<kafka.version>3.4.1</kafka.version>
<hive.storage.version>2.8.1</hive.storage.version>
+ <!-- Lance: Use Spark 3.5-specific artifact -->
+
<lance.spark.artifact>lance-spark-3.5_${scala.binary.version}</lance.spark.artifact>
+ <lance.skip.tests>false</lance.skip.tests>
<!-- NOTE: Some Hudi modules require standalone Parquet/Orc/etc
file-format dependency (hudi-hive-sync,
hudi-hadoop-mr, for ex). Since these Hudi modules might be
used from w/in the execution engine(s)
bringing these file-formats as dependencies as well, we
need to make sure that versions are
@@ -2640,6 +2676,9 @@
<hadoop.version>3.4.0</hadoop.version>
<kafka.version>3.8.0</kafka.version>
<hive.storage.version>2.8.1</hive.storage.version>
+ <!-- Lance: Use Spark 4.0-specific artifact (Scala 2.13 only) -->
+ <lance.spark.artifact>lance-spark-4.0_2.13</lance.spark.artifact>
+ <lance.skip.tests>false</lance.skip.tests>
<!-- NOTE: Some Hudi modules require standalone Parquet/Orc/etc
file-format dependency (hudi-hive-sync,
hudi-hadoop-mr, for ex). Since these Hudi modules might be
used from w/in the execution engine(s)
bringing these file-formats as dependencies as well, we
need to make sure that versions are