rdblue commented on code in PR #12298:
URL: https://github.com/apache/iceberg/pull/12298#discussion_r2756623197
##########
spark/v4.0/spark/src/main/java/org/apache/iceberg/spark/source/BaseBatchReader.java:
##########
@@ -76,79 +73,38 @@ protected CloseableIterable<ColumnarBatch> newBatchIterable(
Expression residual,
Map<Integer, ?> idToConstant,
@Nonnull SparkDeleteFilter deleteFilter) {
- CloseableIterable<ColumnarBatch> iterable;
- switch (format) {
- case PARQUET:
- iterable =
- newParquetIterable(
- inputFile, start, length, residual, idToConstant, deleteFilter.requiredSchema());
- break;
- case ORC:
- iterable = newOrcIterable(inputFile, start, length, residual, idToConstant);
- break;
- default:
- throw new UnsupportedOperationException(
- "Format: " + format + " not supported for batched reads");
+ ReadBuilder<ColumnarBatch, ?> readBuilder;
+ if (parquetConf != null) {
+ readBuilder =
+ parquetConf.readerType() == ParquetReaderType.COMET
+ ? FormatModelRegistry.readBuilder(
+ format, VectorizedSparkParquetReaders.CometColumnarBatch.class, inputFile)
+ : FormatModelRegistry.readBuilder(format, ColumnarBatch.class, inputFile);
+
+ readBuilder = readBuilder.recordsPerBatch(parquetConf.batchSize());
+ } else {
+ readBuilder = FormatModelRegistry.readBuilder(format, ColumnarBatch.class, inputFile);
+ if (orcConf != null) {
+ readBuilder = readBuilder.recordsPerBatch(orcConf.batchSize());
Review Comment:
This is a bit awkward because it uses the separate, format-specific configs to
modify a format-agnostic API. I think it would be clearer to move the builder
configuration to the top level, like this:
```java
private boolean useComet() {
  return parquetConf != null && parquetConf.readerType() == ParquetReaderType.COMET;
}

Class<? extends ColumnarBatch> readType =
    useComet() ? VectorizedSparkParquetReaders.CometColumnarBatch.class : ColumnarBatch.class;
ReadBuilder<ColumnarBatch, ?> readBuilder =
    FormatModelRegistry.readBuilder(format, readType, inputFile);
int batchSize = parquetConf != null ? parquetConf.batchSize() : orcConf.batchSize();
readBuilder.recordsPerBatch(batchSize);
CloseableIterable<ColumnarBatch> iterable = ...
```
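To make the intent concrete, here is a minimal, self-contained sketch of the two decisions (which reader to use, which batch size to apply) pulled out as helpers. `BatchReadSettings`, `ParquetConf`, `OrcConf`, and `ReaderType` are hypothetical stand-ins for illustration, not the Iceberg classes. The sketch also assumes the `orcConf != null` guard from the current hunk should be kept, so it returns an empty `OptionalInt` when neither config is set instead of dereferencing `orcConf` unconditionally:

```java
import java.util.OptionalInt;

// Hypothetical stand-ins for the Spark read configs; NOT the Iceberg classes.
final class BatchReadSettings {
  public enum ReaderType { ICEBERG, COMET }

  public static final class ParquetConf {
    private final int batchSize;
    private final ReaderType readerType;

    public ParquetConf(int batchSize, ReaderType readerType) {
      this.batchSize = batchSize;
      this.readerType = readerType;
    }

    public int batchSize() { return batchSize; }
    public ReaderType readerType() { return readerType; }
  }

  public static final class OrcConf {
    private final int batchSize;

    public OrcConf(int batchSize) { this.batchSize = batchSize; }

    public int batchSize() { return batchSize; }
  }

  private BatchReadSettings() {}

  // True only when a Parquet config is present and selects the Comet reader.
  public static boolean useComet(ParquetConf parquetConf) {
    return parquetConf != null && parquetConf.readerType() == ReaderType.COMET;
  }

  // The Parquet config wins when present, then ORC; empty when neither is
  // set, so the caller leaves the builder's default batch size untouched.
  public static OptionalInt batchSize(ParquetConf parquetConf, OrcConf orcConf) {
    if (parquetConf != null) {
      return OptionalInt.of(parquetConf.batchSize());
    }
    if (orcConf != null) {
      return OptionalInt.of(orcConf.batchSize());
    }
    return OptionalInt.empty();
  }
}
```

With helpers shaped like this, the call site stays format-agnostic: pick the read type from `useComet(...)`, create the builder once, and call `recordsPerBatch` only when `batchSize(...)` is present.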
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]