This is an automated email from the ASF dual-hosted git repository.
wombatukun pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git
The following commit(s) were added to refs/heads/master by this push:
new f6a33fd828d5 feat(lance): Implement canWrite() in HoodieSparkLanceWriter with configurable max file size for Lance (#18341)
f6a33fd828d5 is described below
commit f6a33fd828d526811842d4ca25b0d227d8b1a21d
Author: Vova Kolmakov <[email protected]>
AuthorDate: Tue Mar 31 16:25:54 2026 +0700
feat(lance): Implement canWrite() in HoodieSparkLanceWriter with configurable max file size for Lance (#18341)
---
.../io/storage/HoodieSparkFileWriterFactory.java | 12 +-
.../hudi/io/storage/HoodieSparkLanceWriter.java | 104 +++++++++-----
.../row/HoodieInternalRowFileWriterFactory.java | 19 ++-
.../hudi/common/config/HoodieStorageConfig.java | 11 ++
.../hudi/io/lance/HoodieBaseLanceWriter.java | 22 +++
.../io/storage/TestHoodieSparkLanceReader.java | 15 +-
.../io/storage/TestHoodieSparkLanceWriter.java | 153 ++++++++++++++++++---
7 files changed, 272 insertions(+), 64 deletions(-)
diff --git a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/io/storage/HoodieSparkFileWriterFactory.java b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/io/storage/HoodieSparkFileWriterFactory.java
index 9ebb94a0bb54..bb075470c507 100644
--- a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/io/storage/HoodieSparkFileWriterFactory.java
+++ b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/io/storage/HoodieSparkFileWriterFactory.java
@@ -111,8 +111,18 @@ public class HoodieSparkFileWriterFactory extends HoodieFileWriterFactory {
StructType structType = HoodieInternalRowUtils.getCachedSchema(schema);
boolean enableBloomFilter = enableBloomFilter(populateMetaFields, config);
Option<BloomFilter> bloomFilter = enableBloomFilter ?
Option.of(createBloomFilter(config)) : Option.empty();
+ long maxFileSize =
config.getLongOrDefault(HoodieStorageConfig.LANCE_MAX_FILE_SIZE);
- return new HoodieSparkLanceWriter(path, structType, instantTime,
taskContextSupplier, storage, populateMetaFields, bloomFilter);
+ return HoodieSparkLanceWriter.builder()
+ .file(path)
+ .sparkSchema(structType)
+ .instantTime(instantTime)
+ .taskContextSupplier(taskContextSupplier)
+ .storage(storage)
+ .populateMetaFields(populateMetaFields)
+ .bloomFilterOpt(bloomFilter)
+ .maxFileSize(maxFileSize)
+ .build();
}
private static HoodieRowParquetWriteSupport
getHoodieRowParquetWriteSupport(StorageConfiguration<?> conf, HoodieSchema
schema,
diff --git
a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/io/storage/HoodieSparkLanceWriter.java
b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/io/storage/HoodieSparkLanceWriter.java
index e029fcbab010..02f9c7e3b16b 100644
--- a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/io/storage/HoodieSparkLanceWriter.java
+++ b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/io/storage/HoodieSparkLanceWriter.java
@@ -19,6 +19,7 @@
package org.apache.hudi.io.storage;
import org.apache.hudi.common.bloom.BloomFilter;
+import org.apache.hudi.common.config.HoodieStorageConfig;
import org.apache.hudi.common.engine.TaskContextSupplier;
import org.apache.hudi.common.model.HoodieKey;
import org.apache.hudi.common.model.HoodieRecord;
@@ -31,6 +32,7 @@ import org.apache.hudi.storage.StoragePath;
import com.lancedb.lance.spark.arrow.LanceArrowWriter;
import lombok.AllArgsConstructor;
+import lombok.Builder;
import org.apache.arrow.vector.VectorSchemaRoot;
import org.apache.arrow.vector.types.pojo.Schema;
import org.apache.spark.sql.catalyst.InternalRow;
@@ -46,6 +48,7 @@ import static org.apache.hudi.common.model.HoodieRecord.HoodieMetadataField.COMM
import static
org.apache.hudi.common.model.HoodieRecord.HoodieMetadataField.FILENAME_METADATA_FIELD;
import static
org.apache.hudi.common.model.HoodieRecord.HoodieMetadataField.PARTITION_PATH_METADATA_FIELD;
import static
org.apache.hudi.common.model.HoodieRecord.HoodieMetadataField.RECORD_KEY_METADATA_FIELD;
+import static org.apache.hudi.common.util.ValidationUtils.checkArgument;
/**
* Spark Lance file writer implementing {@link HoodieSparkFileWriter} and
{@link HoodieInternalRowFileWriter}.
@@ -60,6 +63,8 @@ public class HoodieSparkLanceWriter extends HoodieBaseLanceWriter<InternalRow, U
implements HoodieSparkFileWriter, HoodieInternalRowFileWriter {
private static final String DEFAULT_TIMEZONE = "UTC";
+ private static final long MIN_RECORDS_FOR_SIZE_CHECK = 100L;
+ private static final long MAX_RECORDS_FOR_SIZE_CHECK = 10000L;
private final StructType sparkSchema;
private final Schema arrowSchema;
@@ -67,52 +72,66 @@ public class HoodieSparkLanceWriter extends HoodieBaseLanceWriter<InternalRow, U
private final UTF8String instantTime;
private final boolean populateMetaFields;
private final Function<Long, String> seqIdGenerator;
+ private final long maxFileSize;
+ private long recordCountForNextSizeCheck = MIN_RECORDS_FOR_SIZE_CHECK;
/**
- * Constructor for Spark Lance writer.
+ * Creates a new builder for constructing {@link HoodieSparkLanceWriter}
instances.
*
- * @param file Path where Lance file will be written
- * @param sparkSchema Spark schema for the data
- * @param instantTime Instant time for the commit
- * @param taskContextSupplier Task context supplier for partition ID
- * @param storage HoodieStorage instance
- * @param populateMetaFields Whether to populate Hudi metadata fields
- * @param bloomFilterOpt Optional bloom filter for record key tracking
+ * <p>Required parameters: {@code file}, {@code sparkSchema}, {@code
taskContextSupplier}, {@code storage}.
+ * <p>Optional parameters with defaults:
+ * <ul>
+ * <li>{@code instantTime} — defaults to {@code null}</li>
+ * <li>{@code populateMetaFields} — defaults to {@code false}</li>
+ * <li>{@code bloomFilterOpt} — defaults to {@link Option#empty()}</li>
+ * <li>{@code maxFileSize} — defaults to {@link
HoodieStorageConfig#LANCE_MAX_FILE_SIZE}</li>
+ * </ul>
*/
- public HoodieSparkLanceWriter(StoragePath file,
- StructType sparkSchema,
- String instantTime,
- TaskContextSupplier taskContextSupplier,
- HoodieStorage storage,
- boolean populateMetaFields,
- Option<BloomFilter> bloomFilterOpt) {
+ @Builder(builderMethodName = "builder")
+ private static HoodieSparkLanceWriter create(
+ StoragePath file,
+ StructType sparkSchema,
+ String instantTime,
+ TaskContextSupplier taskContextSupplier,
+ HoodieStorage storage,
+ boolean populateMetaFields,
+ Option<BloomFilter> bloomFilterOpt,
+ long maxFileSize) {
+ checkArgument(maxFileSize > 0, "maxFileSize must be a positive number");
+ return new HoodieSparkLanceWriter(file, sparkSchema, instantTime,
+ taskContextSupplier, storage, populateMetaFields, bloomFilterOpt,
maxFileSize);
+ }
+
+ /**
+ * Manually declared builder class to provide default values for optional
parameters.
+ * Lombok fills in the remaining builder methods.
+ */
+ public static class HoodieSparkLanceWriterBuilder {
+ private Option<BloomFilter> bloomFilterOpt = Option.empty();
+ private long maxFileSize =
Long.parseLong(HoodieStorageConfig.LANCE_MAX_FILE_SIZE.defaultValue());
+ }
+
+ private HoodieSparkLanceWriter(StoragePath file,
+ StructType sparkSchema,
+ String instantTime,
+ TaskContextSupplier taskContextSupplier,
+ HoodieStorage storage,
+ boolean populateMetaFields,
+ Option<BloomFilter> bloomFilterOpt,
+ long maxFileSize) {
super(file, DEFAULT_BATCH_SIZE,
bloomFilterOpt.map(HoodieBloomFilterRowWriteSupport::new));
this.sparkSchema = sparkSchema;
this.arrowSchema = LanceArrowUtils.toArrowSchema(sparkSchema,
DEFAULT_TIMEZONE, true, false);
this.fileName = UTF8String.fromString(file.getName());
this.instantTime = UTF8String.fromString(instantTime);
this.populateMetaFields = populateMetaFields;
+ this.maxFileSize = maxFileSize;
this.seqIdGenerator = recordIndex -> {
Integer partitionId = taskContextSupplier.getPartitionIdSupplier().get();
return HoodieRecord.generateSequenceId(instantTime, partitionId,
recordIndex);
};
}
- /**
- * Constructor for Spark Lance writer used for internal row writing with
pre-embedded metadata.
- *
- * @param file Path where Lance file will be written
- * @param sparkSchema Spark schema for the data
- * @param taskContextSupplier Task context supplier for partition ID
- * @param storage HoodieStorage instance
- */
- public HoodieSparkLanceWriter(StoragePath file,
- StructType sparkSchema,
- TaskContextSupplier taskContextSupplier,
- HoodieStorage storage) {
- this(file, sparkSchema, null, taskContextSupplier, storage, false,
Option.empty());
- }
-
@Override
public void writeRowWithMetadata(HoodieKey key, InternalRow row) throws
IOException {
UTF8String recordKey = UTF8String.fromString(key.getRecordKey());
@@ -147,13 +166,30 @@ public class HoodieSparkLanceWriter extends HoodieBaseLanceWriter<InternalRow, U
}
/**
- * Check if writer can accept more records based on file size.
- * Uses filesystem-based size checking (similar to ORC/HFile approach).
+ * Check if writer can accept more records based on estimated data size.
+ * Data size is approximated by accumulating Arrow buffer sizes across
flushed batches,
+ * analogous to {@code ParquetWriter.getDataSize()}.
+ * The check is performed periodically (not on every record) and the
interval adapts
+ * based on the observed average record size.
*
- * @return true if writer can accept more records, false if file size limit
reached
+ * @return true if writer can accept more records, false if file size limit
is reached
*/
public boolean canWrite() {
- //TODO https://github.com/apache/hudi/issues/17684
+ long writtenCount = getWrittenRecordCount();
+ if (writtenCount >= recordCountForNextSizeCheck) {
+ long dataSize = getDataSize();
+ // In extreme cases (e.g. all records same value, high compression
ratio),
+ // dataSize may be 0; force avgRecordSize to at least 1 to avoid
division by zero.
+ long avgRecordSize = Math.max(dataSize / writtenCount, 1);
+ // Return false when within ~2 records of the limit
+ if (dataSize > (maxFileSize - avgRecordSize * 2)) {
+ return false;
+ }
+ recordCountForNextSizeCheck = writtenCount + Math.min(
+ // Check at halfway between current position and the limit
+ Math.max(MIN_RECORDS_FOR_SIZE_CHECK, (maxFileSize / avgRecordSize -
writtenCount) / 2),
+ MAX_RECORDS_FOR_SIZE_CHECK);
+ }
return true;
}
diff --git a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/io/storage/row/HoodieInternalRowFileWriterFactory.java b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/io/storage/row/HoodieInternalRowFileWriterFactory.java
index b1431ae70d80..e1f1d66dff1a 100644
--- a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/io/storage/row/HoodieInternalRowFileWriterFactory.java
+++ b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/io/storage/row/HoodieInternalRowFileWriterFactory.java
@@ -21,6 +21,7 @@ package org.apache.hudi.io.storage.row;
import org.apache.hudi.common.bloom.BloomFilter;
import org.apache.hudi.common.bloom.BloomFilterFactory;
import org.apache.hudi.common.config.HoodieParquetConfig;
+import org.apache.hudi.common.config.HoodieStorageConfig;
import org.apache.hudi.common.engine.LocalTaskContextSupplier;
import org.apache.hudi.common.fs.FSUtils;
import org.apache.hudi.common.util.Option;
@@ -63,7 +64,8 @@ public class HoodieInternalRowFileWriterFactory {
if (PARQUET.getFileExtension().equals(extension)) {
return newParquetInternalRowFileWriter(path, hoodieTable, writeConfig,
schema, tryInstantiateBloomFilter(writeConfig));
} else if (LANCE.getFileExtension().equals(extension)) {
- return newLanceInternalRowFileWriter(path, hoodieTable, schema);
+ long maxFileSize =
writeConfig.getLongOrDefault(HoodieStorageConfig.LANCE_MAX_FILE_SIZE);
+ return newLanceInternalRowFileWriter(path, hoodieTable, schema,
maxFileSize);
}
throw new UnsupportedOperationException(extension + " format not supported
yet.");
}
@@ -94,13 +96,16 @@ public class HoodieInternalRowFileWriterFactory {
private static HoodieInternalRowFileWriter
newLanceInternalRowFileWriter(StoragePath path,
HoodieTable table,
-
StructType structType)
+
StructType structType,
+
long maxFileSize)
throws IOException {
- return new HoodieSparkLanceWriter(
- path,
- structType,
- new LocalTaskContextSupplier(),
- table.getStorage());
+ return HoodieSparkLanceWriter.builder()
+ .file(path)
+ .sparkSchema(structType)
+ .taskContextSupplier(new LocalTaskContextSupplier())
+ .storage(table.getStorage())
+ .maxFileSize(maxFileSize)
+ .build();
}
private static Option<BloomFilter>
tryInstantiateBloomFilter(HoodieWriteConfig writeConfig) {
diff --git a/hudi-common/src/main/java/org/apache/hudi/common/config/HoodieStorageConfig.java b/hudi-common/src/main/java/org/apache/hudi/common/config/HoodieStorageConfig.java
index 56ad4a0f92b4..5f155e57a1eb 100644
--- a/hudi-common/src/main/java/org/apache/hudi/common/config/HoodieStorageConfig.java
+++ b/hudi-common/src/main/java/org/apache/hudi/common/config/HoodieStorageConfig.java
@@ -80,6 +80,12 @@ public class HoodieStorageConfig extends HoodieConfig {
.markAdvanced()
.withDocumentation("Target file size in bytes for HFile base files.");
+ public static final ConfigProperty<String> LANCE_MAX_FILE_SIZE =
ConfigProperty
+ .key("hoodie.lance.max.file.size")
+ .defaultValue(String.valueOf(120 * 1024 * 1024))
+ .markAdvanced()
+ .withDocumentation("Target file size in bytes for Lance base files.");
+
public static final ConfigProperty<Boolean> HFILE_WRITER_TO_ALLOW_DUPLICATES
= ConfigProperty
.key("hoodie.hfile.writes.allow.duplicates")
.defaultValue(false)
@@ -507,6 +513,11 @@ public class HoodieStorageConfig extends HoodieConfig {
return this;
}
+ public Builder lanceMaxFileSize(long maxFileSize) {
+ storageConfig.setValue(LANCE_MAX_FILE_SIZE, String.valueOf(maxFileSize));
+ return this;
+ }
+
public Builder orcStripeSize(int orcStripeSize) {
storageConfig.setValue(ORC_STRIPE_SIZE, String.valueOf(orcStripeSize));
return this;
diff --git a/hudi-hadoop-common/src/main/java/org/apache/hudi/io/lance/HoodieBaseLanceWriter.java b/hudi-hadoop-common/src/main/java/org/apache/hudi/io/lance/HoodieBaseLanceWriter.java
index 74dfcab40eba..8f76c61fa724 100644
--- a/hudi-hadoop-common/src/main/java/org/apache/hudi/io/lance/HoodieBaseLanceWriter.java
+++ b/hudi-hadoop-common/src/main/java/org/apache/hudi/io/lance/HoodieBaseLanceWriter.java
@@ -28,6 +28,7 @@ import org.apache.hudi.storage.StoragePath;
import lombok.AccessLevel;
import lombok.Getter;
import org.apache.arrow.memory.BufferAllocator;
+import org.apache.arrow.vector.FieldVector;
import org.apache.arrow.vector.VectorSchemaRoot;
import org.apache.arrow.vector.types.pojo.Schema;
import org.lance.file.LanceFileWriter;
@@ -63,6 +64,7 @@ public abstract class HoodieBaseLanceWriter<R, K extends Comparable<K>> implemen
private final int batchSize;
@Getter(value = AccessLevel.PROTECTED)
private long writtenRecordCount = 0;
+ private long totalFlushedDataSize = 0;
private int currentBatchSize = 0;
private VectorSchemaRoot root;
private ArrowWriter<R> arrowWriter;
@@ -214,6 +216,21 @@ public abstract class HoodieBaseLanceWriter<R, K extends Comparable<K>> implemen
}
}
+ /**
+ * Returns the estimated data size in bytes, including both flushed batches
and
+ * the current in-progress batch. The in-progress batch size is derived from
+ * Arrow buffer capacities, which may slightly overestimate due to
pre-allocation.
+ */
+ protected long getDataSize() {
+ long currentBufferSize = 0;
+ if (root != null && currentBatchSize > 0) {
+ for (FieldVector vector : root.getFieldVectors()) {
+ currentBufferSize += vector.getBufferSize();
+ }
+ }
+ return totalFlushedDataSize + currentBufferSize;
+ }
+
/**
* Flush buffered records to Lance file.
*/
@@ -225,6 +242,11 @@ public abstract class HoodieBaseLanceWriter<R, K extends Comparable<K>> implemen
// Finalize the arrow writer (sets row count on VectorSchemaRoot)
arrowWriter.finishBatch();
+ // Accumulate the uncompressed Arrow buffer sizes for this batch
+ for (FieldVector vector : root.getFieldVectors()) {
+ totalFlushedDataSize += vector.getBufferSize();
+ }
+
// Write VectorSchemaRoot to Lance file
writer.write(root);
diff --git a/hudi-spark-datasource/hudi-spark/src/test/java/org/apache/hudi/io/storage/TestHoodieSparkLanceReader.java b/hudi-spark-datasource/hudi-spark/src/test/java/org/apache/hudi/io/storage/TestHoodieSparkLanceReader.java
index 7d43430753f6..b2593fc90816 100644
--- a/hudi-spark-datasource/hudi-spark/src/test/java/org/apache/hudi/io/storage/TestHoodieSparkLanceReader.java
+++ b/hudi-spark-datasource/hudi-spark/src/test/java/org/apache/hudi/io/storage/TestHoodieSparkLanceReader.java
@@ -305,8 +305,9 @@ public class TestHoodieSparkLanceReader {
StoragePath path = new StoragePath(tempDir.getAbsolutePath() +
"/test_large.lance");
int recordCount = 2500;
BloomFilter dynamicBloomFilter =
BloomFilterFactory.createBloomFilter(1000, 0.0001, 10000, DYNAMIC_V0.name());
- try (HoodieSparkLanceWriter writer = new HoodieSparkLanceWriter(
- path, schema, instantTime, taskContextSupplier, storage, false,
Option.of(dynamicBloomFilter))) {
+ try (HoodieSparkLanceWriter writer = HoodieSparkLanceWriter.builder()
+
.file(path).sparkSchema(schema).instantTime(instantTime).taskContextSupplier(taskContextSupplier)
+
.storage(storage).bloomFilterOpt(Option.of(dynamicBloomFilter)).build()) {
for (int i = 0; i < recordCount; i++) {
GenericInternalRow row = new GenericInternalRow(new Object[]{i, (long)
i * 2});
writer.writeRow("key" + i, row);
@@ -577,8 +578,9 @@ public class TestHoodieSparkLanceReader {
}
private HoodieSparkLanceReader writeAndCreateReader(StoragePath path,
StructType schema, List<InternalRow> rows, boolean populateMetaFields) throws
IOException {
- try (HoodieSparkLanceWriter writer = new HoodieSparkLanceWriter(
- path, schema, instantTime, taskContextSupplier, storage,
populateMetaFields, Option.of(simpleBloomFilter))) {
+ try (HoodieSparkLanceWriter writer = HoodieSparkLanceWriter.builder()
+
.file(path).sparkSchema(schema).instantTime(instantTime).taskContextSupplier(taskContextSupplier)
+
.storage(storage).populateMetaFields(populateMetaFields).bloomFilterOpt(Option.of(simpleBloomFilter)).build())
{
for (int i = 0; i < rows.size(); i++) {
HoodieKey key = new HoodieKey("key" + i, "default_partition");
// Note writeRowWithMetadata implicitly handles case where
populateMetaFields=false
@@ -604,8 +606,9 @@ public class TestHoodieSparkLanceReader {
// Write Lance file with full schema
StoragePath path = new StoragePath(tempDir.getAbsolutePath() +
"/test_projection.lance");
BloomFilter dynamicBloom = BloomFilterFactory.createBloomFilter(1000,
0.0001, 10000, DYNAMIC_V0.name());
- try (HoodieSparkLanceWriter writer = new HoodieSparkLanceWriter(
- path, fullSchema, instantTime, taskContextSupplier, storage, false,
Option.of(dynamicBloom))) {
+ try (HoodieSparkLanceWriter writer = HoodieSparkLanceWriter.builder()
+
.file(path).sparkSchema(fullSchema).instantTime(instantTime).taskContextSupplier(taskContextSupplier)
+ .storage(storage).bloomFilterOpt(Option.of(dynamicBloom)).build()) {
for (int i = 0; i < rows.size(); i++) {
writer.writeRow("key" + i, rows.get(i));
}
diff --git a/hudi-spark-datasource/hudi-spark/src/test/java/org/apache/hudi/io/storage/TestHoodieSparkLanceWriter.java b/hudi-spark-datasource/hudi-spark/src/test/java/org/apache/hudi/io/storage/TestHoodieSparkLanceWriter.java
index c1d907fce01b..4664e383b539 100644
--- a/hudi-spark-datasource/hudi-spark/src/test/java/org/apache/hudi/io/storage/TestHoodieSparkLanceWriter.java
+++ b/hudi-spark-datasource/hudi-spark/src/test/java/org/apache/hudi/io/storage/TestHoodieSparkLanceWriter.java
@@ -49,6 +49,8 @@ import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.condition.DisabledIfSystemProperty;
import org.junit.jupiter.api.io.TempDir;
+import org.junit.jupiter.params.ParameterizedTest;
+import org.junit.jupiter.params.provider.ValueSource;
import org.lance.file.LanceFileReader;
import java.io.File;
@@ -67,6 +69,7 @@ import static org.apache.hudi.io.storage.LanceTestUtils.createRowWithMetaFields;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertFalse;
import static org.junit.jupiter.api.Assertions.assertNotNull;
+import static org.junit.jupiter.api.Assertions.assertThrows;
import static org.junit.jupiter.api.Assertions.assertTrue;
/**
@@ -104,8 +107,9 @@ public class TestHoodieSparkLanceWriter {
StructType schema = createSchemaWithMetaFields();
StoragePath path = new StoragePath(tempDir.getAbsolutePath() +
"/test_with_metadata.lance");
- try (HoodieSparkLanceWriter writer = new HoodieSparkLanceWriter(
- path, schema, instantTime, taskContextSupplier, storage, true,
Option.of(simpleBloomFilter))) {
+ try (HoodieSparkLanceWriter writer = HoodieSparkLanceWriter.builder()
+
.file(path).sparkSchema(schema).instantTime(instantTime).taskContextSupplier(taskContextSupplier)
+
.storage(storage).populateMetaFields(true).bloomFilterOpt(Option.of(simpleBloomFilter)).build())
{
// Write multiple records to test metadata population and sequence ID
generation
for (int i = 0; i < 3; i++) {
InternalRow row = createRowWithMetaFields(i, "User" + i, 20L + i);
@@ -173,8 +177,9 @@ public class TestHoodieSparkLanceWriter {
StructType schema = createSchemaWithoutMetaFields();
StoragePath path = new StoragePath(tempDir.getAbsolutePath() +
"/test_without_metadata.lance");
- try (HoodieSparkLanceWriter writer = new HoodieSparkLanceWriter(
- path, schema, instantTime, taskContextSupplier, storage, false,
Option.of(simpleBloomFilter))) {
+ try (HoodieSparkLanceWriter writer = HoodieSparkLanceWriter.builder()
+
.file(path).sparkSchema(schema).instantTime(instantTime).taskContextSupplier(taskContextSupplier)
+
.storage(storage).bloomFilterOpt(Option.of(simpleBloomFilter)).build()) {
// Create row with just user data (no meta fields)
InternalRow row = createRow(1, "Bob", 25L);
HoodieKey key = new HoodieKey("key2", "partition2");
@@ -217,8 +222,9 @@ public class TestHoodieSparkLanceWriter {
StructType schema = createSchemaWithoutMetaFields();
StoragePath path = new StoragePath(tempDir.getAbsolutePath() +
"/test_simple_write.lance");
- try (HoodieSparkLanceWriter writer = new HoodieSparkLanceWriter(
- path, schema, instantTime, taskContextSupplier, storage, false,
Option.of(simpleBloomFilter))) {
+ try (HoodieSparkLanceWriter writer = HoodieSparkLanceWriter.builder()
+
.file(path).sparkSchema(schema).instantTime(instantTime).taskContextSupplier(taskContextSupplier)
+
.storage(storage).bloomFilterOpt(Option.of(simpleBloomFilter)).build()) {
InternalRow row = createRow(1, "Charlie", 35L);
writer.writeRow("key3", row);
}
@@ -238,8 +244,9 @@ public class TestHoodieSparkLanceWriter {
StoragePath path = new StoragePath(tempDir.getAbsolutePath() +
"/test_batch_flush.lance");
// Write more than DEFAULT_BATCH_SIZE (1000) records
int recordCount = 2500;
- try (HoodieSparkLanceWriter writer = new HoodieSparkLanceWriter(
- path, schema, instantTime, taskContextSupplier, storage, false,
Option.of(simpleBloomFilter))) {
+ try (HoodieSparkLanceWriter writer = HoodieSparkLanceWriter.builder()
+
.file(path).sparkSchema(schema).instantTime(instantTime).taskContextSupplier(taskContextSupplier)
+
.storage(storage).bloomFilterOpt(Option.of(simpleBloomFilter)).build()) {
for (int i = 0; i < recordCount; i++) {
InternalRow row = createRow(i, "User" + i, 20L + i);
writer.writeRow("key" + i, row);
@@ -266,8 +273,9 @@ public class TestHoodieSparkLanceWriter {
.add("binary_field", DataTypes.BinaryType, false);
StoragePath path = new StoragePath(tempDir.getAbsolutePath() +
"/test_primitives.lance");
- try (HoodieSparkLanceWriter writer = new HoodieSparkLanceWriter(
- path, schema, instantTime, taskContextSupplier, storage, false,
Option.of(simpleBloomFilter))) {
+ try (HoodieSparkLanceWriter writer = HoodieSparkLanceWriter.builder()
+
.file(path).sparkSchema(schema).instantTime(instantTime).taskContextSupplier(taskContextSupplier)
+
.storage(storage).bloomFilterOpt(Option.of(simpleBloomFilter)).build()) {
GenericInternalRow row = new GenericInternalRow(new Object[]{
42, // int
123456789L, // long
@@ -313,8 +321,9 @@ public class TestHoodieSparkLanceWriter {
.add("age", DataTypes.LongType, true);
StoragePath path = new StoragePath(tempDir.getAbsolutePath() +
"/test_nulls.lance");
- try (HoodieSparkLanceWriter writer = new HoodieSparkLanceWriter(
- path, schema, instantTime, taskContextSupplier, storage, false,
Option.of(simpleBloomFilter))) {
+ try (HoodieSparkLanceWriter writer = HoodieSparkLanceWriter.builder()
+
.file(path).sparkSchema(schema).instantTime(instantTime).taskContextSupplier(taskContextSupplier)
+
.storage(storage).bloomFilterOpt(Option.of(simpleBloomFilter)).build()) {
// Write rows with null values
writer.writeRow("key1", createRow(1, "Alice", 30L));
writer.writeRow("key2", createRow(2, null, 25L)); // null name
@@ -350,8 +359,9 @@ public class TestHoodieSparkLanceWriter {
.add("id", DataTypes.IntegerType, false);
StoragePath path = new StoragePath(tempDir.getAbsolutePath() +
"/test_empty.lance");
- try (HoodieSparkLanceWriter writer = new HoodieSparkLanceWriter(
- path, schema, instantTime, taskContextSupplier, storage, false,
Option.of(simpleBloomFilter))) {
+ try (HoodieSparkLanceWriter writer = HoodieSparkLanceWriter.builder()
+
.file(path).sparkSchema(schema).instantTime(instantTime).taskContextSupplier(taskContextSupplier)
+
.storage(storage).bloomFilterOpt(Option.of(simpleBloomFilter)).build()) {
// Close without writing any rows
}
@@ -398,8 +408,9 @@ public class TestHoodieSparkLanceWriter {
rows.add(new GenericInternalRow(new Object[]{2,
UTF8String.fromString("Bob"), address2}));
StoragePath path = new StoragePath(tempDir.getAbsolutePath() +
"/test_struct.lance");
- try (HoodieSparkLanceWriter writer = new HoodieSparkLanceWriter(
- path, schema, instantTime, taskContextSupplier, storage, false,
Option.of(simpleBloomFilter))) {
+ try (HoodieSparkLanceWriter writer = HoodieSparkLanceWriter.builder()
+
.file(path).sparkSchema(schema).instantTime(instantTime).taskContextSupplier(taskContextSupplier)
+
.storage(storage).bloomFilterOpt(Option.of(simpleBloomFilter)).build()) {
for (int i = 0; i < rows.size(); i++) {
writer.writeRow("key" + i, rows.get(i));
}
@@ -414,6 +425,116 @@ public class TestHoodieSparkLanceWriter {
}
}
+ /**
+ * HoodieSparkLanceWriter constructor must throw IllegalArgumentException if
maxFileSize <= 0
+ */
+ @Test
+ public void testMaxFileSizeValidation() {
+ StructType schema = createSchemaWithoutMetaFields();
+ StoragePath path = new StoragePath(tempDir.getAbsolutePath() +
"/test_validation.lance");
+ IllegalArgumentException ex = assertThrows(IllegalArgumentException.class,
+ () -> HoodieSparkLanceWriter.builder()
+
.file(path).sparkSchema(schema).instantTime(instantTime).taskContextSupplier(taskContextSupplier)
+ .storage(storage).maxFileSize(0).build());
+ assertEquals("maxFileSize must be a positive number", ex.getMessage());
+ }
+
+ /**
+ * canWrite() must return true before any records are written (written count
is 0,
+ * below the initial check threshold of MIN_RECORDS_FOR_SIZE_CHECK=100).
+ */
+ @Test
+ public void testCanWriteTrueBeforeAnyWrite() throws Exception {
+ StructType schema = createSchemaWithoutMetaFields();
+ StoragePath path = new StoragePath(tempDir.getAbsolutePath() +
"/test_canwrite_initial.lance");
+ try (HoodieSparkLanceWriter writer = HoodieSparkLanceWriter.builder()
+
.file(path).sparkSchema(schema).instantTime(instantTime).taskContextSupplier(taskContextSupplier)
+ .storage(storage).maxFileSize(Long.MAX_VALUE).build()) {
+ assertTrue(writer.canWrite(), "canWrite() must return true before any
records are written");
+ }
+ }
+
+ /**
+ * canWrite() must always return true when maxFileSize is Long.MAX_VALUE,
+ * regardless of how many records are written and how many batch flushes
occur.
+ */
+ @Test
+ public void testCanWriteAlwaysTrueWithNoLimit() throws Exception {
+ StructType schema = createSchemaWithoutMetaFields();
+ StoragePath path = new StoragePath(tempDir.getAbsolutePath() +
"/test_canwrite_no_limit.lance");
+ // Write more than DEFAULT_BATCH_SIZE (1000) to trigger at least one batch
flush
+ int recordCount = 1500;
+ try (HoodieSparkLanceWriter writer = HoodieSparkLanceWriter.builder()
+
.file(path).sparkSchema(schema).instantTime(instantTime).taskContextSupplier(taskContextSupplier)
+ .storage(storage).maxFileSize(Long.MAX_VALUE).build()) {
+ for (int i = 0; i < recordCount; i++) {
+ assertTrue(writer.canWrite(), "canWrite() should always be true when
maxFileSize is unlimited");
+ writer.writeRow("key" + i, createRow(i, "User" + i, 20L + i));
+ }
+ }
+ }
+
+ /**
+ * canWrite() must return false once the cumulative Arrow buffer size of
flushed batches
+ * exceeds maxFileSize. A tiny maxFileSize (100 bytes) is used so that a
single flushed
+ * batch of 1000 records (≫ 100 bytes) is guaranteed to exceed the limit.
+ */
+ @ParameterizedTest
+ @ValueSource(booleans = {true, false})
+ public void testCanWriteReturnsFalseAfterFileSizeLimitExceeded(boolean
populateMetaFields) throws Exception {
+ StructType schema = populateMetaFields ? createSchemaWithMetaFields() :
createSchemaWithoutMetaFields();
+ StoragePath path = new StoragePath(tempDir.getAbsolutePath() +
"/test_canwrite_exceeded.lance");
+ // 100 bytes is far smaller than a single flushed batch of 1000 records
+ long tinyMaxFileSize = 100L;
+
+ try (HoodieSparkLanceWriter writer = HoodieSparkLanceWriter.builder()
+
.file(path).sparkSchema(schema).instantTime(instantTime).taskContextSupplier(taskContextSupplier)
+
.storage(storage).populateMetaFields(populateMetaFields).maxFileSize(tinyMaxFileSize).build())
{
+
+ assertTrue(writer.canWrite(), "canWrite() must be true before writing
any records");
+
+ // Write exactly DEFAULT_BATCH_SIZE records to force the first batch
flush
+ for (int i = 0; i < 1000; i++) {
+ if (populateMetaFields) {
+ HoodieKey key = new HoodieKey("key" + i, "partition1");
+ writer.writeRowWithMetadata(key, createRowWithMetaFields(i, "User" +
i, 20L + i));
+ } else {
+ writer.writeRow("key" + i, createRow(i, "User" + i, 20L + i));
+ }
+ }
+
+ // After the flush the accumulated data size >> tinyMaxFileSize, so
canWrite() must be false
+ assertFalse(writer.canWrite(),
+ "canWrite() must return false after flushed data size exceeds
maxFileSize");
+ }
+ }
+
+ /**
+ * A write loop that respects canWrite() must stop well before the
artificial upper bound
+ * when maxFileSize is tiny, demonstrating that canWrite() correctly gates
record writes.
+ */
+ @Test
+ public void testCanWriteStopsWriteLoop() throws Exception {
+ StructType schema = createSchemaWithoutMetaFields();
+ StoragePath path = new StoragePath(tempDir.getAbsolutePath() +
"/test_canwrite_loop.lance");
+ long tinyMaxFileSize = 100L;
+ int maxAllowed = 5000;
+ int writtenCount = 0;
+
+ try (HoodieSparkLanceWriter writer = HoodieSparkLanceWriter.builder()
+
.file(path).sparkSchema(schema).instantTime(instantTime).taskContextSupplier(taskContextSupplier)
+ .storage(storage).maxFileSize(tinyMaxFileSize).build()) {
+ while (writer.canWrite() && writtenCount < maxAllowed) {
+ writer.writeRow("key" + writtenCount, createRow(writtenCount, "User" +
writtenCount, 20L));
+ writtenCount++;
+ }
+ }
+
+ assertTrue(writtenCount > 0, "At least one record should have been
written");
+ assertTrue(writtenCount < maxAllowed,
+ "Write loop must have been stopped by canWrite() before reaching the
artificial ceiling");
+ }
+
// Helper methods
private StructType createSchemaWithMetaFields() {