voonhous commented on code in PR #17632: URL: https://github.com/apache/hudi/pull/17632#discussion_r2639980943
########## hudi-hadoop-common/src/main/java/org/apache/hudi/io/lance/HoodieBaseLanceWriter.java: ########## @@ -0,0 +1,196 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.hudi.io.lance; + +import com.lancedb.lance.file.LanceFileWriter; +import org.apache.arrow.memory.BufferAllocator; +import org.apache.arrow.memory.RootAllocator; +import org.apache.arrow.vector.VectorSchemaRoot; +import org.apache.arrow.vector.types.pojo.Schema; +import org.apache.hudi.exception.HoodieException; +import org.apache.hudi.storage.HoodieStorage; +import org.apache.hudi.storage.StoragePath; + +import javax.annotation.concurrent.NotThreadSafe; +import java.io.Closeable; +import java.io.IOException; +import java.util.ArrayList; +import java.util.List; + +/** + * Base class for Hudi Lance file writers supporting different record types. + * + * This class handles common Lance file operations including: + * - LanceFileWriter lifecycle management + * - BufferAllocator management + * - Record buffering and batch flushing + * - File size checks + * + * Subclasses must implement type-specific conversion to Arrow format. 
+ * + * @param <R> The record type (e.g., GenericRecord, InternalRow) + */ +@NotThreadSafe +public abstract class HoodieBaseLanceWriter<R> implements Closeable { + protected static final int DEFAULT_BATCH_SIZE = 1000; + protected final HoodieStorage storage; + protected final StoragePath path; + protected final BufferAllocator allocator; + protected final List<R> bufferedRecords; + protected final int batchSize; + protected final long maxFileSize; + protected long writtenRecordCount = 0; + protected VectorSchemaRoot root; + + private LanceFileWriter writer; + + /** + * Constructor for base Lance writer. + * + * @param storage HoodieStorage instance + * @param path Path where Lance file will be written + * @param batchSize Number of records to buffer before flushing to Lance + * @param maxFileSize Maximum file size in bytes before rolling over to new file + */ + protected HoodieBaseLanceWriter(HoodieStorage storage, StoragePath path, int batchSize, long maxFileSize) { + this.storage = storage; + this.path = path; + this.allocator = new RootAllocator(Long.MAX_VALUE); + this.bufferedRecords = new ArrayList<>(batchSize); + this.batchSize = batchSize; + this.maxFileSize = maxFileSize; + } + + /** + * Populate the VectorSchemaRoot with buffered records. + * Subclasses must implement type-specific conversion logic. + * The VectorSchemaRoot field is reused across batches and managed by this base class. + * + * @param records List of records to convert + */ + protected abstract void populateVectorSchemaRoot(List<R> records); + + /** + * Get the Arrow schema for this writer. + * Subclasses must provide the Arrow schema corresponding to their record type. + * + * @return Arrow schema + */ + protected abstract Schema getArrowSchema(); + + /** + * Write a single record. Records are buffered and flushed in batches. 
+ * + * @param record Record to write + * @throws IOException if write fails + */ + public void write(R record) throws IOException { + bufferedRecords.add(record); + writtenRecordCount++; + + if (bufferedRecords.size() >= batchSize) { + flushBatch(); + } + } + + /** + * Check if writer can accept more records based on file size. + * Uses filesystem-based size checking (similar to ORC/HFile approach). + * + * @return true if writer can accept more records, false if file size limit reached + */ + public boolean canWrite() { + //TODO will need to implement proper way to compute this + return true; + } + + /** + * Get the total number of records written so far. + * + * @return Number of records written + */ + public long getWrittenRecordCount() { + return writtenRecordCount; + } + + /** + * Close the writer, flushing any remaining buffered records. + * + * @throws IOException if close fails + */ + @Override + public void close() throws IOException { + try { + // Flush any remaining buffered records + if (!bufferedRecords.isEmpty()) { + flushBatch(); + } + + // Close Lance writer + if (writer != null) { + writer.close(); + } + + // Close VectorSchemaRoot + if (root != null) { + root.close(); + } + } catch (Exception e) { + throw new HoodieException("Failed to close Lance writer: " + path, e); + } finally { + // Always close allocator + allocator.close(); + } + } Review Comment: Nit: Use the pattern where subsequent exceptions are added as suppressed exceptions so that the first exception is preserved. This ensures that even if writer.close() fails, you still attempt to close the root and the allocator, and the original exception is what the user ultimately sees. ```java @Override public void close() throws IOException { Exception primaryException = null; // 1. Flush remaining records try { if (!bufferedRecords.isEmpty()) { flushBatch(); } } catch (Exception e) { primaryException = e; } // 2. 
Close Lance Writer if (writer != null) { try { writer.close(); } catch (Exception e) { if (primaryException == null) primaryException = e; else primaryException.addSuppressed(e); } } // 3. Close VectorSchemaRoot if (root != null) { try { root.close(); } catch (Exception e) { if (primaryException == null) primaryException = e; else primaryException.addSuppressed(e); } } // 4. Always close allocator last try { allocator.close(); } catch (Exception e) { if (primaryException == null) primaryException = e; else primaryException.addSuppressed(e); } // Final check: if anything failed, wrap and throw if (primaryException != null) { throw new HoodieException("Failed to close Lance writer: " + path, primaryException); } } ``` ########## hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/io/storage/HoodieSparkLanceReader.java: ########## @@ -0,0 +1,326 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.hudi.io.storage; + +import com.lancedb.lance.file.LanceFileReader; +import org.apache.arrow.memory.BufferAllocator; +import org.apache.arrow.memory.RootAllocator; +import org.apache.arrow.vector.VectorSchemaRoot; +import org.apache.arrow.vector.ipc.ArrowReader; +import org.apache.hudi.HoodieSchemaConversionUtils; +import org.apache.hudi.common.bloom.BloomFilter; +import org.apache.hudi.common.model.HoodieRecord; +import org.apache.hudi.common.model.HoodieSparkRecord; +import org.apache.hudi.common.schema.HoodieSchema; +import org.apache.hudi.common.schema.HoodieSchemaUtils; +import org.apache.hudi.common.util.collection.ClosableIterator; +import org.apache.hudi.common.util.collection.CloseableMappingIterator; +import org.apache.hudi.common.util.collection.Pair; +import org.apache.hudi.exception.HoodieException; +import org.apache.hudi.exception.HoodieIOException; +import org.apache.hudi.storage.HoodieStorage; +import org.apache.hudi.storage.StoragePath; +import org.apache.spark.sql.catalyst.InternalRow; +import org.apache.spark.sql.catalyst.expressions.UnsafeProjection; +import org.apache.spark.sql.catalyst.expressions.UnsafeRow; +import org.apache.spark.sql.types.StructField; +import org.apache.spark.sql.types.StructType; +import org.apache.spark.sql.util.LanceArrowUtils; +import org.apache.spark.sql.vectorized.ColumnVector; +import org.apache.spark.sql.vectorized.ColumnarBatch; +import org.apache.spark.sql.vectorized.LanceArrowColumnVector; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.HashSet; +import java.util.Iterator; +import java.util.List; +import java.util.Set; + +import static org.apache.hudi.common.util.TypeUtils.unsafeCast; + +/** + * {@link HoodieSparkFileReader} implementation for Lance file format. 
+ */ +public class HoodieSparkLanceReader implements HoodieSparkFileReader { + // number of rows to read + private static final int DEFAULT_BATCH_SIZE = 512; + private final StoragePath path; + private final HoodieStorage storage; + + public HoodieSparkLanceReader(HoodieStorage storage, StoragePath path) { + this.path = path; + this.storage = storage; + } + + @Override + public String[] readMinMaxRecordKeys() { + throw new UnsupportedOperationException("Min/max record key tracking is not yet supported for Lance file format"); + } + + @Override + public BloomFilter readBloomFilter() { + throw new UnsupportedOperationException("Bloom filter is not yet supported for Lance file format"); + } + + @Override + public Set<Pair<String, Long>> filterRowKeys(Set<String> candidateRowKeys) { + Set<Pair<String, Long>> result = new HashSet<>(); + long position = 0; + + try (ClosableIterator<String> keyIterator = getRecordKeyIterator()) { + while (keyIterator.hasNext()) { + String recordKey = keyIterator.next(); + // If filter is empty/null, then all keys will be added. 
+ // if filter has specific keys, then ensure only those are added + if (candidateRowKeys == null || candidateRowKeys.isEmpty() + || candidateRowKeys.contains(recordKey)) { + result.add(Pair.of(recordKey, position)); + } + position++; + } + } catch (IOException e) { + throw new HoodieIOException("Failed to filter row keys from Lance file: " + path, e); + } + + return result; + } + + @Override + public ClosableIterator<HoodieRecord<InternalRow>> getRecordIterator(HoodieSchema readerSchema, HoodieSchema requestedSchema) throws IOException { + return getRecordIterator(requestedSchema); + } + + @Override + public ClosableIterator<HoodieRecord<InternalRow>> getRecordIterator(HoodieSchema schema) throws IOException { + ClosableIterator<UnsafeRow> iterator = getUnsafeRowIterator(); + return new CloseableMappingIterator<>(iterator, data -> unsafeCast(new HoodieSparkRecord(data))); + } + + @Override + public ClosableIterator<String> getRecordKeyIterator() throws IOException { + //TODO to revisit adding support for when metadata fields are not persisted. + + // Get schema with only the record key field for efficient column pruning + HoodieSchema recordKeySchema = HoodieSchemaUtils.getRecordKeySchema(); + ClosableIterator<UnsafeRow> iterator = getUnsafeRowIterator(recordKeySchema); + + // Map each UnsafeRow to extract the record key string directly from index 0 + // The record key is at index 0 because we're using lance column projection which has only the record key field + return new CloseableMappingIterator<>(iterator, data -> data.getUTF8String(0).toString()); + } + + /** + * Get an iterator over UnsafeRows from the Lance file. 
+ * + * @return ClosableIterator over UnsafeRows + * @throws IOException if reading fails + */ + public ClosableIterator<UnsafeRow> getUnsafeRowIterator() { + BufferAllocator allocator = new RootAllocator(Long.MAX_VALUE); + + try { + LanceFileReader lanceReader = LanceFileReader.open(path.toString(), allocator); + + // Get schema from Lance file and convert to Spark StructType + org.apache.arrow.vector.types.pojo.Schema arrowSchema = lanceReader.schema(); + StructType sparkSchema = LanceArrowUtils.fromArrowSchema(arrowSchema); + + ArrowReader arrowReader = lanceReader.readAll(null, null, DEFAULT_BATCH_SIZE); + + return new LanceRecordIterator(allocator, lanceReader, arrowReader, sparkSchema); + } catch (Exception e) { + allocator.close(); + throw new HoodieException("Failed to create Lance reader for: " + path, e); + } + } + + /** + * Get an iterator over UnsafeRows from the Lance file with column projection. + * This allows reading only specific columns for better performance. + * + * @param requestedSchema Avro schema specifying which columns to read + * @return ClosableIterator over UnsafeRows + * @throws IOException if reading fails + */ + public ClosableIterator<UnsafeRow> getUnsafeRowIterator(HoodieSchema requestedSchema) { + // Convert HoodieSchema to Spark StructType + StructType requestedSparkSchema = HoodieSchemaConversionUtils.convertHoodieSchemaToStructType(requestedSchema); + + BufferAllocator allocator = new RootAllocator(Long.MAX_VALUE); Review Comment: Is this dangerous? Many tasks run in parallel in a Spark environment. If every writer/reader creates its own root allocator without a memory limit, we risk running out of memory or losing track of memory usage across the JVM. We could use a singleton or a shared BufferAllocator at the executor level, or at least bound the memory to a fraction of the executor's memory. -- This is an automated message from the Apache Git Service. 
To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: commits-unsubscribe@hudi.apache.org For queries about this service, please contact Infrastructure at: users@infra.apache.org
