the-other-tim-brown commented on code in PR #17632:
URL: https://github.com/apache/hudi/pull/17632#discussion_r2644067177
##########
hudi-hadoop-common/src/main/java/org/apache/hudi/io/lance/HoodieBaseLanceWriter.java:
##########
@@ -123,6 +127,9 @@ public long getWrittenRecordCount() {
*/
@Override
public void close() throws IOException {
+ Exception primaryException = null;
Review Comment:
This variable is unused, let's remove it
##########
hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/io/storage/HoodieSparkLanceReader.java:
##########
@@ -0,0 +1,331 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.io.storage;
+
+import com.lancedb.lance.file.LanceFileReader;
+import org.apache.arrow.memory.BufferAllocator;
+import org.apache.arrow.vector.VectorSchemaRoot;
+import org.apache.arrow.vector.ipc.ArrowReader;
+import org.apache.arrow.vector.types.pojo.Schema;
+import org.apache.hudi.HoodieSchemaConversionUtils;
+import org.apache.hudi.common.bloom.BloomFilter;
+import org.apache.hudi.common.model.HoodieRecord;
+import org.apache.hudi.common.model.HoodieSparkRecord;
+import org.apache.hudi.common.schema.HoodieSchema;
+import org.apache.hudi.common.schema.HoodieSchemaUtils;
+import org.apache.hudi.common.util.HoodieArrowAllocator;
+import org.apache.hudi.common.util.collection.ClosableIterator;
+import org.apache.hudi.common.util.collection.CloseableMappingIterator;
+import org.apache.hudi.common.util.collection.Pair;
+import org.apache.hudi.exception.HoodieException;
+import org.apache.hudi.exception.HoodieIOException;
+import org.apache.hudi.storage.StoragePath;
+import org.apache.spark.sql.catalyst.InternalRow;
+import org.apache.spark.sql.catalyst.expressions.UnsafeProjection;
+import org.apache.spark.sql.catalyst.expressions.UnsafeRow;
+import org.apache.spark.sql.types.StructField;
+import org.apache.spark.sql.types.StructType;
+import org.apache.spark.sql.util.LanceArrowUtils;
+import org.apache.spark.sql.vectorized.ColumnVector;
+import org.apache.spark.sql.vectorized.ColumnarBatch;
+import org.apache.spark.sql.vectorized.LanceArrowColumnVector;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Set;
+
+import static org.apache.hudi.common.util.TypeUtils.unsafeCast;
+
+/**
+ * {@link HoodieSparkFileReader} implementation for Lance file format.
+ */
+public class HoodieSparkLanceReader implements HoodieSparkFileReader {
+ // Memory size for data read operations: 120MB
+ public static final long LANCE_DATA_ALLOCATOR_SIZE = 120 * 1024 * 1024;
+
+ // Memory size for metadata operations: 8MB
+ private static final long LANCE_METADATA_ALLOCATOR_SIZE = 8 * 1024 * 1024;
+
+ // number of rows to read
+ private static final int DEFAULT_BATCH_SIZE = 512;
+ private final StoragePath path;
+
+ public HoodieSparkLanceReader(StoragePath path) {
+ this.path = path;
+ }
+
  /**
   * Not supported: min/max record key statistics are not yet tracked for Lance files.
   *
   * @throws UnsupportedOperationException always
   */
  @Override
  public String[] readMinMaxRecordKeys() {
    throw new UnsupportedOperationException("Min/max record key tracking is not yet supported for Lance file format");
  }
+
  /**
   * Not supported: bloom filters are not yet written into Lance files.
   *
   * @throws UnsupportedOperationException always
   */
  @Override
  public BloomFilter readBloomFilter() {
    throw new UnsupportedOperationException("Bloom filter is not yet supported for Lance file format");
  }
+
+ @Override
+ public Set<Pair<String, Long>> filterRowKeys(Set<String> candidateRowKeys) {
+ Set<Pair<String, Long>> result = new HashSet<>();
+ long position = 0;
+
+ try (ClosableIterator<String> keyIterator = getRecordKeyIterator()) {
+ while (keyIterator.hasNext()) {
+ String recordKey = keyIterator.next();
+ // If filter is empty/null, then all keys will be added.
+ // if filter has specific keys, then ensure only those are added
+ if (candidateRowKeys == null || candidateRowKeys.isEmpty()
+ || candidateRowKeys.contains(recordKey)) {
+ result.add(Pair.of(recordKey, position));
+ }
+ position++;
+ }
+ } catch (IOException e) {
+ throw new HoodieIOException("Failed to filter row keys from Lance file:
" + path, e);
+ }
+
+ return result;
+ }
+
+ @Override
+ public ClosableIterator<HoodieRecord<InternalRow>>
getRecordIterator(HoodieSchema readerSchema, HoodieSchema requestedSchema)
throws IOException {
Review Comment:
Let's make sure there is at least one test with the requested schema not
matching the full schema
##########
hudi-spark-datasource/hudi-spark/src/test/java/org/apache/hudi/io/storage/TestHoodieSparkLanceReader.java:
##########
@@ -0,0 +1,513 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.io.storage;
+
+import org.apache.hudi.client.SparkTaskContextSupplier;
+import org.apache.hudi.common.model.HoodieKey;
+import org.apache.hudi.common.model.HoodieRecord;
+import org.apache.hudi.common.testutils.HoodieTestUtils;
+import org.apache.hudi.common.util.collection.ClosableIterator;
+import org.apache.hudi.common.util.collection.Pair;
+import org.apache.hudi.storage.HoodieStorage;
+import org.apache.hudi.storage.StoragePath;
+import org.apache.spark.sql.catalyst.InternalRow;
+import org.apache.spark.sql.catalyst.expressions.GenericInternalRow;
+import org.apache.spark.sql.catalyst.expressions.UnsafeRow;
+import org.apache.spark.sql.types.DataTypes;
+import org.apache.spark.sql.types.StructType;
+import org.apache.spark.unsafe.types.UTF8String;
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.io.TempDir;
+
+import java.io.File;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Set;
+
import static org.apache.hudi.io.storage.LanceTestUtils.createRow;
import static org.apache.hudi.io.storage.LanceTestUtils.createRowWithMetaFields;
import static org.junit.jupiter.api.Assertions.assertArrayEquals;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertFalse;
import static org.junit.jupiter.api.Assertions.assertNotNull;
import static org.junit.jupiter.api.Assertions.assertTrue;
+
+/**
+ * Tests for {@link HoodieSparkLanceReader}.
+ */
+public class TestHoodieSparkLanceReader {
+
+ @TempDir
+ File tempDir;
+
+ private HoodieStorage storage;
+ private SparkTaskContextSupplier taskContextSupplier;
+ private String instantTime;
+
  /** Creates fresh storage over the temp dir and a deterministic instant time per test. */
  @BeforeEach
  public void setUp() throws IOException {
    storage = HoodieTestUtils.getStorage(tempDir.getAbsolutePath());
    taskContextSupplier = new SparkTaskContextSupplier();
    // Fixed instant time keeps written file metadata stable across runs.
    instantTime = "20251201120000000";
  }
+
+ @AfterEach
+ public void tearDown() throws Exception {
+ if (storage != null) {
+ storage.close();
+ }
+ }
+
  /**
   * Round-trips rows covering int/string/long/double/boolean columns and
   * verifies record count, schema presence, and every field of every row.
   */
  @Test
  public void testReadPrimitiveTypes() throws Exception {
    // Create schema with primitive types
    StructType schema = new StructType()
        .add("id", DataTypes.IntegerType, false)
        .add("name", DataTypes.StringType, true)
        .add("age", DataTypes.LongType, true)
        .add("score", DataTypes.DoubleType, true)
        .add("active", DataTypes.BooleanType, true);

    // Create test data
    List<InternalRow> expectedRows = new ArrayList<>();
    expectedRows.add(createRow(1, "Alice", 30L, 95.5, true));
    expectedRows.add(createRow(2, "Bob", 25L, 87.3, false));
    expectedRows.add(createRow(3, "Charlie", 35L, 92.1, true));

    // Write and read back
    StoragePath path = new StoragePath(tempDir.getAbsolutePath() + "/test_primitives.lance");
    try (HoodieSparkLanceReader reader = writeAndCreateReader(path, schema, expectedRows)) {
      // Verify record count
      assertEquals(expectedRows.size(), reader.getTotalRecords(), "Record count should match");

      // Verify schema
      assertNotNull(reader.getSchema(), "Schema should not be null");

      // Read all records
      List<InternalRow> actualRows = readAllRows(reader);

      // Verify record count
      assertEquals(expectedRows.size(), actualRows.size(), "Should read same number of records");

      // Verify each record field-by-field
      for (int i = 0; i < expectedRows.size(); i++) {
        InternalRow expected = expectedRows.get(i);
        InternalRow actual = actualRows.get(i);

        assertEquals(expected.getInt(0), actual.getInt(0), "id field should match");
        assertEquals(expected.getUTF8String(1), actual.getUTF8String(1), "name field should match");
        assertEquals(expected.getLong(2), actual.getLong(2), "age field should match");
        assertEquals(expected.getDouble(3), actual.getDouble(3), 0.001, "score field should match");
        assertEquals(expected.getBoolean(4), actual.getBoolean(4), "active field should match");
      }
    }
  }
+
  /**
   * Verifies that null values in nullable columns survive a write/read
   * round-trip and that non-null neighbors are not affected.
   */
  @Test
  public void testReadWithNulls() throws Exception {
    StructType schema = new StructType()
        .add("id", DataTypes.IntegerType, false)
        .add("name", DataTypes.StringType, true)
        .add("value", DataTypes.DoubleType, true);

    List<InternalRow> expectedRows = new ArrayList<>();
    expectedRows.add(createRow(1, "Alice", 100.0));
    expectedRows.add(createRow(2, null, 200.0)); // null name
    expectedRows.add(createRow(3, "Charlie", null)); // null value
    expectedRows.add(createRow(4, null, null)); // multiple nulls

    // Write and read back
    StoragePath path = new StoragePath(tempDir.getAbsolutePath() + "/test_nulls.lance");
    try (HoodieSparkLanceReader reader = writeAndCreateReader(path, schema, expectedRows)) {
      List<InternalRow> actualRows = readAllRows(reader);

      assertEquals(expectedRows.size(), actualRows.size(), "Should read same number of records");

      // Verify nulls are preserved exactly where they were written
      assertTrue(actualRows.get(1).isNullAt(1), "Second row name should be null");
      assertFalse(actualRows.get(1).isNullAt(2), "Second row value should not be null");

      assertFalse(actualRows.get(2).isNullAt(1), "Third row name should not be null");
      assertTrue(actualRows.get(2).isNullAt(2), "Third row value should be null");

      assertTrue(actualRows.get(3).isNullAt(1), "Fourth row name should be null");
      assertTrue(actualRows.get(3).isNullAt(2), "Fourth row value should be null");
    }
  }
+
+ @Test
+ public void testReadAllSupportedTypes() throws Exception {
+ StructType schema = new StructType()
+ .add("int_field", DataTypes.IntegerType, false)
+ .add("long_field", DataTypes.LongType, false)
+ .add("float_field", DataTypes.FloatType, false)
+ .add("double_field", DataTypes.DoubleType, false)
+ .add("bool_field", DataTypes.BooleanType, false)
+ .add("string_field", DataTypes.StringType, false)
+ .add("binary_field", DataTypes.BinaryType, false);
+
+ List<InternalRow> expectedRows = new ArrayList<>();
+ expectedRows.add(new GenericInternalRow(new Object[]{
+ 42, // int
+ 123456789L, // long
+ 3.14f, // float
+ 2.71828, // double
+ true, // boolean
+ UTF8String.fromString("test"), // string
+ new byte[]{1, 2, 3, 4} // binary
+ }));
+
+ // Write and read back
+ StoragePath path = new StoragePath(tempDir.getAbsolutePath() +
"/test_all_types.lance");
+ try (HoodieSparkLanceReader reader = writeAndCreateReader(path, schema,
expectedRows)) {
+ assertEquals(1, reader.getTotalRecords());
+
+ List<InternalRow> actualRows = readAllRows(reader);
+
+ assertEquals(1, actualRows.size());
+ InternalRow actual = actualRows.get(0);
+
+ assertEquals(42, actual.getInt(0), "int field should match");
+ assertEquals(123456789L, actual.getLong(1), "long field should match");
+ assertEquals(3.14f, actual.getFloat(2), 0.001, "float field should
match");
+ assertEquals(2.71828, actual.getDouble(3), 0.00001, "double field should
match");
+ assertTrue(actual.getBoolean(4), "bool field should match");
+ assertEquals("test", actual.getUTF8String(5).toString(), "string field
should match");
+
+ byte[] expectedBytes = new byte[]{1, 2, 3, 4};
+ byte[] actualBytes = actual.getBinary(6);
+ assertEquals(expectedBytes.length, actualBytes.length, "binary field
length should match");
+ for (int i = 0; i < expectedBytes.length; i++) {
+ assertEquals(expectedBytes[i], actualBytes[i], "binary field byte " +
i + " should match");
+ }
+ }
+ }
+
  /**
   * Writes more records than a single read batch holds (2500 vs. the reader's
   * internal batch size) and verifies every row comes back in order, exercising
   * the batch-boundary handling in the iterator.
   */
  @Test
  public void testReadLargeDataset() throws Exception {
    // Test batch reading with more than 1000 records
    StructType schema = new StructType()
        .add("id", DataTypes.IntegerType, false)
        .add("value", DataTypes.LongType, false);

    StoragePath path = new StoragePath(tempDir.getAbsolutePath() + "/test_large.lance");
    int recordCount = 2500;
    try (HoodieSparkLanceWriter writer = new HoodieSparkLanceWriter(
        path, schema, instantTime, taskContextSupplier, storage, false)) {
      for (int i = 0; i < recordCount; i++) {
        // value is derived from id so each row can be verified independently
        GenericInternalRow row = new GenericInternalRow(new Object[]{i, (long) i * 2});
        writer.writeRow("key" + i, row);
      }
    }

    // Read back
    try (HoodieSparkLanceReader reader = new HoodieSparkLanceReader(path)) {
      assertEquals(recordCount, reader.getTotalRecords(), "Total record count should match");

      // Verify all records in write order
      int count = 0;
      try (ClosableIterator<UnsafeRow> iterator = reader.getUnsafeRowIterator()) {
        while (iterator.hasNext()) {
          InternalRow row = iterator.next();
          assertEquals(count, row.getInt(0), "id should match");
          assertEquals(count * 2L, row.getLong(1), "value should match");
          count++;
        }
      }

      assertEquals(recordCount, count, "Should read all records");
    }
  }
+
+ @Test
+ public void testGetRecordKeyIterator() throws Exception {
+ // Create schema with all 5 Hudi metadata fields
+ StructType schema = new StructType()
+ .add(HoodieRecord.COMMIT_TIME_METADATA_FIELD, DataTypes.StringType,
false)
+ .add(HoodieRecord.COMMIT_SEQNO_METADATA_FIELD, DataTypes.StringType,
false)
+ .add(HoodieRecord.RECORD_KEY_METADATA_FIELD, DataTypes.StringType,
false)
+ .add(HoodieRecord.PARTITION_PATH_METADATA_FIELD, DataTypes.StringType,
false)
+ .add(HoodieRecord.FILENAME_METADATA_FIELD, DataTypes.StringType, false)
+ .add("name", DataTypes.StringType, true)
+ .add("age", DataTypes.IntegerType, true);
+
+ // Create test data with placeholder metadata fields
+ List<InternalRow> expectedRows = new ArrayList<>();
+ expectedRows.add(createRowWithMetaFields("Alice", 30));
+ expectedRows.add(createRowWithMetaFields("Bob", 25));
+ expectedRows.add(createRowWithMetaFields("Charlie", 35));
+ expectedRows.add(createRowWithMetaFields("David", 40));
+ expectedRows.add(createRowWithMetaFields("Eve", 28));
+
+ // Write with metadata population enabled
+ StoragePath path = new StoragePath(tempDir.getAbsolutePath() +
"/test_record_keys.lance");
+ try (HoodieSparkLanceReader reader = writeAndCreateReader(path, schema,
expectedRows, true)) {
+ // Read record keys using getRecordKeyIterator
+ List<String> actualKeys = new ArrayList<>();
+ try (ClosableIterator<String> keyIterator =
reader.getRecordKeyIterator()) {
+ while (keyIterator.hasNext()) {
+ actualKeys.add(keyIterator.next());
+ }
+ }
+
+ // Verify all keys are returned in the same order
+ assertEquals(5, actualKeys.size(), "Should return all 5 record keys");
+ assertEquals("key0", actualKeys.get(0), "First key should match");
+ assertEquals("key1", actualKeys.get(1), "Second key should match");
+ assertEquals("key2", actualKeys.get(2), "Third key should match");
+ assertEquals("key3", actualKeys.get(3), "Fourth key should match");
+ assertEquals("key4", actualKeys.get(4), "Fifth key should match");
+ }
+ }
+
+ @Test
+ public void testFilterRowKeysWithCandidates() throws Exception {
+ // Create schema with all 5 Hudi metadata fields
+ StructType schema = new StructType()
+ .add(HoodieRecord.COMMIT_TIME_METADATA_FIELD, DataTypes.StringType,
false)
+ .add(HoodieRecord.COMMIT_SEQNO_METADATA_FIELD, DataTypes.StringType,
false)
+ .add(HoodieRecord.RECORD_KEY_METADATA_FIELD, DataTypes.StringType,
false)
+ .add(HoodieRecord.PARTITION_PATH_METADATA_FIELD, DataTypes.StringType,
false)
+ .add(HoodieRecord.FILENAME_METADATA_FIELD, DataTypes.StringType, false)
+ .add("value", DataTypes.IntegerType, true);
+
+ // Create 10 records with placeholder metadata fields
+ List<InternalRow> rows = new ArrayList<>();
+ for (int i = 0; i < 10; i++) {
+ rows.add(createRowWithMetaFields(i * 10));
+ }
+
+ // Write with metadata population enabled
+ StoragePath path = new StoragePath(tempDir.getAbsolutePath() +
"/test_filter_candidates.lance");
+ try (HoodieSparkLanceReader reader = writeAndCreateReader(path, schema,
rows, true)) {
+ // Create candidate set with 3 specific keys
+ Set<String> candidateKeys = new HashSet<>();
+ candidateKeys.add("key2");
+ candidateKeys.add("key5");
+ candidateKeys.add("key7");
+
+ // Filter row keys
+ Set<Pair<String, Long>> result = reader.filterRowKeys(candidateKeys);
+
+ // Verify result contains exactly 3 entries with correct positions
+ assertEquals(3, result.size(), "Should return exactly 3 matching keys");
+
+ assertTrue(result.contains(Pair.of("key2", 2L)), "Should contain key2 at
position 2");
Review Comment:
Similarly let's use set equality here
##########
hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/io/storage/HoodieSparkLanceReader.java:
##########
@@ -0,0 +1,331 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.io.storage;
+
+import com.lancedb.lance.file.LanceFileReader;
+import org.apache.arrow.memory.BufferAllocator;
+import org.apache.arrow.vector.VectorSchemaRoot;
+import org.apache.arrow.vector.ipc.ArrowReader;
+import org.apache.arrow.vector.types.pojo.Schema;
+import org.apache.hudi.HoodieSchemaConversionUtils;
+import org.apache.hudi.common.bloom.BloomFilter;
+import org.apache.hudi.common.model.HoodieRecord;
+import org.apache.hudi.common.model.HoodieSparkRecord;
+import org.apache.hudi.common.schema.HoodieSchema;
+import org.apache.hudi.common.schema.HoodieSchemaUtils;
+import org.apache.hudi.common.util.HoodieArrowAllocator;
+import org.apache.hudi.common.util.collection.ClosableIterator;
+import org.apache.hudi.common.util.collection.CloseableMappingIterator;
+import org.apache.hudi.common.util.collection.Pair;
+import org.apache.hudi.exception.HoodieException;
+import org.apache.hudi.exception.HoodieIOException;
+import org.apache.hudi.storage.StoragePath;
+import org.apache.spark.sql.catalyst.InternalRow;
+import org.apache.spark.sql.catalyst.expressions.UnsafeProjection;
+import org.apache.spark.sql.catalyst.expressions.UnsafeRow;
+import org.apache.spark.sql.types.StructField;
+import org.apache.spark.sql.types.StructType;
+import org.apache.spark.sql.util.LanceArrowUtils;
+import org.apache.spark.sql.vectorized.ColumnVector;
+import org.apache.spark.sql.vectorized.ColumnarBatch;
+import org.apache.spark.sql.vectorized.LanceArrowColumnVector;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Set;
+
+import static org.apache.hudi.common.util.TypeUtils.unsafeCast;
+
+/**
+ * {@link HoodieSparkFileReader} implementation for Lance file format.
+ */
+public class HoodieSparkLanceReader implements HoodieSparkFileReader {
+ // Memory size for data read operations: 120MB
+ public static final long LANCE_DATA_ALLOCATOR_SIZE = 120 * 1024 * 1024;
+
+ // Memory size for metadata operations: 8MB
+ private static final long LANCE_METADATA_ALLOCATOR_SIZE = 8 * 1024 * 1024;
+
+ // number of rows to read
+ private static final int DEFAULT_BATCH_SIZE = 512;
+ private final StoragePath path;
+
  /**
   * @param path location of the Lance file to read; the file is opened lazily
   *             by each read method, not here
   */
  public HoodieSparkLanceReader(StoragePath path) {
    this.path = path;
  }
+
  /**
   * Not supported: min/max record key statistics are not yet tracked for Lance files.
   *
   * @throws UnsupportedOperationException always
   */
  @Override
  public String[] readMinMaxRecordKeys() {
    throw new UnsupportedOperationException("Min/max record key tracking is not yet supported for Lance file format");
  }
+
  /**
   * Not supported: bloom filters are not yet written into Lance files.
   *
   * @throws UnsupportedOperationException always
   */
  @Override
  public BloomFilter readBloomFilter() {
    throw new UnsupportedOperationException("Bloom filter is not yet supported for Lance file format");
  }
+
+ @Override
+ public Set<Pair<String, Long>> filterRowKeys(Set<String> candidateRowKeys) {
+ Set<Pair<String, Long>> result = new HashSet<>();
+ long position = 0;
+
+ try (ClosableIterator<String> keyIterator = getRecordKeyIterator()) {
+ while (keyIterator.hasNext()) {
+ String recordKey = keyIterator.next();
+ // If filter is empty/null, then all keys will be added.
+ // if filter has specific keys, then ensure only those are added
+ if (candidateRowKeys == null || candidateRowKeys.isEmpty()
+ || candidateRowKeys.contains(recordKey)) {
+ result.add(Pair.of(recordKey, position));
+ }
+ position++;
+ }
+ } catch (IOException e) {
+ throw new HoodieIOException("Failed to filter row keys from Lance file:
" + path, e);
+ }
+
+ return result;
+ }
+
  /**
   * Returns records projected to {@code requestedSchema}.
   *
   * <p>NOTE(review): {@code readerSchema} is ignored here — no reader-side
   * schema reconciliation is applied before projecting to the requested
   * schema. Confirm this is intentional, and that a test covers a requested
   * schema that differs from the file's full schema.
   */
  @Override
  public ClosableIterator<HoodieRecord<InternalRow>> getRecordIterator(HoodieSchema readerSchema, HoodieSchema requestedSchema) throws IOException {
    return getRecordIterator(requestedSchema);
  }
+
+ @Override
+ public ClosableIterator<HoodieRecord<InternalRow>>
getRecordIterator(HoodieSchema schema) throws IOException {
+ ClosableIterator<UnsafeRow> iterator = getUnsafeRowIterator();
+ return new CloseableMappingIterator<>(iterator, data -> unsafeCast(new
HoodieSparkRecord(data)));
+ }
+
+ @Override
+ public ClosableIterator<String> getRecordKeyIterator() throws IOException {
+ // Get schema with only the record key field for efficient column pruning
+ HoodieSchema recordKeySchema = HoodieSchemaUtils.getRecordKeySchema();
+ ClosableIterator<UnsafeRow> iterator =
getUnsafeRowIterator(recordKeySchema);
+
+ // Map each UnsafeRow to extract the record key string directly from index 0
+ // The record key is at index 0 because we're using lance column
projection which has only the record key field
+ return new CloseableMappingIterator<>(iterator, data ->
data.getUTF8String(0).toString());
+ }
+
+ /**
+ * Get an iterator over UnsafeRows from the Lance file.
+ *
+ * @return ClosableIterator over UnsafeRows
+ * @throws IOException if reading fails
+ */
+ public ClosableIterator<UnsafeRow> getUnsafeRowIterator() {
+
+ BufferAllocator allocator = HoodieArrowAllocator.newChildAllocator(
+ getClass().getSimpleName() + "-data-" + path.getName(),
LANCE_DATA_ALLOCATOR_SIZE);
+
+ try {
+ LanceFileReader lanceReader = LanceFileReader.open(path.toString(),
allocator);
+
+ // Get schema from Lance file and convert to Spark StructType
+ Schema arrowSchema = lanceReader.schema();
+ StructType sparkSchema = LanceArrowUtils.fromArrowSchema(arrowSchema);
+
+ ArrowReader arrowReader = lanceReader.readAll(null, null,
DEFAULT_BATCH_SIZE);
+
+ return new LanceRecordIterator(allocator, lanceReader, arrowReader,
sparkSchema);
+ } catch (Exception e) {
+ allocator.close();
+ throw new HoodieException("Failed to create Lance reader for: " + path,
e);
+ }
+ }
+
+ /**
+ * Get an iterator over UnsafeRows from the Lance file with column
projection.
+ * This allows reading only specific columns for better performance.
+ *
+ * @param requestedSchema Avro schema specifying which columns to read
+ * @return ClosableIterator over UnsafeRows
+ * @throws IOException if reading fails
+ */
+ public ClosableIterator<UnsafeRow> getUnsafeRowIterator(HoodieSchema
requestedSchema) {
+ // Convert HoodieSchema to Spark StructType
+ StructType requestedSparkSchema =
HoodieSchemaConversionUtils.convertHoodieSchemaToStructType(requestedSchema);
+
+ BufferAllocator allocator = HoodieArrowAllocator.newChildAllocator(
+ getClass().getSimpleName() + "-data-" + path.getName(),
LANCE_DATA_ALLOCATOR_SIZE);
+
+ try {
+ LanceFileReader lanceReader = LanceFileReader.open(path.toString(),
allocator);
+
+ // Build list of column names to project (read only requested columns)
+ List<String> columnNames = new ArrayList<>(requestedSparkSchema.size());
+ for (StructField field : requestedSparkSchema.fields()) {
+ columnNames.add(field.name());
+ }
+
+ // Read only the requested columns from Lance file for efficiency
+ ArrowReader arrowReader = lanceReader.readAll(columnNames, null,
DEFAULT_BATCH_SIZE);
+
+ return new LanceRecordIterator(allocator, lanceReader, arrowReader,
requestedSparkSchema);
+ } catch (Exception e) {
+ allocator.close();
+ throw new HoodieException("Failed to create Lance reader for: " + path,
e);
+ }
+ }
+
+ @Override
+ public HoodieSchema getSchema() {
+ // Read Arrow schema from Lance file and convert to Avro
+ try (BufferAllocator allocator = HoodieArrowAllocator.newChildAllocator(
+ getClass().getSimpleName() + "-metadata-" + path.getName(),
LANCE_METADATA_ALLOCATOR_SIZE);
+ LanceFileReader reader = LanceFileReader.open(path.toString(),
allocator)) {
+ Schema arrowSchema = reader.schema();
+ StructType structType = LanceArrowUtils.fromArrowSchema(arrowSchema);
+ return
HoodieSchemaConversionUtils.convertStructTypeToHoodieSchema(structType,
"record", "", true);
+ } catch (Exception e) {
+ throw new HoodieException("Failed to read schema from Lance file: " +
path, e);
+ }
+ }
+
  /**
   * No-op: this reader holds no open resources between calls. Each iterator
   * returned by {@code getUnsafeRowIterator} owns its own allocator and Lance
   * reader, and must be closed by its caller instead.
   */
  @Override
  public void close() {
    // noop as resources are managed by the LanceRecordIterator and not within this reader class.
  }
+
+ @Override
+ public long getTotalRecords() {
+ try (BufferAllocator allocator = HoodieArrowAllocator.newChildAllocator(
+ getClass().getSimpleName() + "-metadata-" + path.getName(),
LANCE_METADATA_ALLOCATOR_SIZE);
+ LanceFileReader reader = LanceFileReader.open(path.toString(),
allocator)) {
+ return reader.numRows();
+ } catch (Exception e) {
+ throw new HoodieException("Failed to get row count from Lance file: " +
path, e);
+ }
+ }
+
+ /**
+ * Iterator implementation that reads Lance file batches and converts to
UnsafeRows.
+ * Keeps ColumnarBatch alive while iterating to avoid unnecessary data
copying.
+ */
+ private class LanceRecordIterator implements ClosableIterator<UnsafeRow> {
+ private final BufferAllocator allocator;
+ private final LanceFileReader lanceReader;
+ private final ArrowReader arrowReader;
+ private final UnsafeProjection projection;
+ private ColumnarBatch currentBatch;
+ private Iterator<InternalRow> rowIterator;
+
+ public LanceRecordIterator(BufferAllocator allocator,
+ LanceFileReader lanceReader,
+ ArrowReader arrowReader,
+ StructType schema) {
+ this.allocator = allocator;
+ this.lanceReader = lanceReader;
+ this.arrowReader = arrowReader;
+ this.projection = UnsafeProjection.create(schema);
+ }
+
+ @Override
+ public boolean hasNext() {
+ // If we have records in current batch, return true
+ if (rowIterator != null && rowIterator.hasNext()) {
+ return true;
+ }
+
+ // Close previous batch before loading next
+ if (currentBatch != null) {
+ currentBatch.close();
+ currentBatch = null;
+ }
+
+ // Try to load next batch
+ try {
+ if (arrowReader.loadNextBatch()) {
+ VectorSchemaRoot root = arrowReader.getVectorSchemaRoot();
+
+ // Wrap each Arrow FieldVector in LanceArrowColumnVector for
type-safe access
+ ColumnVector[] columns = root.getFieldVectors().stream()
+ .map(LanceArrowColumnVector::new)
+ .toArray(ColumnVector[]::new);
Review Comment:
Does this need to be recomputed per batch?
##########
hudi-spark-datasource/hudi-spark/src/test/java/org/apache/hudi/io/storage/TestHoodieSparkLanceReader.java:
##########
@@ -0,0 +1,513 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.io.storage;
+
+import org.apache.hudi.client.SparkTaskContextSupplier;
+import org.apache.hudi.common.model.HoodieKey;
+import org.apache.hudi.common.model.HoodieRecord;
+import org.apache.hudi.common.testutils.HoodieTestUtils;
+import org.apache.hudi.common.util.collection.ClosableIterator;
+import org.apache.hudi.common.util.collection.Pair;
+import org.apache.hudi.storage.HoodieStorage;
+import org.apache.hudi.storage.StoragePath;
+import org.apache.spark.sql.catalyst.InternalRow;
+import org.apache.spark.sql.catalyst.expressions.GenericInternalRow;
+import org.apache.spark.sql.catalyst.expressions.UnsafeRow;
+import org.apache.spark.sql.types.DataTypes;
+import org.apache.spark.sql.types.StructType;
+import org.apache.spark.unsafe.types.UTF8String;
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.io.TempDir;
+
+import java.io.File;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Set;
+
+import static org.apache.hudi.io.storage.LanceTestUtils.createRow;
+import static
org.apache.hudi.io.storage.LanceTestUtils.createRowWithMetaFields;
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertFalse;
+import static org.junit.jupiter.api.Assertions.assertNotNull;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+
+/**
+ * Tests for {@link HoodieSparkLanceReader}.
+ */
+public class TestHoodieSparkLanceReader {
+
+ @TempDir
+ File tempDir;
+
+ private HoodieStorage storage;
+ private SparkTaskContextSupplier taskContextSupplier;
+ private String instantTime;
+
+ @BeforeEach
+ public void setUp() throws IOException {
+ storage = HoodieTestUtils.getStorage(tempDir.getAbsolutePath());
+ taskContextSupplier = new SparkTaskContextSupplier();
+ instantTime = "20251201120000000";
+ }
+
+ @AfterEach
+ public void tearDown() throws Exception {
+ if (storage != null) {
+ storage.close();
+ }
+ }
+
+ @Test
+ public void testReadPrimitiveTypes() throws Exception {
+ // Create schema with primitive types
+ StructType schema = new StructType()
+ .add("id", DataTypes.IntegerType, false)
+ .add("name", DataTypes.StringType, true)
+ .add("age", DataTypes.LongType, true)
+ .add("score", DataTypes.DoubleType, true)
+ .add("active", DataTypes.BooleanType, true);
+
+ // Create test data
+ List<InternalRow> expectedRows = new ArrayList<>();
+ expectedRows.add(createRow(1, "Alice", 30L, 95.5, true));
+ expectedRows.add(createRow(2, "Bob", 25L, 87.3, false));
+ expectedRows.add(createRow(3, "Charlie", 35L, 92.1, true));
+
+ // Write and read back
+ StoragePath path = new StoragePath(tempDir.getAbsolutePath() +
"/test_primitives.lance");
+ try (HoodieSparkLanceReader reader = writeAndCreateReader(path, schema,
expectedRows)) {
+ // Verify record count
+ assertEquals(expectedRows.size(), reader.getTotalRecords(), "Record
count should match");
+
+ // Verify schema
+ assertNotNull(reader.getSchema(), "Schema should not be null");
+
+ // Read all records
+ List<InternalRow> actualRows = readAllRows(reader);
+
+ // Verify record count
+ assertEquals(expectedRows.size(), actualRows.size(), "Should read same
number of records");
+
+ // Verify each record
+ for (int i = 0; i < expectedRows.size(); i++) {
+ InternalRow expected = expectedRows.get(i);
+ InternalRow actual = actualRows.get(i);
+
+ assertEquals(expected.getInt(0), actual.getInt(0), "id field should
match");
+ assertEquals(expected.getUTF8String(1), actual.getUTF8String(1), "name
field should match");
+ assertEquals(expected.getLong(2), actual.getLong(2), "age field should
match");
+ assertEquals(expected.getDouble(3), actual.getDouble(3), 0.001, "score
field should match");
+ assertEquals(expected.getBoolean(4), actual.getBoolean(4), "active
field should match");
+ }
+ }
+ }
+
+ @Test
+ public void testReadWithNulls() throws Exception {
+ StructType schema = new StructType()
+ .add("id", DataTypes.IntegerType, false)
+ .add("name", DataTypes.StringType, true)
+ .add("value", DataTypes.DoubleType, true);
+
+ List<InternalRow> expectedRows = new ArrayList<>();
+ expectedRows.add(createRow(1, "Alice", 100.0));
+ expectedRows.add(createRow(2, null, 200.0)); // null name
+ expectedRows.add(createRow(3, "Charlie", null)); // null value
+ expectedRows.add(createRow(4, null, null)); // multiple nulls
+
+ // Write and read back
+ StoragePath path = new StoragePath(tempDir.getAbsolutePath() +
"/test_nulls.lance");
+ try (HoodieSparkLanceReader reader = writeAndCreateReader(path, schema,
expectedRows)) {
+ List<InternalRow> actualRows = readAllRows(reader);
+
+ assertEquals(expectedRows.size(), actualRows.size(), "Should read same
number of records");
+
+ // Verify nulls are preserved
+ assertTrue(actualRows.get(1).isNullAt(1), "Second row name should be
null");
+ assertFalse(actualRows.get(1).isNullAt(2), "Second row value should not
be null");
+
+ assertFalse(actualRows.get(2).isNullAt(1), "Third row name should not be
null");
+ assertTrue(actualRows.get(2).isNullAt(2), "Third row value should be
null");
+
+ assertTrue(actualRows.get(3).isNullAt(1), "Fourth row name should be
null");
+ assertTrue(actualRows.get(3).isNullAt(2), "Fourth row value should be
null");
+ }
+ }
+
+ @Test
+ public void testReadAllSupportedTypes() throws Exception {
Review Comment:
This overlaps with the first test in the class. Can we consolidate?
##########
hudi-spark-datasource/hudi-spark/src/test/java/org/apache/hudi/io/storage/LanceTestUtils.java:
##########
@@ -0,0 +1,88 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.io.storage;
+
+import org.apache.spark.sql.catalyst.InternalRow;
+import org.apache.spark.sql.catalyst.expressions.GenericInternalRow;
+import org.apache.spark.unsafe.types.UTF8String;
+
+/**
+ * Utility class for Lance file format tests.
+ * Provides helper methods for creating test data rows.
+ */
+public class LanceTestUtils {
+
+ private LanceTestUtils() {
+ // Utility class - prevent instantiation
+ }
+
+ /**
+ * Create InternalRow with placeholder Hudi metadata fields + user data.
+ * The 5 Hudi metadata fields are populated with null as placeholders.
+ *
+ * @param userValues User data values to append after the 5 metadata fields
+ * @return InternalRow with 5 metadata fields + user values
+ */
+ public static InternalRow createRowWithMetaFields(Object... userValues) {
+ Object[] allValues = new Object[5 + userValues.length];
+
+ // Meta fields - use null as placeholders (will be populated by writer)
+ allValues[0] = null; // commit_time
+ allValues[1] = null; // commit_seqno
+ allValues[2] = null; // record_key
+ allValues[3] = null; // partition_path
+ allValues[4] = null; // file_name
+
+ // Copy user values starting at index 5
+ for (int i = 0; i < userValues.length; i++) {
+ allValues[5 + i] = processValue(userValues[i]);
+ }
+
+ return new GenericInternalRow(allValues);
+ }
+
+ /**
+ * Create InternalRow from variable number of values.
+ * Automatically converts String values to UTF8String.
+ *
+ * @param values Values to include in the row
+ * @return InternalRow containing the processed values
+ */
+ public static InternalRow createRow(Object... values) {
+ Object[] processedValues = new Object[values.length];
+ for (int i = 0; i < values.length; i++) {
+ processedValues[i] = processValue(values[i]);
+ }
+ return new GenericInternalRow(processedValues);
+ }
+
+ /**
+ * Process a value for use in InternalRow.
+ * Converts String to UTF8String, passes through other types unchanged.
+ *
+ * @param value Value to process
+ * @return Processed value suitable for InternalRow
+ */
+ private static Object processValue(Object value) {
+ if (value instanceof String) {
+ return UTF8String.fromString((String) value);
+ }
+ return value;
+ }
+}
Review Comment:
nit: make sure to always add a newline at the end of files
##########
hudi-common/src/main/java/org/apache/hudi/common/util/HoodieArrowAllocator.java:
##########
@@ -0,0 +1,81 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.hudi.common.util;
Review Comment:
This feels like it should be under some IO or arrow sub-package. @yihua what
do you think?
Also, should this go under `hudi-client-common` instead of `hudi-common`?
##########
hudi-common/src/main/java/org/apache/hudi/common/util/HoodieArrowAllocator.java:
##########
@@ -0,0 +1,81 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.hudi.common.util;
+
+import org.apache.arrow.memory.BufferAllocator;
+import org.apache.arrow.memory.RootAllocator;
+
+/**
+ * Manages Arrow BufferAllocator lifecycle for Arrow-based file format
operations.
+ *
+ * <p>Following Arrow best practices:
+ * <ul>
+ * <li>Single RootAllocator per application</li>
+ * <li>Named child allocators for debugging and isolation</li>
+ * <li>Caller-specified memory limits per child allocator</li>
+ * </ul>
+ *
+ * <p>The root allocator is hardcoded to Long.MAX_VALUE and acts as a
+ * bookkeeper. Memory limits are enforced at the child allocator level, with
each
+ * caller specifying an appropriate limit for their use case.
+ *
+ * @see <a href="https://arrow.apache.org/docs/java/memory.html">Arrow Memory
Management</a>
+ */
+public class HoodieArrowAllocator {
+
+ private HoodieArrowAllocator() {
+ // Utility class
+ }
+
+ /**
+ * Initialization-on-demand holder idiom for thread-safe lazy initialization.
+ * The root allocator is created when first accessed and has a maximum size
of Long.MAX_VALUE.
+ */
+ private static class RootAllocatorHolder {
+ static final BufferAllocator INSTANCE = new RootAllocator(Long.MAX_VALUE);
+
+ static {
+ Runtime.getRuntime().addShutdownHook(new Thread(
+ INSTANCE::close, "hudi-arrow-root-allocator-shutdown"));
+ }
+ }
+
+ /**
+ * Get the shared root allocator.
+ * Thread-safe lazy initialization using the holder idiom.
+ *
+ * @return The singleton root allocator instance
+ */
+ private static BufferAllocator getRootAllocator() {
+ return RootAllocatorHolder.INSTANCE;
+ }
+
+ /**
+ * Create a named child allocator for Arrow operations.
+ * Caller is responsible for closing the returned allocator.
+ *
+ * @param name Descriptive name for debugging (e.g.,
"HoodieSparkLanceReader-data-file.lance")
+ * @param childSizeBytes Maximum memory size in bytes for this child
allocator
+ * @return A new child allocator with the specified size limit
+ */
+ public static BufferAllocator newChildAllocator(String name, long
childSizeBytes) {
Review Comment:
Is there any limit to the name size?
##########
hudi-spark-datasource/hudi-spark/src/test/java/org/apache/hudi/io/storage/TestHoodieSparkLanceReader.java:
##########
@@ -0,0 +1,513 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.io.storage;
+
+import org.apache.hudi.client.SparkTaskContextSupplier;
+import org.apache.hudi.common.model.HoodieKey;
+import org.apache.hudi.common.model.HoodieRecord;
+import org.apache.hudi.common.testutils.HoodieTestUtils;
+import org.apache.hudi.common.util.collection.ClosableIterator;
+import org.apache.hudi.common.util.collection.Pair;
+import org.apache.hudi.storage.HoodieStorage;
+import org.apache.hudi.storage.StoragePath;
+import org.apache.spark.sql.catalyst.InternalRow;
+import org.apache.spark.sql.catalyst.expressions.GenericInternalRow;
+import org.apache.spark.sql.catalyst.expressions.UnsafeRow;
+import org.apache.spark.sql.types.DataTypes;
+import org.apache.spark.sql.types.StructType;
+import org.apache.spark.unsafe.types.UTF8String;
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.io.TempDir;
+
+import java.io.File;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Set;
+
import static org.apache.hudi.io.storage.LanceTestUtils.createRow;
import static org.apache.hudi.io.storage.LanceTestUtils.createRowWithMetaFields;
import static org.junit.jupiter.api.Assertions.assertArrayEquals;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertFalse;
import static org.junit.jupiter.api.Assertions.assertNotNull;
import static org.junit.jupiter.api.Assertions.assertTrue;
+
+/**
+ * Tests for {@link HoodieSparkLanceReader}.
+ */
+public class TestHoodieSparkLanceReader {
+
+ @TempDir
+ File tempDir;
+
+ private HoodieStorage storage;
+ private SparkTaskContextSupplier taskContextSupplier;
+ private String instantTime;
+
+ @BeforeEach
+ public void setUp() throws IOException {
+ storage = HoodieTestUtils.getStorage(tempDir.getAbsolutePath());
+ taskContextSupplier = new SparkTaskContextSupplier();
+ instantTime = "20251201120000000";
+ }
+
+ @AfterEach
+ public void tearDown() throws Exception {
+ if (storage != null) {
+ storage.close();
+ }
+ }
+
+ @Test
+ public void testReadPrimitiveTypes() throws Exception {
+ // Create schema with primitive types
+ StructType schema = new StructType()
+ .add("id", DataTypes.IntegerType, false)
+ .add("name", DataTypes.StringType, true)
+ .add("age", DataTypes.LongType, true)
+ .add("score", DataTypes.DoubleType, true)
+ .add("active", DataTypes.BooleanType, true);
+
+ // Create test data
+ List<InternalRow> expectedRows = new ArrayList<>();
+ expectedRows.add(createRow(1, "Alice", 30L, 95.5, true));
+ expectedRows.add(createRow(2, "Bob", 25L, 87.3, false));
+ expectedRows.add(createRow(3, "Charlie", 35L, 92.1, true));
+
+ // Write and read back
+ StoragePath path = new StoragePath(tempDir.getAbsolutePath() +
"/test_primitives.lance");
+ try (HoodieSparkLanceReader reader = writeAndCreateReader(path, schema,
expectedRows)) {
+ // Verify record count
+ assertEquals(expectedRows.size(), reader.getTotalRecords(), "Record
count should match");
+
+ // Verify schema
+ assertNotNull(reader.getSchema(), "Schema should not be null");
+
+ // Read all records
+ List<InternalRow> actualRows = readAllRows(reader);
+
+ // Verify record count
+ assertEquals(expectedRows.size(), actualRows.size(), "Should read same
number of records");
+
+ // Verify each record
+ for (int i = 0; i < expectedRows.size(); i++) {
+ InternalRow expected = expectedRows.get(i);
+ InternalRow actual = actualRows.get(i);
+
+ assertEquals(expected.getInt(0), actual.getInt(0), "id field should
match");
+ assertEquals(expected.getUTF8String(1), actual.getUTF8String(1), "name
field should match");
+ assertEquals(expected.getLong(2), actual.getLong(2), "age field should
match");
+ assertEquals(expected.getDouble(3), actual.getDouble(3), 0.001, "score
field should match");
+ assertEquals(expected.getBoolean(4), actual.getBoolean(4), "active
field should match");
+ }
+ }
+ }
+
+ @Test
+ public void testReadWithNulls() throws Exception {
+ StructType schema = new StructType()
+ .add("id", DataTypes.IntegerType, false)
+ .add("name", DataTypes.StringType, true)
+ .add("value", DataTypes.DoubleType, true);
+
+ List<InternalRow> expectedRows = new ArrayList<>();
+ expectedRows.add(createRow(1, "Alice", 100.0));
+ expectedRows.add(createRow(2, null, 200.0)); // null name
+ expectedRows.add(createRow(3, "Charlie", null)); // null value
+ expectedRows.add(createRow(4, null, null)); // multiple nulls
+
+ // Write and read back
+ StoragePath path = new StoragePath(tempDir.getAbsolutePath() +
"/test_nulls.lance");
+ try (HoodieSparkLanceReader reader = writeAndCreateReader(path, schema,
expectedRows)) {
+ List<InternalRow> actualRows = readAllRows(reader);
+
+ assertEquals(expectedRows.size(), actualRows.size(), "Should read same
number of records");
+
+ // Verify nulls are preserved
+ assertTrue(actualRows.get(1).isNullAt(1), "Second row name should be
null");
+ assertFalse(actualRows.get(1).isNullAt(2), "Second row value should not
be null");
+
+ assertFalse(actualRows.get(2).isNullAt(1), "Third row name should not be
null");
+ assertTrue(actualRows.get(2).isNullAt(2), "Third row value should be
null");
+
+ assertTrue(actualRows.get(3).isNullAt(1), "Fourth row name should be
null");
+ assertTrue(actualRows.get(3).isNullAt(2), "Fourth row value should be
null");
+ }
+ }
+
+ @Test
+ public void testReadAllSupportedTypes() throws Exception {
+ StructType schema = new StructType()
+ .add("int_field", DataTypes.IntegerType, false)
+ .add("long_field", DataTypes.LongType, false)
+ .add("float_field", DataTypes.FloatType, false)
+ .add("double_field", DataTypes.DoubleType, false)
+ .add("bool_field", DataTypes.BooleanType, false)
+ .add("string_field", DataTypes.StringType, false)
+ .add("binary_field", DataTypes.BinaryType, false);
+
+ List<InternalRow> expectedRows = new ArrayList<>();
+ expectedRows.add(new GenericInternalRow(new Object[]{
+ 42, // int
+ 123456789L, // long
+ 3.14f, // float
+ 2.71828, // double
+ true, // boolean
+ UTF8String.fromString("test"), // string
+ new byte[]{1, 2, 3, 4} // binary
+ }));
+
+ // Write and read back
+ StoragePath path = new StoragePath(tempDir.getAbsolutePath() +
"/test_all_types.lance");
+ try (HoodieSparkLanceReader reader = writeAndCreateReader(path, schema,
expectedRows)) {
+ assertEquals(1, reader.getTotalRecords());
+
+ List<InternalRow> actualRows = readAllRows(reader);
+
+ assertEquals(1, actualRows.size());
+ InternalRow actual = actualRows.get(0);
+
+ assertEquals(42, actual.getInt(0), "int field should match");
+ assertEquals(123456789L, actual.getLong(1), "long field should match");
+ assertEquals(3.14f, actual.getFloat(2), 0.001, "float field should
match");
+ assertEquals(2.71828, actual.getDouble(3), 0.00001, "double field should
match");
+ assertTrue(actual.getBoolean(4), "bool field should match");
+ assertEquals("test", actual.getUTF8String(5).toString(), "string field
should match");
+
+ byte[] expectedBytes = new byte[]{1, 2, 3, 4};
+ byte[] actualBytes = actual.getBinary(6);
+ assertEquals(expectedBytes.length, actualBytes.length, "binary field
length should match");
+ for (int i = 0; i < expectedBytes.length; i++) {
+ assertEquals(expectedBytes[i], actualBytes[i], "binary field byte " +
i + " should match");
+ }
+ }
+ }
+
+ @Test
+ public void testReadLargeDataset() throws Exception {
+ // Test batch reading with more than 1000 records
+ StructType schema = new StructType()
+ .add("id", DataTypes.IntegerType, false)
+ .add("value", DataTypes.LongType, false);
+
+ StoragePath path = new StoragePath(tempDir.getAbsolutePath() +
"/test_large.lance");
+ int recordCount = 2500;
+ try (HoodieSparkLanceWriter writer = new HoodieSparkLanceWriter(
+ path, schema, instantTime, taskContextSupplier, storage, false)) {
+ for (int i = 0; i < recordCount; i++) {
+ GenericInternalRow row = new GenericInternalRow(new Object[]{i, (long)
i * 2});
+ writer.writeRow("key" + i, row);
+ }
+ }
+
+ // Read back
+ try (HoodieSparkLanceReader reader = new HoodieSparkLanceReader(path)) {
+ assertEquals(recordCount, reader.getTotalRecords(), "Total record count
should match");
+
+ // Verify all records
+ int count = 0;
+ try (ClosableIterator<UnsafeRow> iterator =
reader.getUnsafeRowIterator()) {
+ while (iterator.hasNext()) {
+ InternalRow row = iterator.next();
+ assertEquals(count, row.getInt(0), "id should match");
+ assertEquals(count * 2L, row.getLong(1), "value should match");
+ count++;
+ }
+ }
+
+ assertEquals(recordCount, count, "Should read all records");
+ }
+ }
+
+ @Test
+ public void testGetRecordKeyIterator() throws Exception {
+ // Create schema with all 5 Hudi metadata fields
+ StructType schema = new StructType()
+ .add(HoodieRecord.COMMIT_TIME_METADATA_FIELD, DataTypes.StringType,
false)
+ .add(HoodieRecord.COMMIT_SEQNO_METADATA_FIELD, DataTypes.StringType,
false)
+ .add(HoodieRecord.RECORD_KEY_METADATA_FIELD, DataTypes.StringType,
false)
+ .add(HoodieRecord.PARTITION_PATH_METADATA_FIELD, DataTypes.StringType,
false)
+ .add(HoodieRecord.FILENAME_METADATA_FIELD, DataTypes.StringType, false)
+ .add("name", DataTypes.StringType, true)
+ .add("age", DataTypes.IntegerType, true);
+
+ // Create test data with placeholder metadata fields
+ List<InternalRow> expectedRows = new ArrayList<>();
+ expectedRows.add(createRowWithMetaFields("Alice", 30));
+ expectedRows.add(createRowWithMetaFields("Bob", 25));
+ expectedRows.add(createRowWithMetaFields("Charlie", 35));
+ expectedRows.add(createRowWithMetaFields("David", 40));
+ expectedRows.add(createRowWithMetaFields("Eve", 28));
+
+ // Write with metadata population enabled
+ StoragePath path = new StoragePath(tempDir.getAbsolutePath() +
"/test_record_keys.lance");
+ try (HoodieSparkLanceReader reader = writeAndCreateReader(path, schema,
expectedRows, true)) {
+ // Read record keys using getRecordKeyIterator
+ List<String> actualKeys = new ArrayList<>();
+ try (ClosableIterator<String> keyIterator =
reader.getRecordKeyIterator()) {
+ while (keyIterator.hasNext()) {
+ actualKeys.add(keyIterator.next());
+ }
+ }
+
+ // Verify all keys are returned in the same order
+ assertEquals(5, actualKeys.size(), "Should return all 5 record keys");
+ assertEquals("key0", actualKeys.get(0), "First key should match");
+ assertEquals("key1", actualKeys.get(1), "Second key should match");
+ assertEquals("key2", actualKeys.get(2), "Third key should match");
+ assertEquals("key3", actualKeys.get(3), "Fourth key should match");
+ assertEquals("key4", actualKeys.get(4), "Fifth key should match");
Review Comment:
Use a list comparison here?
##########
hudi-spark-datasource/hudi-spark/src/test/java/org/apache/hudi/io/storage/TestHoodieSparkLanceReader.java:
##########
@@ -0,0 +1,513 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.io.storage;
+
+import org.apache.hudi.client.SparkTaskContextSupplier;
+import org.apache.hudi.common.model.HoodieKey;
+import org.apache.hudi.common.model.HoodieRecord;
+import org.apache.hudi.common.testutils.HoodieTestUtils;
+import org.apache.hudi.common.util.collection.ClosableIterator;
+import org.apache.hudi.common.util.collection.Pair;
+import org.apache.hudi.storage.HoodieStorage;
+import org.apache.hudi.storage.StoragePath;
+import org.apache.spark.sql.catalyst.InternalRow;
+import org.apache.spark.sql.catalyst.expressions.GenericInternalRow;
+import org.apache.spark.sql.catalyst.expressions.UnsafeRow;
+import org.apache.spark.sql.types.DataTypes;
+import org.apache.spark.sql.types.StructType;
+import org.apache.spark.unsafe.types.UTF8String;
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.io.TempDir;
+
+import java.io.File;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Set;
+
+import static org.apache.hudi.io.storage.LanceTestUtils.createRow;
+import static
org.apache.hudi.io.storage.LanceTestUtils.createRowWithMetaFields;
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertFalse;
+import static org.junit.jupiter.api.Assertions.assertNotNull;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+
+/**
+ * Tests for {@link HoodieSparkLanceReader}.
+ */
+public class TestHoodieSparkLanceReader {
+
+ @TempDir
+ File tempDir;
+
+ private HoodieStorage storage;
+ private SparkTaskContextSupplier taskContextSupplier;
+ private String instantTime;
+
+ @BeforeEach
+ public void setUp() throws IOException {
+ storage = HoodieTestUtils.getStorage(tempDir.getAbsolutePath());
+ taskContextSupplier = new SparkTaskContextSupplier();
+ instantTime = "20251201120000000";
+ }
+
+ @AfterEach
+ public void tearDown() throws Exception {
+ if (storage != null) {
+ storage.close();
+ }
+ }
+
+ @Test
+ public void testReadPrimitiveTypes() throws Exception {
+ // Create schema with primitive types
+ StructType schema = new StructType()
+ .add("id", DataTypes.IntegerType, false)
+ .add("name", DataTypes.StringType, true)
+ .add("age", DataTypes.LongType, true)
+ .add("score", DataTypes.DoubleType, true)
+ .add("active", DataTypes.BooleanType, true);
+
+ // Create test data
+ List<InternalRow> expectedRows = new ArrayList<>();
+ expectedRows.add(createRow(1, "Alice", 30L, 95.5, true));
+ expectedRows.add(createRow(2, "Bob", 25L, 87.3, false));
+ expectedRows.add(createRow(3, "Charlie", 35L, 92.1, true));
+
+ // Write and read back
+ StoragePath path = new StoragePath(tempDir.getAbsolutePath() +
"/test_primitives.lance");
+ try (HoodieSparkLanceReader reader = writeAndCreateReader(path, schema,
expectedRows)) {
+ // Verify record count
+ assertEquals(expectedRows.size(), reader.getTotalRecords(), "Record
count should match");
+
+ // Verify schema
+ assertNotNull(reader.getSchema(), "Schema should not be null");
+
+ // Read all records
+ List<InternalRow> actualRows = readAllRows(reader);
+
+ // Verify record count
+ assertEquals(expectedRows.size(), actualRows.size(), "Should read same
number of records");
+
+ // Verify each record
+ for (int i = 0; i < expectedRows.size(); i++) {
+ InternalRow expected = expectedRows.get(i);
+ InternalRow actual = actualRows.get(i);
+
+ assertEquals(expected.getInt(0), actual.getInt(0), "id field should
match");
Review Comment:
Is there an issue when trying to directly assert the equality between the
rows?
##########
hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/io/storage/HoodieSparkLanceReader.java:
##########
@@ -0,0 +1,331 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.io.storage;
+
+import com.lancedb.lance.file.LanceFileReader;
+import org.apache.arrow.memory.BufferAllocator;
+import org.apache.arrow.vector.VectorSchemaRoot;
+import org.apache.arrow.vector.ipc.ArrowReader;
+import org.apache.arrow.vector.types.pojo.Schema;
+import org.apache.hudi.HoodieSchemaConversionUtils;
+import org.apache.hudi.common.bloom.BloomFilter;
+import org.apache.hudi.common.model.HoodieRecord;
+import org.apache.hudi.common.model.HoodieSparkRecord;
+import org.apache.hudi.common.schema.HoodieSchema;
+import org.apache.hudi.common.schema.HoodieSchemaUtils;
+import org.apache.hudi.common.util.HoodieArrowAllocator;
+import org.apache.hudi.common.util.collection.ClosableIterator;
+import org.apache.hudi.common.util.collection.CloseableMappingIterator;
+import org.apache.hudi.common.util.collection.Pair;
+import org.apache.hudi.exception.HoodieException;
+import org.apache.hudi.exception.HoodieIOException;
+import org.apache.hudi.storage.StoragePath;
+import org.apache.spark.sql.catalyst.InternalRow;
+import org.apache.spark.sql.catalyst.expressions.UnsafeProjection;
+import org.apache.spark.sql.catalyst.expressions.UnsafeRow;
+import org.apache.spark.sql.types.StructField;
+import org.apache.spark.sql.types.StructType;
+import org.apache.spark.sql.util.LanceArrowUtils;
+import org.apache.spark.sql.vectorized.ColumnVector;
+import org.apache.spark.sql.vectorized.ColumnarBatch;
+import org.apache.spark.sql.vectorized.LanceArrowColumnVector;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Set;
+
+import static org.apache.hudi.common.util.TypeUtils.unsafeCast;
+
+/**
+ * {@link HoodieSparkFileReader} implementation for Lance file format.
+ */
+public class HoodieSparkLanceReader implements HoodieSparkFileReader {
+ // Memory size for data read operations: 120MB
+ public static final long LANCE_DATA_ALLOCATOR_SIZE = 120 * 1024 * 1024;
+
+ // Memory size for metadata operations: 8MB
+ private static final long LANCE_METADATA_ALLOCATOR_SIZE = 8 * 1024 * 1024;
+
+ // number of rows to read
+ private static final int DEFAULT_BATCH_SIZE = 512;
+ private final StoragePath path;
+
+ /**
+  * Creates a reader for the Lance file at the given path. The file is not
+  * opened here; it is opened lazily when an iterator is requested.
+  *
+  * @param path location of the Lance base file to read
+  */
+ public HoodieSparkLanceReader(StoragePath path) {
+ this.path = path;
+ }
+
+ @Override
+ public String[] readMinMaxRecordKeys() {
+ // Lance base files do not yet persist min/max record key metadata.
+ throw new UnsupportedOperationException("Min/max record key tracking is not yet supported for Lance file format");
+ }
+
+ @Override
+ public BloomFilter readBloomFilter() {
+ // Lance base files do not yet carry an embedded bloom filter.
+ throw new UnsupportedOperationException("Bloom filter is not yet supported for Lance file format");
+ }
+
+ @Override
+ public Set<Pair<String, Long>> filterRowKeys(Set<String> candidateRowKeys) {
+ // A null or empty candidate set matches every key; otherwise only keys
+ // present in the set are kept. Positions are 0-based row ordinals in
+ // file order, paired with the matched key.
+ boolean matchAll = candidateRowKeys == null || candidateRowKeys.isEmpty();
+ Set<Pair<String, Long>> matched = new HashSet<>();
+
+ try (ClosableIterator<String> keys = getRecordKeyIterator()) {
+ for (long pos = 0; keys.hasNext(); pos++) {
+ String recordKey = keys.next();
+ if (matchAll || candidateRowKeys.contains(recordKey)) {
+ matched.add(Pair.of(recordKey, pos));
+ }
+ }
+ } catch (IOException e) {
+ throw new HoodieIOException("Failed to filter row keys from Lance file: " + path, e);
+ }
+
+ return matched;
+ }
+
+ @Override
+ public ClosableIterator<HoodieRecord<InternalRow>> getRecordIterator(HoodieSchema readerSchema, HoodieSchema requestedSchema) throws IOException {
+ // NOTE(review): readerSchema is ignored here — confirm that schema
+ // evolution/reconciliation is handled by the caller for Lance files.
+ return getRecordIterator(requestedSchema);
+ }
+
+ @Override
+ public ClosableIterator<HoodieRecord<InternalRow>> getRecordIterator(HoodieSchema schema) throws IOException {
+ // NOTE(review): the schema argument is not applied as a projection —
+ // getUnsafeRowIterator() reads the file's full schema. Confirm whether
+ // this should delegate to getUnsafeRowIterator(schema) for column pruning.
+ ClosableIterator<UnsafeRow> iterator = getUnsafeRowIterator();
+ return new CloseableMappingIterator<>(iterator, data -> unsafeCast(new HoodieSparkRecord(data)));
+ }
+
+ @Override
+ public ClosableIterator<String> getRecordKeyIterator() throws IOException {
+ // Project only the record-key meta column so Lance reads a single field.
+ HoodieSchema keyOnlySchema = HoodieSchemaUtils.getRecordKeySchema();
+ ClosableIterator<UnsafeRow> rows = getUnsafeRowIterator(keyOnlySchema);
+
+ // With that projection the key is the sole column, hence ordinal 0.
+ return new CloseableMappingIterator<>(rows, row -> row.getUTF8String(0).toString());
+ }
+
+ /**
+  * Opens the Lance file and returns an iterator over its rows as
+  * {@link UnsafeRow}s, reading the file's full schema in batches of
+  * {@code DEFAULT_BATCH_SIZE}.
+  *
+  * <p>On success the returned iterator takes ownership of the Arrow
+  * allocator and the Lance/Arrow readers; callers must close it to
+  * release the allocated memory.
+  *
+  * @return ClosableIterator over UnsafeRows
+  * @throws HoodieException if the Lance file cannot be opened or read
+  */
+ public ClosableIterator<UnsafeRow> getUnsafeRowIterator() {
+
+ BufferAllocator allocator = HoodieArrowAllocator.newChildAllocator(
+ getClass().getSimpleName() + "-data-" + path.getName(), LANCE_DATA_ALLOCATOR_SIZE);
+
+ try {
+ LanceFileReader lanceReader = LanceFileReader.open(path.toString(), allocator);
+
+ // Get schema from Lance file and convert to Spark StructType
+ Schema arrowSchema = lanceReader.schema();
+ StructType sparkSchema = LanceArrowUtils.fromArrowSchema(arrowSchema);
+
+ ArrowReader arrowReader = lanceReader.readAll(null, null, DEFAULT_BATCH_SIZE);
+
+ return new LanceRecordIterator(allocator, lanceReader, arrowReader, sparkSchema);
+ } catch (Exception e) {
+ // Close the allocator on failure; on success the iterator owns it.
+ allocator.close();
+ throw new HoodieException("Failed to create Lance reader for: " + path, e);
+ }
+ }
+
+ /**
+ * Get an iterator over UnsafeRows from the Lance file with column
projection.
+ * This allows reading only specific columns for better performance.
+ *
+ * @param requestedSchema Avro schema specifying which columns to read
+ * @return ClosableIterator over UnsafeRows
+ * @throws IOException if reading fails
+ */
+ public ClosableIterator<UnsafeRow> getUnsafeRowIterator(HoodieSchema
requestedSchema) {
Review Comment:
Make this private?
##########
hudi-spark-datasource/hudi-spark/src/test/java/org/apache/hudi/io/storage/TestHoodieSparkLanceReader.java:
##########
@@ -0,0 +1,513 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.io.storage;
+
+import org.apache.hudi.client.SparkTaskContextSupplier;
+import org.apache.hudi.common.model.HoodieKey;
+import org.apache.hudi.common.model.HoodieRecord;
+import org.apache.hudi.common.testutils.HoodieTestUtils;
+import org.apache.hudi.common.util.collection.ClosableIterator;
+import org.apache.hudi.common.util.collection.Pair;
+import org.apache.hudi.storage.HoodieStorage;
+import org.apache.hudi.storage.StoragePath;
+import org.apache.spark.sql.catalyst.InternalRow;
+import org.apache.spark.sql.catalyst.expressions.GenericInternalRow;
+import org.apache.spark.sql.catalyst.expressions.UnsafeRow;
+import org.apache.spark.sql.types.DataTypes;
+import org.apache.spark.sql.types.StructType;
+import org.apache.spark.unsafe.types.UTF8String;
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.io.TempDir;
+
+import java.io.File;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Set;
+
+import static org.apache.hudi.io.storage.LanceTestUtils.createRow;
+import static
org.apache.hudi.io.storage.LanceTestUtils.createRowWithMetaFields;
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertFalse;
+import static org.junit.jupiter.api.Assertions.assertNotNull;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+
+/**
+ * Tests for {@link HoodieSparkLanceReader}.
+ */
+public class TestHoodieSparkLanceReader {
+
+ @TempDir
+ File tempDir;
+
+ private HoodieStorage storage;
+ private SparkTaskContextSupplier taskContextSupplier;
+ private String instantTime;
+
+ @BeforeEach
+ public void setUp() throws IOException {
+ // Fresh storage rooted at the per-test temp dir; a fixed instant time
+ // keeps generated file metadata deterministic across runs.
+ storage = HoodieTestUtils.getStorage(tempDir.getAbsolutePath());
+ taskContextSupplier = new SparkTaskContextSupplier();
+ instantTime = "20251201120000000";
+ }
+
+ @AfterEach
+ public void tearDown() throws Exception {
+ // Release the storage handle opened in setUp (null if setUp failed early).
+ if (storage != null) {
+ storage.close();
+ }
+ }
+
+ @Test
+ public void testReadPrimitiveTypes() throws Exception {
+ // Schema covering int, string, long, double and boolean columns.
+ StructType schema = new StructType()
+ .add("id", DataTypes.IntegerType, false)
+ .add("name", DataTypes.StringType, true)
+ .add("age", DataTypes.LongType, true)
+ .add("score", DataTypes.DoubleType, true)
+ .add("active", DataTypes.BooleanType, true);
+
+ List<InternalRow> written = new ArrayList<>();
+ written.add(createRow(1, "Alice", 30L, 95.5, true));
+ written.add(createRow(2, "Bob", 25L, 87.3, false));
+ written.add(createRow(3, "Charlie", 35L, 92.1, true));
+
+ StoragePath path = new StoragePath(tempDir.getAbsolutePath() + "/test_primitives.lance");
+ try (HoodieSparkLanceReader reader = writeAndCreateReader(path, schema, written)) {
+ assertEquals(written.size(), reader.getTotalRecords(), "Record count should match");
+ assertNotNull(reader.getSchema(), "Schema should not be null");
+
+ List<InternalRow> read = readAllRows(reader);
+ assertEquals(written.size(), read.size(), "Should read same number of records");
+
+ // NOTE(review): rows are compared field by field since the read side
+ // likely yields UnsafeRow, which does not compare equal to the
+ // GenericInternalRow inputs — confirm before switching to row equality.
+ for (int i = 0; i < written.size(); i++) {
+ InternalRow want = written.get(i);
+ InternalRow got = read.get(i);
+
+ assertEquals(want.getInt(0), got.getInt(0), "id field should match");
+ assertEquals(want.getUTF8String(1), got.getUTF8String(1), "name field should match");
+ assertEquals(want.getLong(2), got.getLong(2), "age field should match");
+ assertEquals(want.getDouble(3), got.getDouble(3), 0.001, "score field should match");
+ assertEquals(want.getBoolean(4), got.getBoolean(4), "active field should match");
+ }
+ }
+ }
+
+ @Test
+ public void testReadWithNulls() throws Exception {
+ // Nullable string and double columns; rows exercise each null pattern.
+ StructType schema = new StructType()
+ .add("id", DataTypes.IntegerType, false)
+ .add("name", DataTypes.StringType, true)
+ .add("value", DataTypes.DoubleType, true);
+
+ List<InternalRow> written = new ArrayList<>();
+ written.add(createRow(1, "Alice", 100.0));
+ written.add(createRow(2, null, 200.0)); // null name only
+ written.add(createRow(3, "Charlie", null)); // null value only
+ written.add(createRow(4, null, null)); // both null
+
+ StoragePath path = new StoragePath(tempDir.getAbsolutePath() + "/test_nulls.lance");
+ try (HoodieSparkLanceReader reader = writeAndCreateReader(path, schema, written)) {
+ List<InternalRow> read = readAllRows(reader);
+
+ assertEquals(written.size(), read.size(), "Should read same number of records");
+
+ // Null flags must survive the write/read round trip per column.
+ assertTrue(read.get(1).isNullAt(1), "Second row name should be null");
+ assertFalse(read.get(1).isNullAt(2), "Second row value should not be null");
+
+ assertFalse(read.get(2).isNullAt(1), "Third row name should not be null");
+ assertTrue(read.get(2).isNullAt(2), "Third row value should be null");
+
+ assertTrue(read.get(3).isNullAt(1), "Fourth row name should be null");
+ assertTrue(read.get(3).isNullAt(2), "Fourth row value should be null");
+ }
+ }
+
+ @Test
+ public void testReadAllSupportedTypes() throws Exception {
+ // One column per supported primitive/binary type, all non-nullable.
+ StructType schema = new StructType()
+ .add("int_field", DataTypes.IntegerType, false)
+ .add("long_field", DataTypes.LongType, false)
+ .add("float_field", DataTypes.FloatType, false)
+ .add("double_field", DataTypes.DoubleType, false)
+ .add("bool_field", DataTypes.BooleanType, false)
+ .add("string_field", DataTypes.StringType, false)
+ .add("binary_field", DataTypes.BinaryType, false);
+
+ List<InternalRow> expectedRows = new ArrayList<>();
+ expectedRows.add(new GenericInternalRow(new Object[]{
+ 42, // int
+ 123456789L, // long
+ 3.14f, // float
+ 2.71828, // double
+ true, // boolean
+ UTF8String.fromString("test"), // string
+ new byte[]{1, 2, 3, 4} // binary
+ }));
+
+ // Write and read back
+ StoragePath path = new StoragePath(tempDir.getAbsolutePath() + "/test_all_types.lance");
+ try (HoodieSparkLanceReader reader = writeAndCreateReader(path, schema, expectedRows)) {
+ assertEquals(1, reader.getTotalRecords());
+
+ List<InternalRow> actualRows = readAllRows(reader);
+
+ assertEquals(1, actualRows.size());
+ InternalRow actual = actualRows.get(0);
+
+ assertEquals(42, actual.getInt(0), "int field should match");
+ assertEquals(123456789L, actual.getLong(1), "long field should match");
+ assertEquals(3.14f, actual.getFloat(2), 0.001, "float field should match");
+ assertEquals(2.71828, actual.getDouble(3), 0.00001, "double field should match");
+ assertTrue(actual.getBoolean(4), "bool field should match");
+ assertEquals("test", actual.getUTF8String(5).toString(), "string field should match");
+
+ // Compare the binary column in one call instead of a hand-rolled
+ // length check plus per-byte loop.
+ assertTrue(Arrays.equals(new byte[]{1, 2, 3, 4}, actual.getBinary(6)), "binary field should match");
+ }
+ }
+
+ @Test
+ public void testReadLargeDataset() throws Exception {
+ // Exercise multi-batch reading with well over one batch of records.
+ StructType schema = new StructType()
+ .add("id", DataTypes.IntegerType, false)
+ .add("value", DataTypes.LongType, false);
+
+ StoragePath path = new StoragePath(tempDir.getAbsolutePath() + "/test_large.lance");
+ int recordCount = 2500;
+ try (HoodieSparkLanceWriter writer = new HoodieSparkLanceWriter(
+ path, schema, instantTime, taskContextSupplier, storage, false)) {
+ for (int i = 0; i < recordCount; i++) {
+ writer.writeRow("key" + i, new GenericInternalRow(new Object[]{i, (long) i * 2}));
+ }
+ }
+
+ try (HoodieSparkLanceReader reader = new HoodieSparkLanceReader(path)) {
+ assertEquals(recordCount, reader.getTotalRecords(), "Total record count should match");
+
+ // Rows must come back in write order with id == index, value == 2*id.
+ int seen = 0;
+ try (ClosableIterator<UnsafeRow> iterator = reader.getUnsafeRowIterator()) {
+ while (iterator.hasNext()) {
+ InternalRow row = iterator.next();
+ assertEquals(seen, row.getInt(0), "id should match");
+ assertEquals(seen * 2L, row.getLong(1), "value should match");
+ seen++;
+ }
+ }
+
+ assertEquals(recordCount, seen, "Should read all records");
+ }
+ }
+
+ @Test
+ public void testGetRecordKeyIterator() throws Exception {
+ // Create schema with all 5 Hudi metadata fields
+ StructType schema = new StructType()
+ .add(HoodieRecord.COMMIT_TIME_METADATA_FIELD, DataTypes.StringType, false)
+ .add(HoodieRecord.COMMIT_SEQNO_METADATA_FIELD, DataTypes.StringType, false)
+ .add(HoodieRecord.RECORD_KEY_METADATA_FIELD, DataTypes.StringType, false)
+ .add(HoodieRecord.PARTITION_PATH_METADATA_FIELD, DataTypes.StringType, false)
+ .add(HoodieRecord.FILENAME_METADATA_FIELD, DataTypes.StringType, false)
+ .add("name", DataTypes.StringType, true)
+ .add("age", DataTypes.IntegerType, true);
+
+ // Create test data with placeholder metadata fields
+ List<InternalRow> expectedRows = new ArrayList<>();
+ expectedRows.add(createRowWithMetaFields("Alice", 30));
+ expectedRows.add(createRowWithMetaFields("Bob", 25));
+ expectedRows.add(createRowWithMetaFields("Charlie", 35));
+ expectedRows.add(createRowWithMetaFields("David", 40));
+ expectedRows.add(createRowWithMetaFields("Eve", 28));
+
+ // Write with metadata population enabled
+ StoragePath path = new StoragePath(tempDir.getAbsolutePath() + "/test_record_keys.lance");
+ try (HoodieSparkLanceReader reader = writeAndCreateReader(path, schema, expectedRows, true)) {
+ // Read record keys using getRecordKeyIterator
+ List<String> actualKeys = new ArrayList<>();
+ try (ClosableIterator<String> keyIterator = reader.getRecordKeyIterator()) {
+ while (keyIterator.hasNext()) {
+ actualKeys.add(keyIterator.next());
+ }
+ }
+
+ // Keys are populated as "key0".."key4" in write order; assert all of
+ // them in one loop rather than five copy-pasted assertions.
+ assertEquals(expectedRows.size(), actualKeys.size(), "Should return all 5 record keys");
+ for (int i = 0; i < actualKeys.size(); i++) {
+ assertEquals("key" + i, actualKeys.get(i), "Key at position " + i + " should match");
+ }
+ }
+ }
+
+ @Test
+ public void testFilterRowKeysWithCandidates() throws Exception {
+ // Schema with all 5 Hudi metadata fields plus one data column.
+ StructType schema = new StructType()
+ .add(HoodieRecord.COMMIT_TIME_METADATA_FIELD, DataTypes.StringType, false)
+ .add(HoodieRecord.COMMIT_SEQNO_METADATA_FIELD, DataTypes.StringType, false)
+ .add(HoodieRecord.RECORD_KEY_METADATA_FIELD, DataTypes.StringType, false)
+ .add(HoodieRecord.PARTITION_PATH_METADATA_FIELD, DataTypes.StringType, false)
+ .add(HoodieRecord.FILENAME_METADATA_FIELD, DataTypes.StringType, false)
+ .add("value", DataTypes.IntegerType, true);
+
+ // Ten rows with placeholder metadata fields.
+ List<InternalRow> rows = new ArrayList<>();
+ for (int i = 0; i < 10; i++) {
+ rows.add(createRowWithMetaFields(i * 10));
+ }
+
+ // Write with metadata population enabled, then filter on a 3-key subset.
+ StoragePath path = new StoragePath(tempDir.getAbsolutePath() + "/test_filter_candidates.lance");
+ try (HoodieSparkLanceReader reader = writeAndCreateReader(path, schema, rows, true)) {
+ Set<String> candidateKeys = new HashSet<>();
+ candidateKeys.add("key2");
+ candidateKeys.add("key5");
+ candidateKeys.add("key7");
+
+ Set<Pair<String, Long>> result = reader.filterRowKeys(candidateKeys);
+
+ // Exactly the three candidates must match, each at its row position.
+ assertEquals(3, result.size(), "Should return exactly 3 matching keys");
+ assertTrue(result.contains(Pair.of("key2", 2L)), "Should contain key2 at position 2");
+ assertTrue(result.contains(Pair.of("key5", 5L)), "Should contain key5 at position 5");
+ assertTrue(result.contains(Pair.of("key7", 7L)), "Should contain key7 at position 7");
+ }
+ }
+
+ @Test
+ public void testFilterRowKeysWithEmptySet() throws Exception {
Review Comment:
This is almost identical to the test below. Let's parameterize these two
cases
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]