This is an automated email from the ASF dual-hosted git repository.

wombatukun pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git


The following commit(s) were added to refs/heads/master by this push:
     new f6a33fd828d5 feat(lance): Implement canWrite() in 
HoodieSparkLanceWriter with configurable max file size for Lance (#18341)
f6a33fd828d5 is described below

commit f6a33fd828d526811842d4ca25b0d227d8b1a21d
Author: Vova Kolmakov <[email protected]>
AuthorDate: Tue Mar 31 16:25:54 2026 +0700

    feat(lance): Implement canWrite() in HoodieSparkLanceWriter with 
configurable max file size for Lance (#18341)
---
 .../io/storage/HoodieSparkFileWriterFactory.java   |  12 +-
 .../hudi/io/storage/HoodieSparkLanceWriter.java    | 104 +++++++++-----
 .../row/HoodieInternalRowFileWriterFactory.java    |  19 ++-
 .../hudi/common/config/HoodieStorageConfig.java    |  11 ++
 .../hudi/io/lance/HoodieBaseLanceWriter.java       |  22 +++
 .../io/storage/TestHoodieSparkLanceReader.java     |  15 +-
 .../io/storage/TestHoodieSparkLanceWriter.java     | 153 ++++++++++++++++++---
 7 files changed, 272 insertions(+), 64 deletions(-)

diff --git 
a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/io/storage/HoodieSparkFileWriterFactory.java
 
b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/io/storage/HoodieSparkFileWriterFactory.java
index 9ebb94a0bb54..bb075470c507 100644
--- 
a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/io/storage/HoodieSparkFileWriterFactory.java
+++ 
b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/io/storage/HoodieSparkFileWriterFactory.java
@@ -111,8 +111,18 @@ public class HoodieSparkFileWriterFactory extends 
HoodieFileWriterFactory {
     StructType structType = HoodieInternalRowUtils.getCachedSchema(schema);
     boolean enableBloomFilter = enableBloomFilter(populateMetaFields, config);
     Option<BloomFilter> bloomFilter = enableBloomFilter ? 
Option.of(createBloomFilter(config)) : Option.empty();
+    long maxFileSize = 
config.getLongOrDefault(HoodieStorageConfig.LANCE_MAX_FILE_SIZE);
 
-    return new HoodieSparkLanceWriter(path, structType, instantTime, 
taskContextSupplier, storage, populateMetaFields, bloomFilter);
+    return HoodieSparkLanceWriter.builder()
+        .file(path)
+        .sparkSchema(structType)
+        .instantTime(instantTime)
+        .taskContextSupplier(taskContextSupplier)
+        .storage(storage)
+        .populateMetaFields(populateMetaFields)
+        .bloomFilterOpt(bloomFilter)
+        .maxFileSize(maxFileSize)
+        .build();
   }
 
   private static HoodieRowParquetWriteSupport 
getHoodieRowParquetWriteSupport(StorageConfiguration<?> conf, HoodieSchema 
schema,
diff --git 
a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/io/storage/HoodieSparkLanceWriter.java
 
b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/io/storage/HoodieSparkLanceWriter.java
index e029fcbab010..02f9c7e3b16b 100644
--- 
a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/io/storage/HoodieSparkLanceWriter.java
+++ 
b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/io/storage/HoodieSparkLanceWriter.java
@@ -19,6 +19,7 @@
 package org.apache.hudi.io.storage;
 
 import org.apache.hudi.common.bloom.BloomFilter;
+import org.apache.hudi.common.config.HoodieStorageConfig;
 import org.apache.hudi.common.engine.TaskContextSupplier;
 import org.apache.hudi.common.model.HoodieKey;
 import org.apache.hudi.common.model.HoodieRecord;
@@ -31,6 +32,7 @@ import org.apache.hudi.storage.StoragePath;
 
 import com.lancedb.lance.spark.arrow.LanceArrowWriter;
 import lombok.AllArgsConstructor;
+import lombok.Builder;
 import org.apache.arrow.vector.VectorSchemaRoot;
 import org.apache.arrow.vector.types.pojo.Schema;
 import org.apache.spark.sql.catalyst.InternalRow;
@@ -46,6 +48,7 @@ import static 
org.apache.hudi.common.model.HoodieRecord.HoodieMetadataField.COMM
 import static 
org.apache.hudi.common.model.HoodieRecord.HoodieMetadataField.FILENAME_METADATA_FIELD;
 import static 
org.apache.hudi.common.model.HoodieRecord.HoodieMetadataField.PARTITION_PATH_METADATA_FIELD;
 import static 
org.apache.hudi.common.model.HoodieRecord.HoodieMetadataField.RECORD_KEY_METADATA_FIELD;
+import static org.apache.hudi.common.util.ValidationUtils.checkArgument;
 
 /**
  * Spark Lance file writer implementing {@link HoodieSparkFileWriter} and 
{@link HoodieInternalRowFileWriter}.
@@ -60,6 +63,8 @@ public class HoodieSparkLanceWriter extends 
HoodieBaseLanceWriter<InternalRow, U
     implements HoodieSparkFileWriter, HoodieInternalRowFileWriter {
 
   private static final String DEFAULT_TIMEZONE = "UTC";
+  private static final long MIN_RECORDS_FOR_SIZE_CHECK = 100L;
+  private static final long MAX_RECORDS_FOR_SIZE_CHECK = 10000L;
 
   private final StructType sparkSchema;
   private final Schema arrowSchema;
@@ -67,52 +72,66 @@ public class HoodieSparkLanceWriter extends 
HoodieBaseLanceWriter<InternalRow, U
   private final UTF8String instantTime;
   private final boolean populateMetaFields;
   private final Function<Long, String> seqIdGenerator;
+  private final long maxFileSize;
+  private long recordCountForNextSizeCheck = MIN_RECORDS_FOR_SIZE_CHECK;
 
   /**
-   * Constructor for Spark Lance writer.
+   * Creates a new builder for constructing {@link HoodieSparkLanceWriter} 
instances.
    *
-   * @param file Path where Lance file will be written
-   * @param sparkSchema Spark schema for the data
-   * @param instantTime Instant time for the commit
-   * @param taskContextSupplier Task context supplier for partition ID
-   * @param storage HoodieStorage instance
-   * @param populateMetaFields Whether to populate Hudi metadata fields
-   * @param bloomFilterOpt Optional bloom filter for record key tracking
+   * <p>Required parameters: {@code file}, {@code sparkSchema}, {@code 
taskContextSupplier}, {@code storage}.
+   * <p>Optional parameters with defaults:
+   * <ul>
+   *   <li>{@code instantTime} — defaults to {@code null}</li>
+   *   <li>{@code populateMetaFields} — defaults to {@code false}</li>
+   *   <li>{@code bloomFilterOpt} — defaults to {@link Option#empty()}</li>
+   *   <li>{@code maxFileSize} — defaults to {@link 
HoodieStorageConfig#LANCE_MAX_FILE_SIZE}</li>
+   * </ul>
    */
-  public HoodieSparkLanceWriter(StoragePath file,
-                                StructType sparkSchema,
-                                String instantTime,
-                                TaskContextSupplier taskContextSupplier,
-                                HoodieStorage storage,
-                                boolean populateMetaFields,
-                                Option<BloomFilter> bloomFilterOpt) {
+  @Builder(builderMethodName = "builder")
+  private static HoodieSparkLanceWriter create(
+      StoragePath file,
+      StructType sparkSchema,
+      String instantTime,
+      TaskContextSupplier taskContextSupplier,
+      HoodieStorage storage,
+      boolean populateMetaFields,
+      Option<BloomFilter> bloomFilterOpt,
+      long maxFileSize) {
+    checkArgument(maxFileSize > 0, "maxFileSize must be a positive number");
+    return new HoodieSparkLanceWriter(file, sparkSchema, instantTime,
+        taskContextSupplier, storage, populateMetaFields, bloomFilterOpt, 
maxFileSize);
+  }
+
+  /**
+   * Manually declared builder class to provide default values for optional 
parameters.
+   * Lombok fills in the remaining builder methods.
+   */
+  public static class HoodieSparkLanceWriterBuilder {
+    private Option<BloomFilter> bloomFilterOpt = Option.empty();
+    private long maxFileSize = 
Long.parseLong(HoodieStorageConfig.LANCE_MAX_FILE_SIZE.defaultValue());
+  }
+
+  private HoodieSparkLanceWriter(StoragePath file,
+                                 StructType sparkSchema,
+                                 String instantTime,
+                                 TaskContextSupplier taskContextSupplier,
+                                 HoodieStorage storage,
+                                 boolean populateMetaFields,
+                                 Option<BloomFilter> bloomFilterOpt,
+                                 long maxFileSize) {
     super(file, DEFAULT_BATCH_SIZE, 
bloomFilterOpt.map(HoodieBloomFilterRowWriteSupport::new));
     this.sparkSchema = sparkSchema;
     this.arrowSchema = LanceArrowUtils.toArrowSchema(sparkSchema, 
DEFAULT_TIMEZONE, true, false);
     this.fileName = UTF8String.fromString(file.getName());
     this.instantTime = UTF8String.fromString(instantTime);
     this.populateMetaFields = populateMetaFields;
+    this.maxFileSize = maxFileSize;
     this.seqIdGenerator = recordIndex -> {
       Integer partitionId = taskContextSupplier.getPartitionIdSupplier().get();
       return HoodieRecord.generateSequenceId(instantTime, partitionId, 
recordIndex);
     };
   }
 
-  /**
-   * Constructor for Spark Lance writer used for internal row writing with 
pre-embedded metadata.
-   *
-   * @param file Path where Lance file will be written
-   * @param sparkSchema Spark schema for the data
-   * @param taskContextSupplier Task context supplier for partition ID
-   * @param storage HoodieStorage instance
-   */
-  public HoodieSparkLanceWriter(StoragePath file,
-                                StructType sparkSchema,
-                                TaskContextSupplier taskContextSupplier,
-                                HoodieStorage storage) {
-    this(file, sparkSchema, null, taskContextSupplier, storage, false, 
Option.empty());
-  }
-
   @Override
   public void writeRowWithMetadata(HoodieKey key, InternalRow row) throws 
IOException {
     UTF8String recordKey = UTF8String.fromString(key.getRecordKey());
@@ -147,13 +166,30 @@ public class HoodieSparkLanceWriter extends 
HoodieBaseLanceWriter<InternalRow, U
   }
 
   /**
-   * Check if writer can accept more records based on file size.
-   * Uses filesystem-based size checking (similar to ORC/HFile approach).
+   * Check if writer can accept more records based on estimated data size.
+   * Data size is approximated by accumulating Arrow buffer sizes across 
flushed batches,
+   * analogous to {@code ParquetWriter.getDataSize()}.
+   * The check is performed periodically (not on every record) and the 
interval adapts
+   * based on the observed average record size.
    *
-   * @return true if writer can accept more records, false if file size limit 
reached
+   * @return true if writer can accept more records, false if file size limit 
is reached
    */
   public boolean canWrite() {
-    //TODO https://github.com/apache/hudi/issues/17684
+    long writtenCount = getWrittenRecordCount();
+    if (writtenCount >= recordCountForNextSizeCheck) {
+      long dataSize = getDataSize();
+      // In extreme cases (e.g. all records same value, high compression 
ratio),
+      // dataSize may be 0; force avgRecordSize to at least 1 to avoid 
division by zero.
+      long avgRecordSize = Math.max(dataSize / writtenCount, 1);
+      // Return false when within ~2 records of the limit
+      if (dataSize > (maxFileSize - avgRecordSize * 2)) {
+        return false;
+      }
+      recordCountForNextSizeCheck = writtenCount + Math.min(
+          // Check at halfway between current position and the limit
+          Math.max(MIN_RECORDS_FOR_SIZE_CHECK, (maxFileSize / avgRecordSize - 
writtenCount) / 2),
+          MAX_RECORDS_FOR_SIZE_CHECK);
+    }
     return true;
   }
 
diff --git 
a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/io/storage/row/HoodieInternalRowFileWriterFactory.java
 
b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/io/storage/row/HoodieInternalRowFileWriterFactory.java
index b1431ae70d80..e1f1d66dff1a 100644
--- 
a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/io/storage/row/HoodieInternalRowFileWriterFactory.java
+++ 
b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/io/storage/row/HoodieInternalRowFileWriterFactory.java
@@ -21,6 +21,7 @@ package org.apache.hudi.io.storage.row;
 import org.apache.hudi.common.bloom.BloomFilter;
 import org.apache.hudi.common.bloom.BloomFilterFactory;
 import org.apache.hudi.common.config.HoodieParquetConfig;
+import org.apache.hudi.common.config.HoodieStorageConfig;
 import org.apache.hudi.common.engine.LocalTaskContextSupplier;
 import org.apache.hudi.common.fs.FSUtils;
 import org.apache.hudi.common.util.Option;
@@ -63,7 +64,8 @@ public class HoodieInternalRowFileWriterFactory {
     if (PARQUET.getFileExtension().equals(extension)) {
       return newParquetInternalRowFileWriter(path, hoodieTable, writeConfig, 
schema, tryInstantiateBloomFilter(writeConfig));
     } else if (LANCE.getFileExtension().equals(extension)) {
-      return newLanceInternalRowFileWriter(path, hoodieTable, schema);
+      long maxFileSize = 
writeConfig.getLongOrDefault(HoodieStorageConfig.LANCE_MAX_FILE_SIZE);
+      return newLanceInternalRowFileWriter(path, hoodieTable, schema, 
maxFileSize);
     }
     throw new UnsupportedOperationException(extension + " format not supported 
yet.");
   }
@@ -94,13 +96,16 @@ public class HoodieInternalRowFileWriterFactory {
 
   private static HoodieInternalRowFileWriter 
newLanceInternalRowFileWriter(StoragePath path,
                                                                            
HoodieTable table,
-                                                                           
StructType structType)
+                                                                           
StructType structType,
+                                                                           
long maxFileSize)
       throws IOException {
-    return new HoodieSparkLanceWriter(
-        path,
-        structType,
-        new LocalTaskContextSupplier(),
-        table.getStorage());
+    return HoodieSparkLanceWriter.builder()
+        .file(path)
+        .sparkSchema(structType)
+        .taskContextSupplier(new LocalTaskContextSupplier())
+        .storage(table.getStorage())
+        .maxFileSize(maxFileSize)
+        .build();
   }
 
   private static Option<BloomFilter> 
tryInstantiateBloomFilter(HoodieWriteConfig writeConfig) {
diff --git 
a/hudi-common/src/main/java/org/apache/hudi/common/config/HoodieStorageConfig.java
 
b/hudi-common/src/main/java/org/apache/hudi/common/config/HoodieStorageConfig.java
index 56ad4a0f92b4..5f155e57a1eb 100644
--- 
a/hudi-common/src/main/java/org/apache/hudi/common/config/HoodieStorageConfig.java
+++ 
b/hudi-common/src/main/java/org/apache/hudi/common/config/HoodieStorageConfig.java
@@ -80,6 +80,12 @@ public class HoodieStorageConfig extends HoodieConfig {
       .markAdvanced()
       .withDocumentation("Target file size in bytes for HFile base files.");
 
+  public static final ConfigProperty<String> LANCE_MAX_FILE_SIZE = 
ConfigProperty
+      .key("hoodie.lance.max.file.size")
+      .defaultValue(String.valueOf(120 * 1024 * 1024))
+      .markAdvanced()
+      .withDocumentation("Target file size in bytes for Lance base files.");
+
   public static final ConfigProperty<Boolean> HFILE_WRITER_TO_ALLOW_DUPLICATES 
= ConfigProperty
       .key("hoodie.hfile.writes.allow.duplicates")
       .defaultValue(false)
@@ -507,6 +513,11 @@ public class HoodieStorageConfig extends HoodieConfig {
       return this;
     }
 
+    public Builder lanceMaxFileSize(long maxFileSize) {
+      storageConfig.setValue(LANCE_MAX_FILE_SIZE, String.valueOf(maxFileSize));
+      return this;
+    }
+
     public Builder orcStripeSize(int orcStripeSize) {
       storageConfig.setValue(ORC_STRIPE_SIZE, String.valueOf(orcStripeSize));
       return this;
diff --git 
a/hudi-hadoop-common/src/main/java/org/apache/hudi/io/lance/HoodieBaseLanceWriter.java
 
b/hudi-hadoop-common/src/main/java/org/apache/hudi/io/lance/HoodieBaseLanceWriter.java
index 74dfcab40eba..8f76c61fa724 100644
--- 
a/hudi-hadoop-common/src/main/java/org/apache/hudi/io/lance/HoodieBaseLanceWriter.java
+++ 
b/hudi-hadoop-common/src/main/java/org/apache/hudi/io/lance/HoodieBaseLanceWriter.java
@@ -28,6 +28,7 @@ import org.apache.hudi.storage.StoragePath;
 import lombok.AccessLevel;
 import lombok.Getter;
 import org.apache.arrow.memory.BufferAllocator;
+import org.apache.arrow.vector.FieldVector;
 import org.apache.arrow.vector.VectorSchemaRoot;
 import org.apache.arrow.vector.types.pojo.Schema;
 import org.lance.file.LanceFileWriter;
@@ -63,6 +64,7 @@ public abstract class HoodieBaseLanceWriter<R, K extends 
Comparable<K>> implemen
   private final int batchSize;
   @Getter(value = AccessLevel.PROTECTED)
   private long writtenRecordCount = 0;
+  private long totalFlushedDataSize = 0;
   private int currentBatchSize = 0;
   private VectorSchemaRoot root;
   private ArrowWriter<R> arrowWriter;
@@ -214,6 +216,21 @@ public abstract class HoodieBaseLanceWriter<R, K extends 
Comparable<K>> implemen
     }
   }
 
+  /**
+   * Returns the estimated data size in bytes, including both flushed batches 
and
+   * the current in-progress batch. The in-progress batch size is derived from
+   * Arrow buffer capacities, which may slightly overestimate due to 
pre-allocation.
+   */
+  protected long getDataSize() {
+    long currentBufferSize = 0;
+    if (root != null && currentBatchSize > 0) {
+      for (FieldVector vector : root.getFieldVectors()) {
+        currentBufferSize += vector.getBufferSize();
+      }
+    }
+    return totalFlushedDataSize + currentBufferSize;
+  }
+
   /**
    * Flush buffered records to Lance file.
    */
@@ -225,6 +242,11 @@ public abstract class HoodieBaseLanceWriter<R, K extends 
Comparable<K>> implemen
     // Finalize the arrow writer (sets row count on VectorSchemaRoot)
     arrowWriter.finishBatch();
 
+    // Accumulate the uncompressed Arrow buffer sizes for this batch
+    for (FieldVector vector : root.getFieldVectors()) {
+      totalFlushedDataSize += vector.getBufferSize();
+    }
+
     // Write VectorSchemaRoot to Lance file
     writer.write(root);
 
diff --git 
a/hudi-spark-datasource/hudi-spark/src/test/java/org/apache/hudi/io/storage/TestHoodieSparkLanceReader.java
 
b/hudi-spark-datasource/hudi-spark/src/test/java/org/apache/hudi/io/storage/TestHoodieSparkLanceReader.java
index 7d43430753f6..b2593fc90816 100644
--- 
a/hudi-spark-datasource/hudi-spark/src/test/java/org/apache/hudi/io/storage/TestHoodieSparkLanceReader.java
+++ 
b/hudi-spark-datasource/hudi-spark/src/test/java/org/apache/hudi/io/storage/TestHoodieSparkLanceReader.java
@@ -305,8 +305,9 @@ public class TestHoodieSparkLanceReader {
     StoragePath path = new StoragePath(tempDir.getAbsolutePath() + 
"/test_large.lance");
     int recordCount = 2500;
     BloomFilter dynamicBloomFilter = 
BloomFilterFactory.createBloomFilter(1000, 0.0001, 10000, DYNAMIC_V0.name());
-    try (HoodieSparkLanceWriter writer = new HoodieSparkLanceWriter(
-        path, schema, instantTime, taskContextSupplier, storage, false, 
Option.of(dynamicBloomFilter))) {
+    try (HoodieSparkLanceWriter writer = HoodieSparkLanceWriter.builder()
+        
.file(path).sparkSchema(schema).instantTime(instantTime).taskContextSupplier(taskContextSupplier)
+        
.storage(storage).bloomFilterOpt(Option.of(dynamicBloomFilter)).build()) {
       for (int i = 0; i < recordCount; i++) {
         GenericInternalRow row = new GenericInternalRow(new Object[]{i, (long) 
i * 2});
         writer.writeRow("key" + i, row);
@@ -577,8 +578,9 @@ public class TestHoodieSparkLanceReader {
   }
     
   private HoodieSparkLanceReader writeAndCreateReader(StoragePath path, 
StructType schema, List<InternalRow> rows, boolean populateMetaFields) throws 
IOException {
-    try (HoodieSparkLanceWriter writer = new HoodieSparkLanceWriter(
-        path, schema, instantTime, taskContextSupplier, storage, 
populateMetaFields, Option.of(simpleBloomFilter))) {
+    try (HoodieSparkLanceWriter writer = HoodieSparkLanceWriter.builder()
+        
.file(path).sparkSchema(schema).instantTime(instantTime).taskContextSupplier(taskContextSupplier)
+        
.storage(storage).populateMetaFields(populateMetaFields).bloomFilterOpt(Option.of(simpleBloomFilter)).build())
 {
       for (int i = 0; i < rows.size(); i++) {
         HoodieKey key = new HoodieKey("key" + i, "default_partition");
         // Note writeRowWithMetadata implicitly handles case where 
populateMetaFields=false
@@ -604,8 +606,9 @@ public class TestHoodieSparkLanceReader {
     // Write Lance file with full schema
     StoragePath path = new StoragePath(tempDir.getAbsolutePath() + 
"/test_projection.lance");
     BloomFilter dynamicBloom = BloomFilterFactory.createBloomFilter(1000, 
0.0001, 10000, DYNAMIC_V0.name());
-    try (HoodieSparkLanceWriter writer = new HoodieSparkLanceWriter(
-        path, fullSchema, instantTime, taskContextSupplier, storage, false, 
Option.of(dynamicBloom))) {
+    try (HoodieSparkLanceWriter writer = HoodieSparkLanceWriter.builder()
+        
.file(path).sparkSchema(fullSchema).instantTime(instantTime).taskContextSupplier(taskContextSupplier)
+        .storage(storage).bloomFilterOpt(Option.of(dynamicBloom)).build()) {
       for (int i = 0; i < rows.size(); i++) {
         writer.writeRow("key" + i, rows.get(i));
       }
diff --git 
a/hudi-spark-datasource/hudi-spark/src/test/java/org/apache/hudi/io/storage/TestHoodieSparkLanceWriter.java
 
b/hudi-spark-datasource/hudi-spark/src/test/java/org/apache/hudi/io/storage/TestHoodieSparkLanceWriter.java
index c1d907fce01b..4664e383b539 100644
--- 
a/hudi-spark-datasource/hudi-spark/src/test/java/org/apache/hudi/io/storage/TestHoodieSparkLanceWriter.java
+++ 
b/hudi-spark-datasource/hudi-spark/src/test/java/org/apache/hudi/io/storage/TestHoodieSparkLanceWriter.java
@@ -49,6 +49,8 @@ import org.junit.jupiter.api.BeforeEach;
 import org.junit.jupiter.api.Test;
 import org.junit.jupiter.api.condition.DisabledIfSystemProperty;
 import org.junit.jupiter.api.io.TempDir;
+import org.junit.jupiter.params.ParameterizedTest;
+import org.junit.jupiter.params.provider.ValueSource;
 import org.lance.file.LanceFileReader;
 
 import java.io.File;
@@ -67,6 +69,7 @@ import static 
org.apache.hudi.io.storage.LanceTestUtils.createRowWithMetaFields;
 import static org.junit.jupiter.api.Assertions.assertEquals;
 import static org.junit.jupiter.api.Assertions.assertFalse;
 import static org.junit.jupiter.api.Assertions.assertNotNull;
+import static org.junit.jupiter.api.Assertions.assertThrows;
 import static org.junit.jupiter.api.Assertions.assertTrue;
 
 /**
@@ -104,8 +107,9 @@ public class TestHoodieSparkLanceWriter {
     StructType schema = createSchemaWithMetaFields();
 
     StoragePath path = new StoragePath(tempDir.getAbsolutePath() + 
"/test_with_metadata.lance");
-    try (HoodieSparkLanceWriter writer = new HoodieSparkLanceWriter(
-        path, schema, instantTime, taskContextSupplier, storage, true, 
Option.of(simpleBloomFilter))) {
+    try (HoodieSparkLanceWriter writer = HoodieSparkLanceWriter.builder()
+        
.file(path).sparkSchema(schema).instantTime(instantTime).taskContextSupplier(taskContextSupplier)
+        
.storage(storage).populateMetaFields(true).bloomFilterOpt(Option.of(simpleBloomFilter)).build())
 {
       // Write multiple records to test metadata population and sequence ID 
generation
       for (int i = 0; i < 3; i++) {
         InternalRow row = createRowWithMetaFields(i, "User" + i, 20L + i);
@@ -173,8 +177,9 @@ public class TestHoodieSparkLanceWriter {
     StructType schema = createSchemaWithoutMetaFields();
 
     StoragePath path = new StoragePath(tempDir.getAbsolutePath() + 
"/test_without_metadata.lance");
-    try (HoodieSparkLanceWriter writer = new HoodieSparkLanceWriter(
-        path, schema, instantTime, taskContextSupplier, storage, false, 
Option.of(simpleBloomFilter))) {
+    try (HoodieSparkLanceWriter writer = HoodieSparkLanceWriter.builder()
+        
.file(path).sparkSchema(schema).instantTime(instantTime).taskContextSupplier(taskContextSupplier)
+        
.storage(storage).bloomFilterOpt(Option.of(simpleBloomFilter)).build()) {
       // Create row with just user data (no meta fields)
       InternalRow row = createRow(1, "Bob", 25L);
       HoodieKey key = new HoodieKey("key2", "partition2");
@@ -217,8 +222,9 @@ public class TestHoodieSparkLanceWriter {
     StructType schema = createSchemaWithoutMetaFields();
 
     StoragePath path = new StoragePath(tempDir.getAbsolutePath() + 
"/test_simple_write.lance");
-    try (HoodieSparkLanceWriter writer = new HoodieSparkLanceWriter(
-        path, schema, instantTime, taskContextSupplier, storage, false, 
Option.of(simpleBloomFilter))) {
+    try (HoodieSparkLanceWriter writer = HoodieSparkLanceWriter.builder()
+        
.file(path).sparkSchema(schema).instantTime(instantTime).taskContextSupplier(taskContextSupplier)
+        
.storage(storage).bloomFilterOpt(Option.of(simpleBloomFilter)).build()) {
       InternalRow row = createRow(1, "Charlie", 35L);
       writer.writeRow("key3", row);
     }
@@ -238,8 +244,9 @@ public class TestHoodieSparkLanceWriter {
     StoragePath path = new StoragePath(tempDir.getAbsolutePath() + 
"/test_batch_flush.lance");
     // Write more than DEFAULT_BATCH_SIZE (1000) records
     int recordCount = 2500;
-    try (HoodieSparkLanceWriter writer = new HoodieSparkLanceWriter(
-        path, schema, instantTime, taskContextSupplier, storage, false, 
Option.of(simpleBloomFilter))) {
+    try (HoodieSparkLanceWriter writer = HoodieSparkLanceWriter.builder()
+        
.file(path).sparkSchema(schema).instantTime(instantTime).taskContextSupplier(taskContextSupplier)
+        
.storage(storage).bloomFilterOpt(Option.of(simpleBloomFilter)).build()) {
       for (int i = 0; i < recordCount; i++) {
         InternalRow row = createRow(i, "User" + i, 20L + i);
         writer.writeRow("key" + i, row);
@@ -266,8 +273,9 @@ public class TestHoodieSparkLanceWriter {
         .add("binary_field", DataTypes.BinaryType, false);
 
     StoragePath path = new StoragePath(tempDir.getAbsolutePath() + 
"/test_primitives.lance");
-    try (HoodieSparkLanceWriter writer = new HoodieSparkLanceWriter(
-        path, schema, instantTime, taskContextSupplier, storage, false, 
Option.of(simpleBloomFilter))) {
+    try (HoodieSparkLanceWriter writer = HoodieSparkLanceWriter.builder()
+        
.file(path).sparkSchema(schema).instantTime(instantTime).taskContextSupplier(taskContextSupplier)
+        
.storage(storage).bloomFilterOpt(Option.of(simpleBloomFilter)).build()) {
       GenericInternalRow row = new GenericInternalRow(new Object[]{
           42,                                    // int
           123456789L,                           // long
@@ -313,8 +321,9 @@ public class TestHoodieSparkLanceWriter {
         .add("age", DataTypes.LongType, true);
 
     StoragePath path = new StoragePath(tempDir.getAbsolutePath() + 
"/test_nulls.lance");
-    try (HoodieSparkLanceWriter writer = new HoodieSparkLanceWriter(
-        path, schema, instantTime, taskContextSupplier, storage, false, 
Option.of(simpleBloomFilter))) {
+    try (HoodieSparkLanceWriter writer = HoodieSparkLanceWriter.builder()
+        
.file(path).sparkSchema(schema).instantTime(instantTime).taskContextSupplier(taskContextSupplier)
+        
.storage(storage).bloomFilterOpt(Option.of(simpleBloomFilter)).build()) {
       // Write rows with null values
       writer.writeRow("key1", createRow(1, "Alice", 30L));
       writer.writeRow("key2", createRow(2, null, 25L));  // null name
@@ -350,8 +359,9 @@ public class TestHoodieSparkLanceWriter {
         .add("id", DataTypes.IntegerType, false);
 
     StoragePath path = new StoragePath(tempDir.getAbsolutePath() + 
"/test_empty.lance");
-    try (HoodieSparkLanceWriter writer = new HoodieSparkLanceWriter(
-        path, schema, instantTime, taskContextSupplier, storage, false, 
Option.of(simpleBloomFilter))) {
+    try (HoodieSparkLanceWriter writer = HoodieSparkLanceWriter.builder()
+        
.file(path).sparkSchema(schema).instantTime(instantTime).taskContextSupplier(taskContextSupplier)
+        
.storage(storage).bloomFilterOpt(Option.of(simpleBloomFilter)).build()) {
       // Close without writing any rows
     }
 
@@ -398,8 +408,9 @@ public class TestHoodieSparkLanceWriter {
     rows.add(new GenericInternalRow(new Object[]{2, 
UTF8String.fromString("Bob"), address2}));
 
     StoragePath path = new StoragePath(tempDir.getAbsolutePath() + 
"/test_struct.lance");
-    try (HoodieSparkLanceWriter writer = new HoodieSparkLanceWriter(
-        path, schema, instantTime, taskContextSupplier, storage, false, 
Option.of(simpleBloomFilter))) {
+    try (HoodieSparkLanceWriter writer = HoodieSparkLanceWriter.builder()
+        
.file(path).sparkSchema(schema).instantTime(instantTime).taskContextSupplier(taskContextSupplier)
+        
.storage(storage).bloomFilterOpt(Option.of(simpleBloomFilter)).build()) {
       for (int i = 0; i < rows.size(); i++) {
         writer.writeRow("key" + i, rows.get(i));
       }
@@ -414,6 +425,116 @@ public class TestHoodieSparkLanceWriter {
     }
   }
 
+  /**
+   * HoodieSparkLanceWriter constructor must throw IllegalArgumentException if 
maxFileSize <= 0
+   */
+  @Test
+  public void testMaxFileSizeValidation() {
+    StructType schema = createSchemaWithoutMetaFields();
+    StoragePath path = new StoragePath(tempDir.getAbsolutePath() + 
"/test_validation.lance");
+    IllegalArgumentException ex = assertThrows(IllegalArgumentException.class,
+        () -> HoodieSparkLanceWriter.builder()
+            
.file(path).sparkSchema(schema).instantTime(instantTime).taskContextSupplier(taskContextSupplier)
+            .storage(storage).maxFileSize(0).build());
+    assertEquals("maxFileSize must be a positive number", ex.getMessage());
+  }
+
+  /**
+   * canWrite() must return true before any records are written (written count 
is 0,
+   * below the initial check threshold of MIN_RECORDS_FOR_SIZE_CHECK=100).
+   */
+  @Test
+  public void testCanWriteTrueBeforeAnyWrite() throws Exception {
+    StructType schema = createSchemaWithoutMetaFields();
+    StoragePath path = new StoragePath(tempDir.getAbsolutePath() + 
"/test_canwrite_initial.lance");
+    try (HoodieSparkLanceWriter writer = HoodieSparkLanceWriter.builder()
+        
.file(path).sparkSchema(schema).instantTime(instantTime).taskContextSupplier(taskContextSupplier)
+        .storage(storage).maxFileSize(Long.MAX_VALUE).build()) {
+      assertTrue(writer.canWrite(), "canWrite() must return true before any 
records are written");
+    }
+  }
+
+  /**
+   * canWrite() must always return true when maxFileSize is Long.MAX_VALUE,
+   * regardless of how many records are written and how many batch flushes 
occur.
+   */
+  @Test
+  public void testCanWriteAlwaysTrueWithNoLimit() throws Exception {
+    StructType schema = createSchemaWithoutMetaFields();
+    StoragePath path = new StoragePath(tempDir.getAbsolutePath() + 
"/test_canwrite_no_limit.lance");
+    // Write more than DEFAULT_BATCH_SIZE (1000) to trigger at least one batch 
flush
+    int recordCount = 1500;
+    try (HoodieSparkLanceWriter writer = HoodieSparkLanceWriter.builder()
+        
.file(path).sparkSchema(schema).instantTime(instantTime).taskContextSupplier(taskContextSupplier)
+        .storage(storage).maxFileSize(Long.MAX_VALUE).build()) {
+      for (int i = 0; i < recordCount; i++) {
+        assertTrue(writer.canWrite(), "canWrite() should always be true when 
maxFileSize is unlimited");
+        writer.writeRow("key" + i, createRow(i, "User" + i, 20L + i));
+      }
+    }
+  }
+
+  /**
+   * canWrite() must return false once the cumulative Arrow buffer size of 
flushed batches
+   * exceeds maxFileSize. A tiny maxFileSize (100 bytes) is used so that a 
single flushed
+   * batch of 1000 records (≫ 100 bytes) is guaranteed to exceed the limit.
+   */
+  @ParameterizedTest
+  @ValueSource(booleans = {true, false})
+  public void testCanWriteReturnsFalseAfterFileSizeLimitExceeded(boolean 
populateMetaFields) throws Exception {
+    StructType schema = populateMetaFields ? createSchemaWithMetaFields() : 
createSchemaWithoutMetaFields();
+    StoragePath path = new StoragePath(tempDir.getAbsolutePath() + 
"/test_canwrite_exceeded.lance");
+    // 100 bytes is far smaller than a single flushed batch of 1000 records
+    long tinyMaxFileSize = 100L;
+
+    try (HoodieSparkLanceWriter writer = HoodieSparkLanceWriter.builder()
+        
.file(path).sparkSchema(schema).instantTime(instantTime).taskContextSupplier(taskContextSupplier)
+        
.storage(storage).populateMetaFields(populateMetaFields).maxFileSize(tinyMaxFileSize).build())
 {
+
+      assertTrue(writer.canWrite(), "canWrite() must be true before writing 
any records");
+
+      // Write exactly DEFAULT_BATCH_SIZE records to force the first batch 
flush
+      for (int i = 0; i < 1000; i++) {
+        if (populateMetaFields) {
+          HoodieKey key = new HoodieKey("key" + i, "partition1");
+          writer.writeRowWithMetadata(key, createRowWithMetaFields(i, "User" + 
i, 20L + i));
+        } else {
+          writer.writeRow("key" + i, createRow(i, "User" + i, 20L + i));
+        }
+      }
+
+      // After the flush the accumulated data size >> tinyMaxFileSize, so 
canWrite() must be false
+      assertFalse(writer.canWrite(),
+          "canWrite() must return false after flushed data size exceeds 
maxFileSize");
+    }
+  }
+
+  /**
+   * A write loop that respects canWrite() must stop well before the 
artificial upper bound
+   * when maxFileSize is tiny, demonstrating that canWrite() correctly gates 
record writes.
+   */
+  @Test
+  public void testCanWriteStopsWriteLoop() throws Exception {
+    StructType schema = createSchemaWithoutMetaFields();
+    StoragePath path = new StoragePath(tempDir.getAbsolutePath() + 
"/test_canwrite_loop.lance");
+    long tinyMaxFileSize = 100L;
+    int maxAllowed = 5000;
+    int writtenCount = 0;
+
+    try (HoodieSparkLanceWriter writer = HoodieSparkLanceWriter.builder()
+        
.file(path).sparkSchema(schema).instantTime(instantTime).taskContextSupplier(taskContextSupplier)
+        .storage(storage).maxFileSize(tinyMaxFileSize).build()) {
+      while (writer.canWrite() && writtenCount < maxAllowed) {
+        writer.writeRow("key" + writtenCount, createRow(writtenCount, "User" + 
writtenCount, 20L));
+        writtenCount++;
+      }
+    }
+
+    assertTrue(writtenCount > 0, "At least one record should have been 
written");
+    assertTrue(writtenCount < maxAllowed,
+        "Write loop must have been stopped by canWrite() before reaching the 
artificial ceiling");
+  }
+
   // Helper methods
 
   private StructType createSchemaWithMetaFields() {

Reply via email to