voonhous commented on code in PR #18304:
URL: https://github.com/apache/hudi/pull/18304#discussion_r2934753689
##########
hudi-hadoop-common/src/main/java/org/apache/hudi/io/lance/HoodieBaseLanceWriter.java:
##########
@@ -62,6 +66,7 @@ public abstract class HoodieBaseLanceWriter<R> implements
Closeable {
private int currentBatchSize = 0;
private VectorSchemaRoot root;
private ArrowWriter<R> arrowWriter;
+ protected final Option<HoodieBloomFilterWriteSupport<?>>
bloomFilterWriteSupportOpt;
Review Comment:
Can this be private?
##########
hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/io/storage/HoodieSparkLanceWriter.java:
##########
@@ -100,34 +105,42 @@ public HoodieSparkLanceWriter(StoragePath file,
* @param sparkSchema Spark schema for the data
* @param taskContextSupplier Task context supplier for partition ID
* @param storage HoodieStorage instance
- * @throws IOException if writer initialization fails
*/
public HoodieSparkLanceWriter(StoragePath file,
StructType sparkSchema,
TaskContextSupplier taskContextSupplier,
- HoodieStorage storage) throws IOException {
- this(file, sparkSchema, null, taskContextSupplier, storage, false);
+ HoodieStorage storage) {
+ this(file, sparkSchema, null, taskContextSupplier, storage, false,
Option.empty());
}
@Override
public void writeRowWithMetadata(HoodieKey key, InternalRow row) throws
IOException {
if (populateMetaFields) {
UTF8String recordKey = UTF8String.fromString(key.getRecordKey());
+ bloomFilterWriteSupportOpt.ifPresent(bloomFilterWriteSupport ->
+
((HoodieBloomFilterRowWriteSupport)bloomFilterWriteSupport).addKey(recordKey));
updateRecordMetadata(row, recordKey, key.getPartitionPath(),
getWrittenRecordCount());
super.write(row);
} else {
+ bloomFilterWriteSupportOpt.ifPresent(bloomFilterWriteSupport -> {
+ UTF8String recordKey = UTF8String.fromString(key.getRecordKey());
+
((HoodieBloomFilterRowWriteSupport)bloomFilterWriteSupport).addKey(recordKey);
Review Comment:
This is triggering my OCD a little. We're casting everytime we need to use
`bloomFilterWriteSupport`.
We can make `HoodieBaseLanceWriter` generic on the key
`HoodieBaseLanceWriter<R, K>` with `Option<HoodieBloomFilterWriteSupport<K>>`,
or add a protected method like `addKeyToBloomFilter(UTF8String key)` in
`HoodieSparkLanceWriter` to centralize the cast.
Not sure if the former is easy and not sure if it the IDE will start
complaining about compile errors (generics can get messy fast if not done
properly and mess with readability), but these are the 2 options.
##########
hudi-hadoop-common/src/main/java/org/apache/hudi/io/lance/HoodieBaseLanceWriter.java:
##########
@@ -156,6 +164,14 @@ public void close() throws IOException {
primaryException = e;
}
+ // Finalize and write bloom filter metadata
+ if (writer != null && bloomFilterWriteSupportOpt.isPresent()) {
+ Map<String, String> metadata =
bloomFilterWriteSupportOpt.get().finalizeMetadata();
+ if (!metadata.isEmpty()) {
+ writer.addSchemaMetadata(metadata);
+ }
+ }
Review Comment:
This seems risky, should we put it in a try-catch-finally block?
Can `finalizeMetadata()` or `addSchemaMetadata()` throw any error? If they
throw errors, there might be leaks.
##########
hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/io/storage/HoodieSparkLanceWriter.java:
##########
@@ -100,34 +105,42 @@ public HoodieSparkLanceWriter(StoragePath file,
* @param sparkSchema Spark schema for the data
* @param taskContextSupplier Task context supplier for partition ID
* @param storage HoodieStorage instance
- * @throws IOException if writer initialization fails
*/
public HoodieSparkLanceWriter(StoragePath file,
StructType sparkSchema,
TaskContextSupplier taskContextSupplier,
- HoodieStorage storage) throws IOException {
- this(file, sparkSchema, null, taskContextSupplier, storage, false);
+ HoodieStorage storage) {
+ this(file, sparkSchema, null, taskContextSupplier, storage, false,
Option.empty());
}
@Override
public void writeRowWithMetadata(HoodieKey key, InternalRow row) throws
IOException {
if (populateMetaFields) {
UTF8String recordKey = UTF8String.fromString(key.getRecordKey());
+ bloomFilterWriteSupportOpt.ifPresent(bloomFilterWriteSupport ->
+
((HoodieBloomFilterRowWriteSupport)bloomFilterWriteSupport).addKey(recordKey));
Review Comment:
Nit:
`((HoodieBloomFilterRowWriteSupport)bloomFilterWriteSupport).addKey(recordKey))`
is called on both the `if` and `else` clause.
We can shift the the key construction and key adding part out, `ifPresent`
covers both cases.
`super.writeRow()` can be shifted out too
##########
hudi-spark-datasource/hudi-spark/src/test/java/org/apache/hudi/io/storage/TestHoodieSparkLanceWriter.java:
##########
Review Comment:
Hmm, can we add tests for verifying bloom filter metadata is actually
written correctly?
All updated tests pass a bloom filter to the writer, but none verify that
bloom filter metadata is present in the lance file.
The test should also check if bloom filter contains expected record keys
with the correct min/max record key ranges. i.e. presence and correctness needs
to be checked.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]