This is an automated email from the ASF dual-hosted git repository.
timbrown pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git
The following commit(s) were added to refs/heads/master by this push:
new 2a5244814b2d feat: Support COW bulk-insert, insert, upsert, delete works with spark datasource and lance (#17731)
2a5244814b2d is described below
commit 2a5244814b2da5a201b16dd889561a8b0dbf8005
Author: Rahil C <[email protected]>
AuthorDate: Wed Dec 31 10:00:01 2025 -0500
feat: Support COW bulk-insert, insert, upsert, delete works with spark datasource and lance (#17731)
* feat: Add HoodieBaseLanceFileWriter and implementation for SparkFileWriter
* feat: Add HoodieSparkLanceReader for reading lance files to internal row
* migrate to Hoodie schema and address Tim's previous comments
* Implement SparkColumnarFileReader for Datasource integration with Lance
* fix usages to hoodie schema
* fix iterator for reuse
* minor fixes
* add DisabledIfSystemProperty
* try spark 4.0
* scala style plugin property change
* initial minor comments
* add spark 3.4
* address comments
* address Ethan and Tim's comments
* scalastyle
* Support COW bulk-insert, insert, upsert, delete works with spark datasource and lance
* get tests passing
* remove HoodieInternalRowLanceWriter
* fix compilation, address comments
* address copy()
* address Tim's comments
* remove older comment
* retrigger ci
* address Tim's comment
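For reviewers, a minimal usage sketch of what this commit enables, distilled from the tests added in TestLanceDataSource.scala further below (the table name and path are illustrative; the snippet assumes the same imports the test uses: DataSourceWriteOptions._, HoodieTableConfig, HoodieWriteConfig, SaveMode):

    val df = spark.createDataFrame(Seq((1, "Alice", 30, 95.5))).toDF("id", "name", "age", "score")
    df.write
      .format("hudi")
      .option(HoodieTableConfig.BASE_FILE_FORMAT.key(), "LANCE")   // COW table backed by Lance base files
      .option(RECORDKEY_FIELD.key(), "id")
      .option(PRECOMBINE_FIELD.key(), "age")
      .option(HoodieWriteConfig.TBL_NAME.key(), "lance_demo")
      .option(HoodieWriteConfig.RECORD_MERGE_IMPL_CLASSES.key(), classOf[DefaultSparkRecordMerger].getName)
      .option(OPERATION.key(), "bulk_insert")                      // "insert", "upsert" and "delete" follow the same pattern
      .mode(SaveMode.Overwrite)
      .save("/tmp/lance_demo")
    val readBack = spark.read.format("hudi").load("/tmp/lance_demo")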
---
.../client/common/SparkReaderContextFactory.java | 3 +
.../io/storage/HoodieSparkFileReaderFactory.java | 5 +
.../io/storage/HoodieSparkFileWriterFactory.java | 1 -
.../hudi/io/storage/HoodieSparkLanceReader.java | 5 -
.../hudi/io/storage/HoodieSparkLanceWriter.java | 35 +-
.../hudi/io/storage/LanceRecordIterator.java | 2 +-
.../row/HoodieInternalRowFileWriterFactory.java | 16 +
.../org/apache/hudi/common/util/LanceUtils.java | 204 +++++++
.../hudi/io/storage/HoodieFileReaderFactory.java | 12 +
.../apache/hudi/io/storage/HoodieIOFactory.java | 2 +
.../io/storage/hadoop/HoodieHadoopIOFactory.java | 3 +
.../hudi/functional/TestLanceDataSource.scala | 620 ++++++++++++++++++++-
12 files changed, 897 insertions(+), 11 deletions(-)
diff --git a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/common/SparkReaderContextFactory.java b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/common/SparkReaderContextFactory.java
index d2f5eb81458f..a4e88b06915f 100644
--- a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/common/SparkReaderContextFactory.java
+++ b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/common/SparkReaderContextFactory.java
@@ -100,6 +100,9 @@ public class SparkReaderContextFactory implements ReaderContextFactory<InternalR
     } else if (metaClient.getTableConfig().getBaseFileFormat() == HoodieFileFormat.ORC) {
       SparkColumnarFileReader orcFileReader = getOrcFileReader(resolver, sqlConf, options, configs, sparkAdapter);
       baseFileReaderBroadcast = jsc.broadcast(orcFileReader);
+    } else if (metaClient.getTableConfig().getBaseFileFormat() == HoodieFileFormat.LANCE) {
+      baseFileReaderBroadcast = jsc.broadcast(
+          sparkAdapter.createLanceFileReader(false, sqlConf, options, configs));
     } else {
       baseFileReaderBroadcast = jsc.broadcast(
           sparkAdapter.createParquetFileReader(false, sqlConf, options, configs));
diff --git a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/io/storage/HoodieSparkFileReaderFactory.java b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/io/storage/HoodieSparkFileReaderFactory.java
index 1a68c3d0edbc..ec71460ea313 100644
--- a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/io/storage/HoodieSparkFileReaderFactory.java
+++ b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/io/storage/HoodieSparkFileReaderFactory.java
@@ -48,6 +48,11 @@ public class HoodieSparkFileReaderFactory extends HoodieFileReaderFactory {
     return new HoodieSparkParquetReader(storage, path);
   }
+  @Override
+  protected HoodieFileReader newLanceFileReader(StoragePath path) {
+    return new HoodieSparkLanceReader(path);
+  }
+
   @Override
   protected HoodieFileReader newHFileFileReader(HoodieConfig hoodieConfig,
                                                 StoragePath path,
diff --git a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/io/storage/HoodieSparkFileWriterFactory.java b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/io/storage/HoodieSparkFileWriterFactory.java
index c251eae33e2e..bd4ba955d04a 100644
--- a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/io/storage/HoodieSparkFileWriterFactory.java
+++ b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/io/storage/HoodieSparkFileWriterFactory.java
@@ -57,7 +57,6 @@ public class HoodieSparkFileWriterFactory extends HoodieFileWriterFactory {
     if (compressionCodecName.isEmpty()) {
       compressionCodecName = null;
     }
-    //TODO boundary to revisit in follow up to use HoodieSchema directly
     HoodieRowParquetWriteSupport writeSupport = getHoodieRowParquetWriteSupport(storage.getConf(), schema, config, enableBloomFilter(populateMetaFields, config));
     HoodieRowParquetConfig parquetConfig = new HoodieRowParquetConfig(writeSupport,
diff --git a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/io/storage/HoodieSparkLanceReader.java b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/io/storage/HoodieSparkLanceReader.java
index 90e4996e44a4..b7794f3aef9f 100644
--- a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/io/storage/HoodieSparkLanceReader.java
+++ b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/io/storage/HoodieSparkLanceReader.java
@@ -184,9 +184,4 @@ public class HoodieSparkLanceReader implements HoodieSparkFileReader {
       throw new HoodieException("Failed to get row count from Lance file: " + path, e);
     }
   }
-
-  /**
-   * Iterator implementation that reads Lance file batches and converts to UnsafeRows.
-   * Keeps ColumnarBatch alive while iterating to avoid unnecessary data copying.
-   */
 }
diff --git a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/io/storage/HoodieSparkLanceWriter.java b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/io/storage/HoodieSparkLanceWriter.java
index e7476f4473fa..67e07d162ec1 100644
--- a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/io/storage/HoodieSparkLanceWriter.java
+++ b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/io/storage/HoodieSparkLanceWriter.java
@@ -24,6 +24,7 @@ import org.apache.hudi.common.engine.TaskContextSupplier;
 import org.apache.hudi.common.model.HoodieKey;
 import org.apache.hudi.common.model.HoodieRecord;
 import org.apache.hudi.io.lance.HoodieBaseLanceWriter;
+import org.apache.hudi.io.storage.row.HoodieInternalRowFileWriter;
 import org.apache.hudi.storage.HoodieStorage;
 import org.apache.hudi.storage.StoragePath;
 import org.apache.spark.sql.catalyst.InternalRow;
@@ -42,14 +43,15 @@ import static org.apache.hudi.common.model.HoodieRecord.HoodieMetadataField.PART
 import static org.apache.hudi.common.model.HoodieRecord.HoodieMetadataField.RECORD_KEY_METADATA_FIELD;
 /**
- * Spark Lance file writer implementing {@link HoodieSparkFileWriter}.
+ * Spark Lance file writer implementing {@link HoodieSparkFileWriter} and {@link HoodieInternalRowFileWriter}.
  *
  * This writer integrates with Hudi's storage I/O layer and supports:
  * - Hudi metadata field population
  * - Record key tracking (for bloom filters - TODO https://github.com/apache/hudi/issues/17664)
  * - Sequence ID generation
  */
-public class HoodieSparkLanceWriter extends HoodieBaseLanceWriter<InternalRow> implements HoodieSparkFileWriter {
+public class HoodieSparkLanceWriter extends HoodieBaseLanceWriter<InternalRow>
+    implements HoodieSparkFileWriter, HoodieInternalRowFileWriter {
   private static final String DEFAULT_TIMEZONE = "UTC";
@@ -90,6 +92,22 @@ public class HoodieSparkLanceWriter extends HoodieBaseLanceWriter<InternalRow> i
     };
   }
+  /**
+   * Constructor for Spark Lance writer used for internal row writing with pre-embedded metadata.
+   *
+   * @param file                Path where Lance file will be written
+   * @param sparkSchema         Spark schema for the data
+   * @param taskContextSupplier Task context supplier for partition ID
+   * @param storage             HoodieStorage instance
+   * @throws IOException if writer initialization fails
+   */
+  public HoodieSparkLanceWriter(StoragePath file,
+                                StructType sparkSchema,
+                                TaskContextSupplier taskContextSupplier,
+                                HoodieStorage storage) throws IOException {
+    this(file, sparkSchema, null, taskContextSupplier, storage, false);
+  }
+
   @Override
   public void writeRowWithMetadata(HoodieKey key, InternalRow row) throws IOException {
     if (populateMetaFields) {
@@ -105,6 +123,17 @@ public class HoodieSparkLanceWriter extends HoodieBaseLanceWriter<InternalRow> i
   public void writeRow(String recordKey, InternalRow row) throws IOException {
     super.write(row);
   }
+
+  @Override
+  public void writeRow(UTF8String key, InternalRow row) throws IOException {
+    // Key reserved for future bloom filter support (https://github.com/apache/hudi/issues/17664)
+    super.write(row);
+  }
+
+  @Override
+  public void writeRow(InternalRow row) throws IOException {
+    super.write(row);
+  }
   @Override
   protected void populateVectorSchemaRoot(List<InternalRow> records) {
@@ -154,4 +183,4 @@ public class HoodieSparkLanceWriter extends HoodieBaseLanceWriter<InternalRow> i
     row.update(PARTITION_PATH_METADATA_FIELD.ordinal(), UTF8String.fromString(partitionPath));
     row.update(FILENAME_METADATA_FIELD.ordinal(), fileName);
   }
-}
\ No newline at end of file
+}
diff --git a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/io/storage/LanceRecordIterator.java b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/io/storage/LanceRecordIterator.java
index 88d5645879a6..d27668eb2b81 100644
--- a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/io/storage/LanceRecordIterator.java
+++ b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/io/storage/LanceRecordIterator.java
@@ -129,7 +129,7 @@ public class LanceRecordIterator implements ClosableIterator<UnsafeRow> {
     }
     InternalRow row = rowIterator.next();
     // Convert to UnsafeRow immediately while batch is still open
-    return projection.apply(row);
+    return projection.apply(row).copy();
   }
   @Override
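Background on the .copy() change above: Spark's UnsafeProjection reuses one backing UnsafeRow across apply() calls, so a projected row is only safe to hold onto after it has been copied; otherwise every returned element ends up aliasing the same buffer once the next record is projected or the source ColumnarBatch is closed. A small sketch of the pattern, assuming a schema and an iterator of InternalRows are in scope (names are illustrative):

    import org.apache.spark.sql.catalyst.InternalRow
    import org.apache.spark.sql.catalyst.expressions.{UnsafeProjection, UnsafeRow}
    import org.apache.spark.sql.types.StructType

    // Materialize projected rows so they outlive the reused projection buffer and the source batch.
    def materialize(schema: StructType, rows: Iterator[InternalRow]): List[UnsafeRow] = {
      val projection = UnsafeProjection.create(schema)
      // copy() detaches each row from the projection's shared buffer
      rows.map(row => projection.apply(row).copy()).toList
    }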
diff --git a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/io/storage/row/HoodieInternalRowFileWriterFactory.java b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/io/storage/row/HoodieInternalRowFileWriterFactory.java
index d51b48f3cd55..b1431ae70d80 100644
--- a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/io/storage/row/HoodieInternalRowFileWriterFactory.java
+++ b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/io/storage/row/HoodieInternalRowFileWriterFactory.java
@@ -21,9 +21,11 @@ package org.apache.hudi.io.storage.row;
 import org.apache.hudi.common.bloom.BloomFilter;
 import org.apache.hudi.common.bloom.BloomFilterFactory;
 import org.apache.hudi.common.config.HoodieParquetConfig;
+import org.apache.hudi.common.engine.LocalTaskContextSupplier;
 import org.apache.hudi.common.fs.FSUtils;
 import org.apache.hudi.common.util.Option;
 import org.apache.hudi.config.HoodieWriteConfig;
+import org.apache.hudi.io.storage.HoodieSparkLanceWriter;
 import org.apache.hudi.storage.StoragePath;
 import org.apache.hudi.storage.hadoop.HadoopStorageConfiguration;
 import org.apache.hudi.table.HoodieTable;
@@ -33,6 +35,7 @@ import org.apache.spark.sql.types.StructType;
 import java.io.IOException;
+import static org.apache.hudi.common.model.HoodieFileFormat.LANCE;
 import static org.apache.hudi.common.model.HoodieFileFormat.PARQUET;
 import static org.apache.hudi.common.util.ParquetUtils.getCompressionCodecName;
@@ -59,6 +62,8 @@ public class HoodieInternalRowFileWriterFactory {
     final String extension = FSUtils.getFileExtension(path.getName());
     if (PARQUET.getFileExtension().equals(extension)) {
       return newParquetInternalRowFileWriter(path, hoodieTable, writeConfig, schema, tryInstantiateBloomFilter(writeConfig));
+    } else if (LANCE.getFileExtension().equals(extension)) {
+      return newLanceInternalRowFileWriter(path, hoodieTable, schema);
     }
     throw new UnsupportedOperationException(extension + " format not supported yet.");
   }
@@ -87,6 +92,17 @@ public class HoodieInternalRowFileWriterFactory {
     ));
   }
+  private static HoodieInternalRowFileWriter newLanceInternalRowFileWriter(StoragePath path,
+                                                                            HoodieTable table,
+                                                                            StructType structType)
+      throws IOException {
+    return new HoodieSparkLanceWriter(
+        path,
+        structType,
+        new LocalTaskContextSupplier(),
+        table.getStorage());
+  }
+
   private static Option<BloomFilter> tryInstantiateBloomFilter(HoodieWriteConfig writeConfig) {
     // NOTE: Currently Bloom Filter is only going to be populated if meta-fields are populated
     if (writeConfig.populateMetaFields()) {
diff --git a/hudi-common/src/main/java/org/apache/hudi/common/util/LanceUtils.java b/hudi-common/src/main/java/org/apache/hudi/common/util/LanceUtils.java
new file mode 100644
index 000000000000..c0bf94f07489
--- /dev/null
+++ b/hudi-common/src/main/java/org/apache/hudi/common/util/LanceUtils.java
@@ -0,0 +1,204 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.hudi.common.util;
+
+import org.apache.hudi.common.config.HoodieReaderConfig;
+import org.apache.hudi.common.model.HoodieFileFormat;
+import org.apache.hudi.common.model.HoodieKey;
+import org.apache.hudi.common.model.HoodieRecord;
+import org.apache.hudi.common.schema.HoodieSchema;
+import org.apache.hudi.common.util.collection.ClosableIterator;
+import org.apache.hudi.common.util.collection.CloseableMappingIterator;
+import org.apache.hudi.common.util.collection.Pair;
+import org.apache.hudi.exception.HoodieIOException;
+import org.apache.hudi.io.storage.HoodieFileReader;
+import org.apache.hudi.io.storage.HoodieIOFactory;
+import org.apache.hudi.keygen.BaseKeyGenerator;
+import org.apache.hudi.metadata.HoodieIndexVersion;
+import org.apache.hudi.stats.HoodieColumnRangeMetadata;
+import org.apache.hudi.storage.HoodieStorage;
+import org.apache.hudi.storage.StoragePath;
+
+import org.apache.avro.Schema;
+import org.apache.avro.generic.GenericRecord;
+
+import java.io.ByteArrayOutputStream;
+import java.io.IOException;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.Properties;
+import java.util.Set;
+import java.util.concurrent.atomic.AtomicLong;
+
+public class LanceUtils extends FileFormatUtils {
+
+ @Override
+  public ClosableIterator<Pair<HoodieKey, Long>> fetchRecordKeysWithPositions(HoodieStorage storage, StoragePath filePath) {
+    return fetchRecordKeysWithPositions(storage, filePath, Option.empty(), Option.empty());
+ }
+
+ @Override
+  public ClosableIterator<Pair<HoodieKey, Long>> fetchRecordKeysWithPositions(HoodieStorage storage,
+                                                                              StoragePath filePath,
+                                                                              Option<BaseKeyGenerator> keyGeneratorOpt,
+                                                                              Option<String> partitionPath) {
+    AtomicLong position = new AtomicLong(0);
+    return new CloseableMappingIterator<>(
+        getHoodieKeyIterator(storage, filePath, keyGeneratorOpt, partitionPath),
+        key -> Pair.of(key, position.getAndIncrement()));
+ }
+
+ @Override
+  public ClosableIterator<HoodieKey> getHoodieKeyIterator(HoodieStorage storage, StoragePath filePath) {
+    return getHoodieKeyIterator(storage, filePath, Option.empty(), Option.empty());
+ }
+
+ @Override
+  public ClosableIterator<HoodieKey> getHoodieKeyIterator(HoodieStorage storage,
+                                                          StoragePath filePath,
+                                                          Option<BaseKeyGenerator> keyGeneratorOpt,
+                                                          Option<String> partitionPath) {
+    try {
+      HoodieFileReader reader = HoodieIOFactory.getIOFactory(storage)
+          .getReaderFactory(HoodieRecord.HoodieRecordType.SPARK)
+          .getFileReader(new HoodieReaderConfig(), filePath, HoodieFileFormat.LANCE);
+ ClosableIterator<String> keyIterator = reader.getRecordKeyIterator();
+ return new ClosableIterator<HoodieKey>() {
+ @Override
+ public void close() {
+ keyIterator.close();
+ }
+
+ @Override
+ public boolean hasNext() {
+ return keyIterator.hasNext();
+ }
+
+ @Override
+ public HoodieKey next() {
+ String key = keyIterator.next();
+ return new HoodieKey(key, partitionPath.orElse(null));
+ }
+ };
+ } catch (IOException e) {
+      throw new HoodieIOException("Failed to read from Lance file: " + filePath, e);
+ }
+ }
+
+ @Override
+ public Schema readAvroSchema(HoodieStorage storage, StoragePath filePath) {
+ try (HoodieFileReader fileReader =
+ HoodieIOFactory.getIOFactory(storage)
+ .getReaderFactory(HoodieRecord.HoodieRecordType.SPARK)
+ .getFileReader(
+ ConfigUtils.DEFAULT_HUDI_CONFIG_FOR_READER,
+ filePath,
+ HoodieFileFormat.LANCE)) {
+ return fileReader.getSchema().getAvroSchema();
+ } catch (IOException e) {
+ throw new HoodieIOException("Failed to read schema from Lance file", e);
+ }
+ }
+
+ @Override
+ public HoodieFileFormat getFormat() {
+ return HoodieFileFormat.LANCE;
+ }
+
+ @Override
+  public List<GenericRecord> readAvroRecords(HoodieStorage storage, StoragePath filePath) {
+    throw new UnsupportedOperationException("readAvroRecords is not yet supported for Lance format");
+ }
+
+ @Override
+  public List<GenericRecord> readAvroRecords(HoodieStorage storage, StoragePath filePath, Schema schema) {
+    throw new UnsupportedOperationException("readAvroRecords with schema is not yet supported for Lance format");
+ }
+
+ @Override
+  public Map<String, String> readFooter(HoodieStorage storage, boolean required, StoragePath filePath, String... footerNames) {
+    throw new UnsupportedOperationException("readFooter is not yet supported for Lance format");
+ }
+
+ @Override
+ public long getRowCount(HoodieStorage storage, StoragePath filePath) {
+ try (HoodieFileReader fileReader =
+ HoodieIOFactory.getIOFactory(storage)
+ .getReaderFactory(HoodieRecord.HoodieRecordType.SPARK)
+ .getFileReader(
+ ConfigUtils.DEFAULT_HUDI_CONFIG_FOR_READER,
+ filePath,
+ HoodieFileFormat.LANCE)) {
+ return fileReader.getTotalRecords();
+ } catch (IOException e) {
+      throw new HoodieIOException("Failed to read row count from Lance file", e);
+ }
+ }
+
+ @Override
+  public Set<Pair<String, Long>> filterRowKeys(HoodieStorage storage, StoragePath filePath, Set<String> filterKeys) {
+ try (HoodieFileReader fileReader =
+ HoodieIOFactory.getIOFactory(storage)
+ .getReaderFactory(HoodieRecord.HoodieRecordType.SPARK)
+ .getFileReader(
+ ConfigUtils.DEFAULT_HUDI_CONFIG_FOR_READER,
+ filePath,
+ HoodieFileFormat.LANCE)) {
+ return fileReader.filterRowKeys(filterKeys);
+ } catch (IOException e) {
+      throw new HoodieIOException("Failed to read filter keys from Lance file", e);
+ }
+ }
+
+ @Override
+  public List<HoodieColumnRangeMetadata<Comparable>> readColumnStatsFromMetadata(HoodieStorage storage,
+                                                                                 StoragePath filePath,
+                                                                                 List<String> columnList,
+                                                                                 HoodieIndexVersion indexVersion) {
+    throw new UnsupportedOperationException("readColumnStatsFromMetadata is not yet supported for Lance format");
+ }
+
+ @Override
+  public void writeMetaFile(HoodieStorage storage, StoragePath filePath, Properties props) throws IOException {
+    throw new UnsupportedOperationException("writeMetaFile is not yet supported for Lance format");
+ }
+
+ @Override
+  public ByteArrayOutputStream serializeRecordsToLogBlock(HoodieStorage storage,
+                                                          List<HoodieRecord> records,
+                                                          HoodieSchema writerSchema,
+                                                          HoodieSchema readerSchema,
+                                                          String keyFieldName,
+                                                          Map<String, String> paramsMap) throws IOException {
+    throw new UnsupportedOperationException("serializeRecordsToLogBlock is not yet supported for Lance format");
+ }
+
+ @Override
+  public Pair<ByteArrayOutputStream, Object> serializeRecordsToLogBlock(HoodieStorage storage,
+                                                                        Iterator<HoodieRecord> records,
+                                                                        HoodieRecord.HoodieRecordType recordType,
+                                                                        Schema writerSchema,
+                                                                        Schema readerSchema,
+                                                                        String keyFieldName,
+                                                                        Map<String, String> paramsMap) throws IOException {
+    throw new UnsupportedOperationException("serializeRecordsToLogBlock with iterator is not yet supported for Lance format");
+ }
+}
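For context, a sketch of how LanceUtils above gets picked up at runtime through the IO factory wiring added further down in this diff (the storage handle and file path are illustrative):

    import org.apache.hudi.common.model.HoodieFileFormat
    import org.apache.hudi.io.storage.HoodieIOFactory
    import org.apache.hudi.storage.{HoodieStorage, StoragePath}

    // Assumes a HoodieStorage instance is already available in scope.
    def lanceRowCount(storage: HoodieStorage, lanceFile: StoragePath): Long = {
      // Dispatches to LanceUtils via the LANCE case added in HoodieHadoopIOFactory
      val utils = HoodieIOFactory.getIOFactory(storage).getFileFormatUtils(HoodieFileFormat.LANCE)
      utils.getRowCount(storage, lanceFile)
    }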
diff --git a/hudi-common/src/main/java/org/apache/hudi/io/storage/HoodieFileReaderFactory.java b/hudi-common/src/main/java/org/apache/hudi/io/storage/HoodieFileReaderFactory.java
index 341f20d7d700..4f0d1d2adf16 100644
--- a/hudi-common/src/main/java/org/apache/hudi/io/storage/HoodieFileReaderFactory.java
+++ b/hudi-common/src/main/java/org/apache/hudi/io/storage/HoodieFileReaderFactory.java
@@ -30,6 +30,7 @@ import org.apache.hudi.storage.StoragePathInfo;
import java.io.IOException;
import static org.apache.hudi.common.model.HoodieFileFormat.HFILE;
+import static org.apache.hudi.common.model.HoodieFileFormat.LANCE;
import static org.apache.hudi.common.model.HoodieFileFormat.ORC;
import static org.apache.hudi.common.model.HoodieFileFormat.PARQUET;
@@ -54,6 +55,9 @@ public class HoodieFileReaderFactory {
if (ORC.getFileExtension().equals(extension)) {
return getFileReader(hoodieConfig, path, ORC, Option.empty());
}
+ if (LANCE.getFileExtension().equals(extension)) {
+ return getFileReader(hoodieConfig, path, LANCE, Option.empty());
+ }
    throw new UnsupportedOperationException(extension + " format not supported yet.");
}
@@ -71,6 +75,8 @@ public class HoodieFileReaderFactory {
return newHFileFileReader(hoodieConfig, path, schemaOption);
case ORC:
return newOrcFileReader(path);
+ case LANCE:
+ return newLanceFileReader(path);
default:
        throw new UnsupportedOperationException(format + " format not supported yet.");
}
@@ -85,6 +91,8 @@ public class HoodieFileReaderFactory {
return newHFileFileReader(hoodieConfig, pathInfo, schemaOption);
case ORC:
return newOrcFileReader(pathInfo.getPath());
+ case LANCE:
+ return newLanceFileReader(pathInfo.getPath());
default:
        throw new UnsupportedOperationException(format + " format not supported yet.");
}
@@ -125,6 +133,10 @@ public class HoodieFileReaderFactory {
throw new UnsupportedOperationException();
}
+ protected HoodieFileReader newLanceFileReader(StoragePath path) {
+ throw new UnsupportedOperationException();
+ }
+
  public HoodieFileReader newBootstrapFileReader(HoodieFileReader skeletonFileReader,
                                                 HoodieFileReader dataFileReader,
                                                 Option<String[]> partitionFields,
diff --git a/hudi-common/src/main/java/org/apache/hudi/io/storage/HoodieIOFactory.java b/hudi-common/src/main/java/org/apache/hudi/io/storage/HoodieIOFactory.java
index e1cff2a0424e..d21151bf639c 100644
--- a/hudi-common/src/main/java/org/apache/hudi/io/storage/HoodieIOFactory.java
+++ b/hudi-common/src/main/java/org/apache/hudi/io/storage/HoodieIOFactory.java
@@ -112,6 +112,8 @@ public abstract class HoodieIOFactory {
return getFileFormatUtils(HoodieFileFormat.ORC);
    } else if (path.getFileExtension().equals(HoodieFileFormat.HFILE.getFileExtension())) {
      return getFileFormatUtils(HoodieFileFormat.HFILE);
+    } else if (path.getFileExtension().equals(HoodieFileFormat.LANCE.getFileExtension())) {
+      return getFileFormatUtils(HoodieFileFormat.LANCE);
    }
    throw new UnsupportedOperationException("The format for file " + path + " is not supported yet.");
}
diff --git a/hudi-hadoop-common/src/main/java/org/apache/hudi/io/storage/hadoop/HoodieHadoopIOFactory.java b/hudi-hadoop-common/src/main/java/org/apache/hudi/io/storage/hadoop/HoodieHadoopIOFactory.java
index 3d8e797682e9..6cf94f16fe14 100644
--- a/hudi-hadoop-common/src/main/java/org/apache/hudi/io/storage/hadoop/HoodieHadoopIOFactory.java
+++ b/hudi-hadoop-common/src/main/java/org/apache/hudi/io/storage/hadoop/HoodieHadoopIOFactory.java
@@ -24,6 +24,7 @@ import org.apache.hudi.common.model.HoodieFileFormat;
import org.apache.hudi.common.model.HoodieRecord;
import org.apache.hudi.common.util.FileFormatUtils;
import org.apache.hudi.common.util.HFileUtils;
+import org.apache.hudi.common.util.LanceUtils;
import org.apache.hudi.common.util.OrcUtils;
import org.apache.hudi.common.util.ParquetUtils;
import org.apache.hudi.common.util.ReflectionUtils;
@@ -111,6 +112,8 @@ public class HoodieHadoopIOFactory extends HoodieIOFactory {
return new OrcUtils();
case HFILE:
return new HFileUtils();
+ case LANCE:
+ return new LanceUtils();
default:
        throw new UnsupportedOperationException(fileFormat.name() + " format not supported yet.");
}
diff --git a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestLanceDataSource.scala b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestLanceDataSource.scala
index 32e7d869c6e4..9a634b8eec6d 100644
--- a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestLanceDataSource.scala
+++ b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestLanceDataSource.scala
@@ -19,7 +19,8 @@ package org.apache.hudi.functional
import org.apache.hudi.DataSourceWriteOptions._
import org.apache.hudi.DefaultSparkRecordMerger
-import org.apache.hudi.common.table.HoodieTableConfig
+import org.apache.hudi.common.table.{HoodieTableConfig, HoodieTableMetaClient}
+import org.apache.hudi.common.testutils.HoodieTestUtils
import org.apache.hudi.config.HoodieWriteConfig
import org.apache.hudi.testutils.HoodieSparkClientTestBase
@@ -28,6 +29,8 @@ import org.junit.jupiter.api.{AfterEach, BeforeEach, Test}
import org.junit.jupiter.api.Assertions.{assertEquals, assertTrue}
import org.junit.jupiter.api.condition.DisabledIfSystemProperty
+import scala.collection.JavaConverters._
+
/**
* Basic functional tests for Lance file format with Hudi Spark datasource.
*/
@@ -129,4 +132,619 @@ class TestLanceDataSource extends HoodieSparkClientTestBase {
assertTrue(expectedDf.except(actual).isEmpty)
assertTrue(actual.except(expectedDf).isEmpty)
}
+
+ @Test
+ def testWhereClauseFiltering(): Unit = {
+ val tableName = "test_lance_where"
+ val tablePath = s"$basePath/$tableName"
+
+ // Create test data
+ val records = Seq(
+ (1, "Alice", 30, 95.5),
+ (2, "Bob", 25, 87.3),
+ (3, "Charlie", 35, 92.1),
+ (4, "David", 28, 88.9),
+ (5, "Eve", 32, 91.4)
+ )
+ val df = spark.createDataFrame(records).toDF("id", "name", "age", "score")
+
+ // Write to Hudi table with Lance format
+ df.write
+ .format("hudi")
+ .option(HoodieTableConfig.BASE_FILE_FORMAT.key(), "LANCE")
+ .option(RECORDKEY_FIELD.key(), "id")
+ .option(PRECOMBINE_FIELD.key(), "age")
+ .option(TABLE_NAME.key(), tableName)
+ .option(HoodieWriteConfig.TBL_NAME.key(), tableName)
+      .option(HoodieWriteConfig.RECORD_MERGE_IMPL_CLASSES.key(), classOf[DefaultSparkRecordMerger].getName)
+ .mode(SaveMode.Overwrite)
+ .save(tablePath)
+
+ // Test 1: Simple WHERE clause on numeric column
+ val filteredByAge = spark.read
+ .format("hudi")
+ .load(tablePath)
+ .where("age > 30")
+ .select("id", "name", "age", "score")
+
+ val expectedFilteredByAge = spark.createDataFrame(Seq(
+ (3, "Charlie", 35, 92.1),
+ (5, "Eve", 32, 91.4)
+ )).toDF("id", "name", "age", "score")
+
+ assertTrue(expectedFilteredByAge.except(filteredByAge).isEmpty)
+ assertTrue(filteredByAge.except(expectedFilteredByAge).isEmpty)
+
+ // Test 2: WHERE clause on string column
+ val filteredByName = spark.read
+ .format("hudi")
+ .load(tablePath)
+ .where("name = 'Bob'")
+ .select("id", "name", "age", "score")
+
+ val expectedFilteredByName = spark.createDataFrame(Seq(
+ (2, "Bob", 25, 87.3)
+ )).toDF("id", "name", "age", "score")
+
+ assertTrue(expectedFilteredByName.except(filteredByName).isEmpty)
+ assertTrue(filteredByName.except(expectedFilteredByName).isEmpty)
+
+ // Test 3: Complex WHERE with multiple conditions
+ val filteredComplex = spark.read
+ .format("hudi")
+ .load(tablePath)
+ .where("age >= 28 AND score > 90")
+ .select("id", "name", "age", "score")
+
+ val expectedFilteredComplex = spark.createDataFrame(Seq(
+ (1, "Alice", 30, 95.5),
+ (3, "Charlie", 35, 92.1),
+ (5, "Eve", 32, 91.4)
+ )).toDF("id", "name", "age", "score")
+
+ assertTrue(expectedFilteredComplex.except(filteredComplex).isEmpty)
+ assertTrue(filteredComplex.except(expectedFilteredComplex).isEmpty)
+ }
+
+ @Test
+ def testMultipleBulkInsertsWithCommitValidation(): Unit = {
+ val tableName = "test_lance_multiple_inserts"
+ val tablePath = s"$basePath/$tableName"
+
+ // First insert - records 1-3
+ val records1 = Seq(
+ (1, "Alice", 30, 95.5),
+ (2, "Bob", 25, 87.3),
+ (3, "Charlie", 35, 92.1)
+ )
+    val df1 = spark.createDataFrame(records1).toDF("id", "name", "age", "score")
+
+ df1.write
+ .format("hudi")
+ .option(HoodieTableConfig.BASE_FILE_FORMAT.key(), "LANCE")
+ .option(RECORDKEY_FIELD.key(), "id")
+ .option(PRECOMBINE_FIELD.key(), "age")
+ .option(TABLE_NAME.key(), tableName)
+ .option(HoodieWriteConfig.TBL_NAME.key(), tableName)
+      .option(HoodieWriteConfig.RECORD_MERGE_IMPL_CLASSES.key(), classOf[DefaultSparkRecordMerger].getName)
+ .option(OPERATION.key(), "bulk_insert")
+ .mode(SaveMode.Overwrite)
+ .save(tablePath)
+
+ // Second insert - records 4-6
+ val records2 = Seq(
+ (4, "David", 28, 88.9),
+ (5, "Eve", 32, 91.4),
+ (6, "Frank", 27, 85.7)
+ )
+    val df2 = spark.createDataFrame(records2).toDF("id", "name", "age", "score")
+
+ df2.write
+ .format("hudi")
+ .option(HoodieTableConfig.BASE_FILE_FORMAT.key(), "LANCE")
+ .option(RECORDKEY_FIELD.key(), "id")
+ .option(PRECOMBINE_FIELD.key(), "age")
+ .option(TABLE_NAME.key(), tableName)
+ .option(HoodieWriteConfig.TBL_NAME.key(), tableName)
+      .option(HoodieWriteConfig.RECORD_MERGE_IMPL_CLASSES.key(), classOf[DefaultSparkRecordMerger].getName)
+ .option(OPERATION.key(), "bulk_insert")
+ .mode(SaveMode.Append)
+ .save(tablePath)
+
+ // Third insert - records 7-9
+ val records3 = Seq(
+ (7, "Grace", 29, 93.2),
+ (8, "Henry", 31, 89.6),
+ (9, "Iris", 26, 94.8)
+ )
+    val df3 = spark.createDataFrame(records3).toDF("id", "name", "age", "score")
+
+ df3.write
+ .format("hudi")
+ .option(HoodieTableConfig.BASE_FILE_FORMAT.key(), "LANCE")
+ .option(RECORDKEY_FIELD.key(), "id")
+ .option(PRECOMBINE_FIELD.key(), "age")
+ .option(TABLE_NAME.key(), tableName)
+ .option(HoodieWriteConfig.TBL_NAME.key(), tableName)
+      .option(HoodieWriteConfig.RECORD_MERGE_IMPL_CLASSES.key(), classOf[DefaultSparkRecordMerger].getName)
+ .option(OPERATION.key(), "bulk_insert")
+ .mode(SaveMode.Append)
+ .save(tablePath)
+
+ // Validate number of commits matches number of inserts
+ val metaClient = HoodieTableMetaClient.builder()
+ .setConf(HoodieTestUtils.getDefaultStorageConf)
+ .setBasePath(tablePath)
+ .build()
+
+    val commitCount = metaClient.getCommitsTimeline.filterCompletedInstants().countInstants()
+    assertEquals(3, commitCount, "Should have 3 completed commits (one per insert)")
+
+ // Verify that all commits are bulk_insert commits
+    val commits = metaClient.getCommitsTimeline.filterCompletedInstants().getInstants.asScala.toList
+ assertEquals(3, commits.size, "Should have exactly 3 commits")
+
+    // Check that each commit is a COMMIT action (bulk_insert creates COMMIT actions)
+    commits.foreach { instant =>
+      assertEquals("commit", instant.getAction, s"Instant ${instant.requestedTime()} should be a commit action")
+ }
+
+ // Read back all data and verify total record count
+ val readDf = spark.read
+ .format("hudi")
+ .load(tablePath)
+
+ val actual = readDf.select("id", "name", "age", "score")
+
+ val expectedDf = spark.createDataFrame(Seq(
+ (1, "Alice", 30, 95.5),
+ (2, "Bob", 25, 87.3),
+ (3, "Charlie", 35, 92.1),
+ (4, "David", 28, 88.9),
+ (5, "Eve", 32, 91.4),
+ (6, "Frank", 27, 85.7),
+ (7, "Grace", 29, 93.2),
+ (8, "Henry", 31, 89.6),
+ (9, "Iris", 26, 94.8)
+ )).toDF("id", "name", "age", "score")
+
+ assertTrue(expectedDf.except(actual).isEmpty)
+ assertTrue(actual.except(expectedDf).isEmpty)
+ }
+
+ @Test
+ def testTimeTravel(): Unit = {
+ val tableName = "test_lance_time_travel"
+ val tablePath = s"$basePath/$tableName"
+
+ // First insert - records 1-3
+ val records1 = Seq(
+ (1, "Alice", 30, 95.5),
+ (2, "Bob", 25, 87.3),
+ (3, "Charlie", 35, 92.1)
+ )
+    val df1 = spark.createDataFrame(records1).toDF("id", "name", "age", "score")
+
+ df1.write
+ .format("hudi")
+ .option(HoodieTableConfig.BASE_FILE_FORMAT.key(), "LANCE")
+ .option(RECORDKEY_FIELD.key(), "id")
+ .option(PRECOMBINE_FIELD.key(), "age")
+ .option(TABLE_NAME.key(), tableName)
+ .option(HoodieWriteConfig.TBL_NAME.key(), tableName)
+      .option(HoodieWriteConfig.RECORD_MERGE_IMPL_CLASSES.key(), classOf[DefaultSparkRecordMerger].getName)
+ .option(OPERATION.key(), "bulk_insert")
+ .mode(SaveMode.Overwrite)
+ .save(tablePath)
+
+ // Second insert - records 4-6
+ val records2 = Seq(
+ (4, "David", 28, 88.9),
+ (5, "Eve", 32, 91.4),
+ (6, "Frank", 27, 85.7)
+ )
+    val df2 = spark.createDataFrame(records2).toDF("id", "name", "age", "score")
+
+ df2.write
+ .format("hudi")
+ .option(HoodieTableConfig.BASE_FILE_FORMAT.key(), "LANCE")
+ .option(RECORDKEY_FIELD.key(), "id")
+ .option(PRECOMBINE_FIELD.key(), "age")
+ .option(TABLE_NAME.key(), tableName)
+ .option(HoodieWriteConfig.TBL_NAME.key(), tableName)
+      .option(HoodieWriteConfig.RECORD_MERGE_IMPL_CLASSES.key(), classOf[DefaultSparkRecordMerger].getName)
+ .option(OPERATION.key(), "bulk_insert")
+ .mode(SaveMode.Append)
+ .save(tablePath)
+
+ // Get the commit timestamp after second insert
+ val metaClient = HoodieTableMetaClient.builder()
+ .setConf(HoodieTestUtils.getDefaultStorageConf)
+ .setBasePath(tablePath)
+ .build()
+
+    val commits = metaClient.getCommitsTimeline.filterCompletedInstants().getInstants.asScala.toList
+ assertEquals(2, commits.size, "Should have 2 commits after second insert")
+ val secondCommitTime = commits(1).requestedTime()
+
+ // Third insert - records 7-9
+ val records3 = Seq(
+ (7, "Grace", 29, 93.2),
+ (8, "Henry", 31, 89.6),
+ (9, "Iris", 26, 94.8)
+ )
+    val df3 = spark.createDataFrame(records3).toDF("id", "name", "age", "score")
+
+ df3.write
+ .format("hudi")
+ .option(HoodieTableConfig.BASE_FILE_FORMAT.key(), "LANCE")
+ .option(RECORDKEY_FIELD.key(), "id")
+ .option(PRECOMBINE_FIELD.key(), "age")
+ .option(TABLE_NAME.key(), tableName)
+ .option(HoodieWriteConfig.TBL_NAME.key(), tableName)
+      .option(HoodieWriteConfig.RECORD_MERGE_IMPL_CLASSES.key(), classOf[DefaultSparkRecordMerger].getName)
+ .option(OPERATION.key(), "bulk_insert")
+ .mode(SaveMode.Append)
+ .save(tablePath)
+
+ // Time travel query to second commit (should see data from c1 + c2 only)
+ val timeTravelDf = spark.read
+ .format("hudi")
+ .option("as.of.instant", secondCommitTime)
+ .load(tablePath)
+
+ val actual = timeTravelDf.select("id", "name", "age", "score")
+
+ val expectedDf = spark.createDataFrame(Seq(
+ (1, "Alice", 30, 95.5),
+ (2, "Bob", 25, 87.3),
+ (3, "Charlie", 35, 92.1),
+ (4, "David", 28, 88.9),
+ (5, "Eve", 32, 91.4),
+ (6, "Frank", 27, 85.7)
+ )).toDF("id", "name", "age", "score")
+
+ assertTrue(expectedDf.except(actual).isEmpty)
+ assertTrue(actual.except(expectedDf).isEmpty)
+ }
+
+ @Test
+ def testMultipleRegularInsertsWithCommitValidation(): Unit = {
+ val tableName = "test_lance_regular_inserts"
+ val tablePath = s"$basePath/$tableName"
+
+ // First insert - records 1-3 using regular insert
+ val records1 = Seq(
+ (1, "Alice", 30, 95.5),
+ (2, "Bob", 25, 87.3),
+ (3, "Charlie", 35, 92.1)
+ )
+    val df1 = spark.createDataFrame(records1).toDF("id", "name", "age", "score")
+
+ df1.write
+ .format("hudi")
+ .option(HoodieTableConfig.BASE_FILE_FORMAT.key(), "LANCE")
+ .option(RECORDKEY_FIELD.key(), "id")
+ .option(PRECOMBINE_FIELD.key(), "age")
+ .option(TABLE_NAME.key(), tableName)
+ .option(HoodieWriteConfig.TBL_NAME.key(), tableName)
+      .option(HoodieWriteConfig.RECORD_MERGE_IMPL_CLASSES.key(), classOf[DefaultSparkRecordMerger].getName)
+ .option(OPERATION.key(), "insert")
+ .mode(SaveMode.Overwrite)
+ .save(tablePath)
+
+ // Second insert - records 4-6 using regular insert
+ val records2 = Seq(
+ (4, "David", 28, 88.9),
+ (5, "Eve", 32, 91.4),
+ (6, "Frank", 27, 85.7)
+ )
+    val df2 = spark.createDataFrame(records2).toDF("id", "name", "age", "score")
+
+ df2.write
+ .format("hudi")
+ .option(HoodieTableConfig.BASE_FILE_FORMAT.key(), "LANCE")
+ .option(RECORDKEY_FIELD.key(), "id")
+ .option(PRECOMBINE_FIELD.key(), "age")
+ .option(TABLE_NAME.key(), tableName)
+ .option(HoodieWriteConfig.TBL_NAME.key(), tableName)
+      .option(HoodieWriteConfig.RECORD_MERGE_IMPL_CLASSES.key(), classOf[DefaultSparkRecordMerger].getName)
+ .option(OPERATION.key(), "insert")
+ .mode(SaveMode.Append)
+ .save(tablePath)
+
+ // Validate number of commits matches number of inserts
+ val metaClient = HoodieTableMetaClient.builder()
+ .setConf(HoodieTestUtils.getDefaultStorageConf)
+ .setBasePath(tablePath)
+ .build()
+
+    val commitCount = metaClient.getCommitsTimeline.filterCompletedInstants().countInstants()
+    assertEquals(2, commitCount, "Should have 2 completed commits (one per insert)")
+
+ // Verify that all commits are insert commits
+    val commits = metaClient.getCommitsTimeline.filterCompletedInstants().getInstants.asScala.toList
+ assertEquals(2, commits.size, "Should have exactly 2 commits")
+
+    // Check that each commit is a COMMIT action (insert creates COMMIT actions)
+    commits.foreach { instant =>
+      assertEquals("commit", instant.getAction, s"Instant ${instant.requestedTime()} should be a commit action")
+ }
+
+ // Read back all data and verify total record count
+ val readDf = spark.read
+ .format("hudi")
+ .load(tablePath)
+
+ val actual = readDf.select("id", "name", "age", "score")
+
+ val expectedDf = spark.createDataFrame(Seq(
+ (1, "Alice", 30, 95.5),
+ (2, "Bob", 25, 87.3),
+ (3, "Charlie", 35, 92.1),
+ (4, "David", 28, 88.9),
+ (5, "Eve", 32, 91.4),
+ (6, "Frank", 27, 85.7)
+ )).toDF("id", "name", "age", "score")
+
+ assertTrue(expectedDf.except(actual).isEmpty)
+ assertTrue(actual.except(expectedDf).isEmpty)
+ }
+
+ @Test
+ def testBasicUpsertModifyExistingRow(): Unit = {
+ val tableName = "test_lance_upsert"
+ val tablePath = s"$basePath/$tableName"
+
+ // Initial insert - 3 records
+ val records1 = Seq(
+ (1, "Alice", 30, 95.5),
+ (2, "Bob", 25, 87.3),
+ (3, "Charlie", 35, 92.1)
+ )
+    val df1 = spark.createDataFrame(records1).toDF("id", "name", "age", "score")
+
+ df1.write
+ .format("hudi")
+ .option(HoodieTableConfig.BASE_FILE_FORMAT.key(), "LANCE")
+ .option(RECORDKEY_FIELD.key(), "id")
+ .option(PRECOMBINE_FIELD.key(), "age")
+ .option(TABLE_NAME.key(), tableName)
+ .option(HoodieWriteConfig.TBL_NAME.key(), tableName)
+      .option(HoodieWriteConfig.RECORD_MERGE_IMPL_CLASSES.key(), classOf[DefaultSparkRecordMerger].getName)
+ .option(OPERATION.key(), "insert")
+ .mode(SaveMode.Overwrite)
+ .save(tablePath)
+
+ // Upsert - modify Bob's record (id=2)
+ val records2 = Seq(
+ (2, "Bob", 40, 95.0) // Update Bob: age 25->40, score 87.3->95.0
+ )
+    val df2 = spark.createDataFrame(records2).toDF("id", "name", "age", "score")
+
+ df2.write
+ .format("hudi")
+ .option(HoodieTableConfig.BASE_FILE_FORMAT.key(), "LANCE")
+ .option(RECORDKEY_FIELD.key(), "id")
+ .option(PRECOMBINE_FIELD.key(), "age")
+ .option(TABLE_NAME.key(), tableName)
+ .option(HoodieWriteConfig.TBL_NAME.key(), tableName)
+      .option(HoodieWriteConfig.RECORD_MERGE_IMPL_CLASSES.key(), classOf[DefaultSparkRecordMerger].getName)
+ .option(OPERATION.key(), "upsert")
+ .mode(SaveMode.Append)
+ .save(tablePath)
+
+ // Second upsert - modify Alice (id=1) and insert David (id=4)
+ val records3 = Seq(
+ (1, "Alice", 45, 98.5), // Update Alice: age 30->45, score 95.5->98.5
+ (4, "David", 28, 88.0) // Insert new record
+ )
+    val df3 = spark.createDataFrame(records3).toDF("id", "name", "age", "score")
+
+ df3.write
+ .format("hudi")
+ .option(HoodieTableConfig.BASE_FILE_FORMAT.key(), "LANCE")
+ .option(RECORDKEY_FIELD.key(), "id")
+ .option(PRECOMBINE_FIELD.key(), "age")
+ .option(TABLE_NAME.key(), tableName)
+ .option(HoodieWriteConfig.TBL_NAME.key(), tableName)
+      .option(HoodieWriteConfig.RECORD_MERGE_IMPL_CLASSES.key(), classOf[DefaultSparkRecordMerger].getName)
+ .option(OPERATION.key(), "upsert")
+ .mode(SaveMode.Append)
+ .save(tablePath)
+
+ // Validate commits
+ val metaClient = HoodieTableMetaClient.builder()
+ .setConf(HoodieTestUtils.getDefaultStorageConf)
+ .setBasePath(tablePath)
+ .build()
+
+    val commitCount = metaClient.getCommitsTimeline.filterCompletedInstants().countInstants()
+    assertEquals(3, commitCount, "Should have 3 completed commits (insert + 2 upserts)")
+
+ // Read and verify data
+ val readDf = spark.read.format("hudi").load(tablePath)
+ val actual = readDf.select("id", "name", "age", "score")
+
+ val expectedDf = spark.createDataFrame(Seq(
+ (1, "Alice", 45, 98.5),
+ (2, "Bob", 40, 95.0),
+ (3, "Charlie", 35, 92.1),
+ (4, "David", 28, 88.0)
+ )).toDF("id", "name", "age", "score")
+
+ assertTrue(expectedDf.except(actual).isEmpty)
+ assertTrue(actual.except(expectedDf).isEmpty)
+ }
+
+ @Test
+ def testBasicDeleteOperation(): Unit = {
+ val tableName = "test_lance_delete"
+ val tablePath = s"$basePath/$tableName"
+
+ // Initial insert - 5 records
+ val records1 = Seq(
+ (1, "Alice", 30, 95.5),
+ (2, "Bob", 25, 87.3),
+ (3, "Charlie", 35, 92.1),
+ (4, "David", 28, 88.0),
+ (5, "Eve", 32, 91.4)
+ )
+    val df1 = spark.createDataFrame(records1).toDF("id", "name", "age", "score")
+
+ df1.write
+ .format("hudi")
+ .option(HoodieTableConfig.BASE_FILE_FORMAT.key(), "LANCE")
+ .option(RECORDKEY_FIELD.key(), "id")
+ .option(PRECOMBINE_FIELD.key(), "age")
+ .option(TABLE_NAME.key(), tableName)
+ .option(HoodieWriteConfig.TBL_NAME.key(), tableName)
+      .option(HoodieWriteConfig.RECORD_MERGE_IMPL_CLASSES.key(), classOf[DefaultSparkRecordMerger].getName)
+ .option(OPERATION.key(), "insert")
+ .mode(SaveMode.Overwrite)
+ .save(tablePath)
+
+    // Delete operation - delete Bob (id=2), David (id=4), and a non-existent key (id=99)
+    val recordsToDelete = Seq(
+      (2, "Bob", 25, 87.3), // Delete Bob (exists)
+      (4, "David", 28, 88.0), // Delete David (exists)
+      (99, "NonExistent", 50, 0.0) // Delete non-existent record (should be no-op)
+    )
+    val deleteDF = spark.createDataFrame(recordsToDelete).toDF("id", "name", "age", "score")
+
+ deleteDF.write
+ .format("hudi")
+ .option(HoodieTableConfig.BASE_FILE_FORMAT.key(), "LANCE")
+ .option(RECORDKEY_FIELD.key(), "id")
+ .option(PRECOMBINE_FIELD.key(), "age")
+ .option(TABLE_NAME.key(), tableName)
+ .option(HoodieWriteConfig.TBL_NAME.key(), tableName)
+      .option(HoodieWriteConfig.RECORD_MERGE_IMPL_CLASSES.key(), classOf[DefaultSparkRecordMerger].getName)
+ .option(OPERATION.key(), "delete")
+ .mode(SaveMode.Append)
+ .save(tablePath)
+
+ // Validate commits
+ val metaClient = HoodieTableMetaClient.builder()
+ .setConf(HoodieTestUtils.getDefaultStorageConf)
+ .setBasePath(tablePath)
+ .build()
+
+    val commitCount = metaClient.getCommitsTimeline.filterCompletedInstants().countInstants()
+    assertEquals(2, commitCount, "Should have 2 completed commits (insert + delete)")
+
+ // Read and verify data
+ val readDf = spark.read.format("hudi").load(tablePath)
+ val actual = readDf.select("id", "name", "age", "score")
+
+ val expectedDf = spark.createDataFrame(Seq(
+ (1, "Alice", 30, 95.5),
+ (3, "Charlie", 35, 92.1),
+ (5, "Eve", 32, 91.4)
+ )).toDF("id", "name", "age", "score")
+
+ assertTrue(expectedDf.except(actual).isEmpty)
+ assertTrue(actual.except(expectedDf).isEmpty)
+ }
+
+ @Test
+ def testIncrementalQuery(): Unit = {
+ val tableName = "test_lance_incremental"
+ val tablePath = s"$basePath/$tableName"
+
+ // First insert - records 1-3
+ val records1 = Seq(
+ (1, "Alice", 30, 95.5),
+ (2, "Bob", 25, 87.3),
+ (3, "Charlie", 35, 92.1)
+ )
+    val df1 = spark.createDataFrame(records1).toDF("id", "name", "age", "score")
+
+ df1.write
+ .format("hudi")
+ .option(HoodieTableConfig.BASE_FILE_FORMAT.key(), "LANCE")
+ .option(RECORDKEY_FIELD.key(), "id")
+ .option(PRECOMBINE_FIELD.key(), "age")
+ .option(TABLE_NAME.key(), tableName)
+ .option(HoodieWriteConfig.TBL_NAME.key(), tableName)
+      .option(HoodieWriteConfig.RECORD_MERGE_IMPL_CLASSES.key(), classOf[DefaultSparkRecordMerger].getName)
+ .option(OPERATION.key(), "bulk_insert")
+ .mode(SaveMode.Overwrite)
+ .save(tablePath)
+
+ // Second insert - records 4-6
+ val records2 = Seq(
+ (4, "David", 28, 88.9),
+ (5, "Eve", 32, 91.4),
+ (6, "Frank", 27, 85.7)
+ )
+    val df2 = spark.createDataFrame(records2).toDF("id", "name", "age", "score")
+
+ df2.write
+ .format("hudi")
+ .option(HoodieTableConfig.BASE_FILE_FORMAT.key(), "LANCE")
+ .option(RECORDKEY_FIELD.key(), "id")
+ .option(PRECOMBINE_FIELD.key(), "age")
+ .option(TABLE_NAME.key(), tableName)
+ .option(HoodieWriteConfig.TBL_NAME.key(), tableName)
+      .option(HoodieWriteConfig.RECORD_MERGE_IMPL_CLASSES.key(), classOf[DefaultSparkRecordMerger].getName)
+ .option(OPERATION.key(), "bulk_insert")
+ .mode(SaveMode.Append)
+ .save(tablePath)
+
+ // Get commit timestamps
+ val metaClient = HoodieTableMetaClient.builder()
+ .setConf(HoodieTestUtils.getDefaultStorageConf)
+ .setBasePath(tablePath)
+ .build()
+
+    val commitsAfterSecond = metaClient.getCommitsTimeline.filterCompletedInstants().getInstants.asScala.toList
+    assertEquals(2, commitsAfterSecond.size, "Should have 2 commits after second insert")
+ val secondCommitTime = commitsAfterSecond(1).getCompletionTime
+
+ // Third insert - records 7-9
+ val records3 = Seq(
+ (7, "Grace", 29, 93.2),
+ (8, "Henry", 31, 89.6),
+ (9, "Iris", 26, 94.8)
+ )
+    val df3 = spark.createDataFrame(records3).toDF("id", "name", "age", "score")
+
+ df3.write
+ .format("hudi")
+ .option(HoodieTableConfig.BASE_FILE_FORMAT.key(), "LANCE")
+ .option(RECORDKEY_FIELD.key(), "id")
+ .option(PRECOMBINE_FIELD.key(), "age")
+ .option(TABLE_NAME.key(), tableName)
+ .option(HoodieWriteConfig.TBL_NAME.key(), tableName)
+      .option(HoodieWriteConfig.RECORD_MERGE_IMPL_CLASSES.key(), classOf[DefaultSparkRecordMerger].getName)
+ .option(OPERATION.key(), "bulk_insert")
+ .mode(SaveMode.Append)
+ .save(tablePath)
+
+ // Reload metaClient to get latest commits
+ metaClient.reloadActiveTimeline()
+    val allCommits = metaClient.getCommitsTimeline.filterCompletedInstants().getInstants.asScala.toList
+    assertEquals(3, allCommits.size, "Should have 3 commits after third insert")
+ val thirdCommitTime = allCommits(2).getCompletionTime
+
+ // Incremental query from c2 to c3 (should see only data from c3)
+ val incrementalDf = spark.read
+ .format("hudi")
+ .option("hoodie.datasource.query.type", "incremental")
+ .option("hoodie.datasource.read.begin.instanttime", secondCommitTime)
+ .option("hoodie.datasource.read.end.instanttime", thirdCommitTime)
+ .load(tablePath)
+
+ val actual = incrementalDf.select("id", "name", "age", "score")
+
+ val expectedDf = spark.createDataFrame(Seq(
+ (7, "Grace", 29, 93.2),
+ (8, "Henry", 31, 89.6),
+ (9, "Iris", 26, 94.8)
+ )).toDF("id", "name", "age", "score")
+
+ assertTrue(expectedDf.except(actual).isEmpty)
+ assertTrue(actual.except(expectedDf).isEmpty)
+ }
}