aokolnychyi commented on code in PR #5248:
URL: https://github.com/apache/iceberg/pull/5248#discussion_r929440083


##########
spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/source/BaseReader.java:
##########
@@ -137,21 +157,36 @@ public void close() throws IOException {
     }
   }
 
-  protected InputFile getInputFile(FileScanTask task) {
-    Preconditions.checkArgument(!task.isDataTask(), "Invalid task type");
-    return inputFiles.get(task.file().path().toString());
+  protected InputFile getInputFile(String location) {
+    return inputFiles().get(location);
   }
 
-  protected InputFile getInputFile(String location) {
-    return inputFiles.get(location);
+  private Map<String, InputFile> inputFiles() {
+    if (lazyInputFiles == null) {
+      Stream<EncryptedInputFile> encryptedFiles = taskGroup.tasks().stream()
+          .flatMap(this::referencedFiles)
+          .map(file ->
+              EncryptedFiles.encryptedInput(table.io().newInputFile(file.path().toString()), file.keyMetadata()));
+
+      // decrypt with the batch call to avoid multiple RPCs to a key server, if possible
+      Iterable<InputFile> decryptedFiles = table.encryption().decrypt(encryptedFiles::iterator);
+
+      Map<String, InputFile> files = Maps.newHashMapWithExpectedSize(taskGroup.tasks().size());
+      decryptedFiles.forEach(decrypted -> files.putIfAbsent(decrypted.location(), decrypted));
+      this.lazyInputFiles = ImmutableMap.copyOf(files);
+    }
+
+    return lazyInputFiles;
   }
 
-  protected Map<Integer, ?> constantsMap(FileScanTask task, Schema readSchema) {
+  protected abstract Stream<ContentFile<?>> referencedFiles(TaskT task);
+
+  protected Map<Integer, ?> constantsMap(ContentScanTask<?> task, Schema readSchema) {
     if (readSchema.findField(MetadataColumns.PARTITION_COLUMN_ID) != null) {
-      StructType partitionType = Partitioning.partitionType(table);
-      return PartitionUtil.constantsMap(task, partitionType, BaseDataReader::convertConstant);
+      Types.StructType partitionType = Partitioning.partitionType(table);

Review Comment:
   Did we add `Types.` on purpose?
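   
   If it was not intentional, I assume the unqualified form would still compile with a nested-class import (unless Spark's `StructType` is also imported in this class), roughly:
   
   ```
   import org.apache.iceberg.types.Types.StructType;
   
   // sketch: keeps the previous unqualified usage
   StructType partitionType = Partitioning.partitionType(table);
   ```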



##########
spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/source/BaseReader.java:
##########
@@ -122,7 +142,7 @@ public T get() {
     return current;
   }
 
-  abstract CloseableIterator<T> open(FileScanTask task);
+  abstract CloseableIterator<T> open(TaskT task);

Review Comment:
   nit: In other classes (though not very consistently), we usually put abstract methods immediately after the constructor so that it is obvious what children must implement. Since we are changing this line anyway and also adding one more abstract method, what about putting those two immediately after the constructor and making both either protected or package-private for consistency?
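   
   Roughly like this (just a sketch; the constructor parameter names are assumed, the method signatures are the ones added in this PR):
   
   ```
   BaseReader(Table table, ScanTaskGroup<TaskT> taskGroup, Schema expectedSchema, boolean caseSensitive) {
     ...
   }
   
   // both abstract methods grouped right after the constructor
   protected abstract CloseableIterator<T> open(TaskT task);
   
   protected abstract Stream<ContentFile<?>> referencedFiles(TaskT task);
   ```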



##########
spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/source/BaseBatchReader.java:
##########
@@ -0,0 +1,110 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iceberg.spark.source;
+
+import java.util.Map;
+import java.util.Set;
+import org.apache.arrow.vector.NullCheckingForGet;
+import org.apache.iceberg.FileFormat;
+import org.apache.iceberg.MetadataColumns;
+import org.apache.iceberg.ScanTask;
+import org.apache.iceberg.ScanTaskGroup;
+import org.apache.iceberg.Schema;
+import org.apache.iceberg.Table;
+import org.apache.iceberg.expressions.Expression;
+import org.apache.iceberg.io.CloseableIterable;
+import org.apache.iceberg.io.InputFile;
+import org.apache.iceberg.orc.ORC;
+import org.apache.iceberg.parquet.Parquet;
+import org.apache.iceberg.relocated.com.google.common.collect.Sets;
+import org.apache.iceberg.spark.data.vectorized.VectorizedSparkOrcReaders;
+import org.apache.iceberg.spark.data.vectorized.VectorizedSparkParquetReaders;
+import org.apache.iceberg.types.TypeUtil;
+import org.apache.spark.sql.vectorized.ColumnarBatch;
+
+abstract class BaseBatchReader<T extends ScanTask> extends BaseReader<ColumnarBatch, T> {
+  private final int batchSize;
+
+  BaseBatchReader(Table table, ScanTaskGroup<T> taskGroup, Schema expectedSchema, boolean caseSensitive,
+                  int batchSize) {
+    super(table, taskGroup, expectedSchema, caseSensitive);
+    this.batchSize = batchSize;
+  }
+
+  protected CloseableIterable<ColumnarBatch> newBatchIterable(InputFile location, FileFormat format,
+                                                              long start, long length, Expression residual,
+                                                              Map<Integer, ?> idToConstant,
+                                                              SparkDeleteFilter deleteFilter) {
+    switch (format) {
+      case PARQUET:
+        return newParquetIterable(location, start, length, residual, idToConstant, deleteFilter);
+
+      case ORC:
+        return newOrcIterable(location, start, length, residual, idToConstant);
+
+      default:
+        throw new UnsupportedOperationException("Format: " + format + " not supported for batched reads");
+    }
+  }
+
+  private CloseableIterable<ColumnarBatch> newParquetIterable(InputFile location, long start, long length,

Review Comment:
   nit: Same here



##########
spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/source/BaseBatchReader.java:
##########
@@ -0,0 +1,110 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iceberg.spark.source;
+
+import java.util.Map;
+import java.util.Set;
+import org.apache.arrow.vector.NullCheckingForGet;
+import org.apache.iceberg.FileFormat;
+import org.apache.iceberg.MetadataColumns;
+import org.apache.iceberg.ScanTask;
+import org.apache.iceberg.ScanTaskGroup;
+import org.apache.iceberg.Schema;
+import org.apache.iceberg.Table;
+import org.apache.iceberg.expressions.Expression;
+import org.apache.iceberg.io.CloseableIterable;
+import org.apache.iceberg.io.InputFile;
+import org.apache.iceberg.orc.ORC;
+import org.apache.iceberg.parquet.Parquet;
+import org.apache.iceberg.relocated.com.google.common.collect.Sets;
+import org.apache.iceberg.spark.data.vectorized.VectorizedSparkOrcReaders;
+import org.apache.iceberg.spark.data.vectorized.VectorizedSparkParquetReaders;
+import org.apache.iceberg.types.TypeUtil;
+import org.apache.spark.sql.vectorized.ColumnarBatch;
+
+abstract class BaseBatchReader<T extends ScanTask> extends BaseReader<ColumnarBatch, T> {
+  private final int batchSize;
+
+  BaseBatchReader(Table table, ScanTaskGroup<T> taskGroup, Schema expectedSchema, boolean caseSensitive,
+                  int batchSize) {
+    super(table, taskGroup, expectedSchema, caseSensitive);
+    this.batchSize = batchSize;
+  }
+
+  protected CloseableIterable<ColumnarBatch> newBatchIterable(InputFile location, FileFormat format,

Review Comment:
   nit: I know it was like this in the old implementation but what about 
renaming `location` -> `file`?
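   
   For example (same body as in this PR, only the parameter renamed):
   
   ```
   protected CloseableIterable<ColumnarBatch> newBatchIterable(InputFile file, FileFormat format,
                                                               long start, long length, Expression residual,
                                                               Map<Integer, ?> idToConstant,
                                                               SparkDeleteFilter deleteFilter) {
     // body unchanged apart from references to the renamed parameter
     ...
   }
   ```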



##########
spark/v3.3/spark/src/test/java/org/apache/iceberg/spark/source/TestBaseReader.java:
##########
@@ -51,7 +53,7 @@
 import static org.apache.iceberg.FileFormat.PARQUET;
 import static org.apache.iceberg.Files.localOutput;
 
-public class TestSparkBaseDataReader {
+public class TestBaseReader {

Review Comment:
   Is the rename required?



##########
spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/source/BaseReader.java:
##########
@@ -137,21 +157,36 @@ public void close() throws IOException {
     }
   }
 
-  protected InputFile getInputFile(FileScanTask task) {
-    Preconditions.checkArgument(!task.isDataTask(), "Invalid task type");
-    return inputFiles.get(task.file().path().toString());
+  protected InputFile getInputFile(String location) {
+    return inputFiles().get(location);
   }
 
-  protected InputFile getInputFile(String location) {
-    return inputFiles.get(location);
+  private Map<String, InputFile> inputFiles() {
+    if (lazyInputFiles == null) {
+      Stream<EncryptedInputFile> encryptedFiles = taskGroup.tasks().stream()
+          .flatMap(this::referencedFiles)
+          .map(file ->
+              EncryptedFiles.encryptedInput(table.io().newInputFile(file.path().toString()), file.keyMetadata()));

Review Comment:
   optional: You could define a helper method for constructing encrypted input files and fit this on one line.
   
   ```
   Stream<EncryptedInputFile> encryptedFiles = taskGroup.tasks().stream()
       .flatMap(this::referencedFiles)
       .map(this::toEncryptedInputFile);
   ```
   
   ```
   private EncryptedInputFile toEncryptedInputFile(ContentFile<?> file) {
     InputFile inputFile = table.io().newInputFile(file.path().toString());
     return EncryptedFiles.encryptedInput(inputFile, file.keyMetadata());
   }
   ```



##########
spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/source/BatchDataReader.java:
##########
@@ -20,139 +20,44 @@
 package org.apache.iceberg.spark.source;
 
 import java.util.Map;
-import java.util.Set;
-import org.apache.arrow.vector.NullCheckingForGet;
-import org.apache.iceberg.CombinedScanTask;
-import org.apache.iceberg.DataFile;
-import org.apache.iceberg.FileFormat;
+import java.util.stream.Stream;
+import org.apache.iceberg.ContentFile;
 import org.apache.iceberg.FileScanTask;
-import org.apache.iceberg.MetadataColumns;
+import org.apache.iceberg.ScanTaskGroup;
 import org.apache.iceberg.Schema;
-import org.apache.iceberg.StructLike;
 import org.apache.iceberg.Table;
-import org.apache.iceberg.TableProperties;
-import org.apache.iceberg.data.DeleteFilter;
-import org.apache.iceberg.io.CloseableIterable;
 import org.apache.iceberg.io.CloseableIterator;
 import org.apache.iceberg.io.InputFile;
-import org.apache.iceberg.mapping.NameMappingParser;
-import org.apache.iceberg.orc.ORC;
-import org.apache.iceberg.parquet.Parquet;
 import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
-import org.apache.iceberg.relocated.com.google.common.collect.Sets;
-import org.apache.iceberg.spark.SparkSchemaUtil;
-import org.apache.iceberg.spark.data.vectorized.VectorizedSparkOrcReaders;
-import org.apache.iceberg.spark.data.vectorized.VectorizedSparkParquetReaders;
-import org.apache.iceberg.types.TypeUtil;
 import org.apache.spark.rdd.InputFileBlockHolder;
-import org.apache.spark.sql.catalyst.InternalRow;
 import org.apache.spark.sql.vectorized.ColumnarBatch;
 
-class BatchDataReader extends BaseDataReader<ColumnarBatch> {
-  private final Schema expectedSchema;
-  private final String nameMapping;
-  private final boolean caseSensitive;
-  private final int batchSize;
+class BatchDataReader extends BaseBatchReader<FileScanTask> {
+  BatchDataReader(ScanTaskGroup<FileScanTask> task, Table table, Schema expectedSchema, boolean caseSensitive,
+                  int size) {
+    super(table, task, expectedSchema, caseSensitive, size);
+  }
 
-  BatchDataReader(CombinedScanTask task, Table table, Schema expectedSchema, boolean caseSensitive, int size) {
-    super(table, task);
-    this.expectedSchema = expectedSchema;
-    this.nameMapping = table.properties().get(TableProperties.DEFAULT_NAME_MAPPING);
-    this.caseSensitive = caseSensitive;
-    this.batchSize = size;
+  @Override
+  protected Stream<ContentFile<?>> referencedFiles(FileScanTask task) {
+    return Stream.concat(Stream.of(task.file()), task.deletes().stream());
   }
 
   @Override
   CloseableIterator<ColumnarBatch> open(FileScanTask task) {
-    DataFile file = task.file();
+    String filePath = task.file().path().toString();
 
     // update the current file for Spark's filename() function
-    InputFileBlockHolder.set(file.path().toString(), task.start(), task.length());
-
-    Map<Integer, ?> idToConstant = constantsMap(task, expectedSchema);
-
-    CloseableIterable<ColumnarBatch> iter;
-    InputFile location = getInputFile(task);
-    Preconditions.checkNotNull(location, "Could not find InputFile associated with FileScanTask");
-    if (task.file().format() == FileFormat.PARQUET) {
-      SparkDeleteFilter deleteFilter = deleteFilter(task);
-      // get required schema for filtering out equality-delete rows in case equality-delete uses columns are
-      // not selected.
-      Schema requiredSchema = requiredSchema(deleteFilter);
-
-      Parquet.ReadBuilder builder = Parquet.read(location)
-          .project(requiredSchema)
-          .split(task.start(), task.length())
-          .createBatchedReaderFunc(fileSchema -> VectorizedSparkParquetReaders.buildReader(requiredSchema,
-              fileSchema, /* setArrowValidityVector */ NullCheckingForGet.NULL_CHECKING_ENABLED, idToConstant,
-              deleteFilter))
-          .recordsPerBatch(batchSize)
-          .filter(task.residual())
-          .caseSensitive(caseSensitive)
-          // Spark eagerly consumes the batches. So the underlying memory allocated could be reused
-          // without worrying about subsequent reads clobbering over each other. This improves
-          // read performance as every batch read doesn't have to pay the cost of allocating memory.
-          .reuseContainers();
-
-      if (nameMapping != null) {
-        builder.withNameMapping(NameMappingParser.fromJson(nameMapping));
-      }
-
-      iter = builder.build();
-    } else if (task.file().format() == FileFormat.ORC) {
-      Set<Integer> constantFieldIds = idToConstant.keySet();
-      Set<Integer> metadataFieldIds = MetadataColumns.metadataFieldIds();
-      Sets.SetView<Integer> constantAndMetadataFieldIds = Sets.union(constantFieldIds, metadataFieldIds);
-      Schema schemaWithoutConstantAndMetadataFields = TypeUtil.selectNot(expectedSchema, constantAndMetadataFieldIds);
-      ORC.ReadBuilder builder = ORC.read(location)
-          .project(schemaWithoutConstantAndMetadataFields)
-          .split(task.start(), task.length())
-          .createBatchedReaderFunc(fileSchema -> VectorizedSparkOrcReaders.buildReader(expectedSchema, fileSchema,
-              idToConstant))
-          .recordsPerBatch(batchSize)
-          .filter(task.residual())
-          .caseSensitive(caseSensitive);
-
-      if (nameMapping != null) {
-        builder.withNameMapping(NameMappingParser.fromJson(nameMapping));
-      }
-
-      iter = builder.build();
-    } else {
-      throw new UnsupportedOperationException(
-          "Format: " + task.file().format() + " not supported for batched reads");
-    }
-    return iter.iterator();
-  }
-
-  private SparkDeleteFilter deleteFilter(FileScanTask task) {
-    return task.deletes().isEmpty() ? null : new SparkDeleteFilter(task, table().schema(), expectedSchema);
-  }
-
-  private Schema requiredSchema(DeleteFilter deleteFilter) {
-    if (deleteFilter != null && deleteFilter.hasEqDeletes()) {
-      return deleteFilter.requiredSchema();
-    } else {
-      return expectedSchema;
-    }
-  }
+    InputFileBlockHolder.set(filePath, task.start(), task.length());
 
-  private class SparkDeleteFilter extends DeleteFilter<InternalRow> {
-    private final InternalRowWrapper asStructLike;
+    Map<Integer, ?> idToConstant = constantsMap(task, expectedSchema());
 
-    SparkDeleteFilter(FileScanTask task, Schema tableSchema, Schema requestedSchema) {
-      super(task.file().path().toString(), task.deletes(), tableSchema, requestedSchema);
-      this.asStructLike = new InternalRowWrapper(SparkSchemaUtil.convert(requiredSchema()));
-    }
+    InputFile inputFile = getInputFile(filePath);
+    Preconditions.checkNotNull(inputFile, "Could not find InputFile associated with FileScanTask");
 
-    @Override
-    protected StructLike asStructLike(InternalRow row) {
-      return asStructLike.wrap(row);
-    }
+    SparkDeleteFilter deleteFilter = new SparkDeleteFilter(filePath, task.deletes());

Review Comment:
   I think this changed the previous behavior, where we would not construct a delete filter if the list of deletes is empty. Shall we still pass null when the deletes are empty just to avoid surprises? I am not sure there would be any performance degradation, but it seems safer to keep the old behavior.
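   
   Something like this should keep the old semantics (a sketch; `newBatchIterable` in this PR already handles a null filter):
   
   ```
   SparkDeleteFilter deleteFilter = task.deletes().isEmpty() ? null :
       new SparkDeleteFilter(filePath, task.deletes());
   ```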



##########
spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/source/RowDataReader.java:
##########
@@ -20,185 +20,59 @@
 package org.apache.iceberg.spark.source;
 
 import java.util.Map;
-import org.apache.iceberg.CombinedScanTask;
-import org.apache.iceberg.DataFile;
+import java.util.stream.Stream;
+import org.apache.iceberg.ContentFile;
 import org.apache.iceberg.DataTask;
 import org.apache.iceberg.FileScanTask;
-import org.apache.iceberg.MetadataColumns;
+import org.apache.iceberg.ScanTaskGroup;
 import org.apache.iceberg.Schema;
-import org.apache.iceberg.StructLike;
 import org.apache.iceberg.Table;
-import org.apache.iceberg.TableProperties;
-import org.apache.iceberg.avro.Avro;
-import org.apache.iceberg.data.DeleteFilter;
 import org.apache.iceberg.io.CloseableIterable;
 import org.apache.iceberg.io.CloseableIterator;
 import org.apache.iceberg.io.InputFile;
-import org.apache.iceberg.mapping.NameMappingParser;
-import org.apache.iceberg.orc.ORC;
-import org.apache.iceberg.parquet.Parquet;
 import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
-import org.apache.iceberg.relocated.com.google.common.collect.Sets;
-import org.apache.iceberg.spark.SparkSchemaUtil;
-import org.apache.iceberg.spark.data.SparkAvroReader;
-import org.apache.iceberg.spark.data.SparkOrcReader;
-import org.apache.iceberg.spark.data.SparkParquetReaders;
-import org.apache.iceberg.types.TypeUtil;
 import org.apache.spark.rdd.InputFileBlockHolder;
 import org.apache.spark.sql.catalyst.InternalRow;
 
-class RowDataReader extends BaseDataReader<InternalRow> {
-
-  private final Schema tableSchema;
-  private final Schema expectedSchema;
-  private final String nameMapping;
-  private final boolean caseSensitive;
+class RowDataReader extends BaseRowReader<FileScanTask> {
+  RowDataReader(ScanTaskGroup<FileScanTask> task, Table table, Schema expectedSchema, boolean caseSensitive) {
+    super(table, task, expectedSchema, caseSensitive);
+  }
 
-  RowDataReader(CombinedScanTask task, Table table, Schema expectedSchema, boolean caseSensitive) {
-    super(table, task);
-    this.tableSchema = table.schema();
-    this.expectedSchema = expectedSchema;
-    this.nameMapping = table.properties().get(TableProperties.DEFAULT_NAME_MAPPING);
-    this.caseSensitive = caseSensitive;
+  @Override
+  protected Stream<ContentFile<?>> referencedFiles(FileScanTask task) {
+    return Stream.concat(Stream.of(task.file()), task.deletes().stream());
   }
 
   @Override
   CloseableIterator<InternalRow> open(FileScanTask task) {
-    SparkDeleteFilter deletes = new SparkDeleteFilter(task, tableSchema, expectedSchema);
+    String filePath = task.file().path().toString();
+    SparkDeleteFilter deleteFilter = new SparkDeleteFilter(filePath, task.deletes());
 
     // schema or rows returned by readers
-    Schema requiredSchema = deletes.requiredSchema();
-    Map<Integer, ?> idToConstant = constantsMap(task, expectedSchema);
-    DataFile file = task.file();
+    Schema requiredSchema = deleteFilter.requiredSchema();
+    Map<Integer, ?> idToConstant = constantsMap(task, requiredSchema);
 
     // update the current file for Spark's filename() function
-    InputFileBlockHolder.set(file.path().toString(), task.start(), task.length());
-
-    return deletes.filter(open(task, requiredSchema, idToConstant)).iterator();
-  }
+    InputFileBlockHolder.set(filePath, task.start(), task.length());
 
-  protected Schema tableSchema() {
-    return tableSchema;
+    return deleteFilter.filter(open(task, requiredSchema, idToConstant)).iterator();
   }
 
   protected CloseableIterable<InternalRow> open(FileScanTask task, Schema readSchema, Map<Integer, ?> idToConstant) {
-    CloseableIterable<InternalRow> iter;
     if (task.isDataTask()) {
-      iter = newDataIterable(task.asDataTask(), readSchema);
+      return newDataIterable(task.asDataTask(), readSchema);
     } else {
-      InputFile location = getInputFile(task);
-      Preconditions.checkNotNull(location, "Could not find InputFile associated with FileScanTask");
-
-      switch (task.file().format()) {
-        case PARQUET:
-          iter = newParquetIterable(location, task, readSchema, idToConstant);
-          break;
-
-        case AVRO:
-          iter = newAvroIterable(location, task, readSchema, idToConstant);
-          break;
-
-        case ORC:
-          iter = newOrcIterable(location, task, readSchema, idToConstant);
-          break;
-
-        default:
-          throw new UnsupportedOperationException(
-              "Cannot read unknown format: " + task.file().format());
-      }
-    }
-
-    return iter;
-  }
-
-  private CloseableIterable<InternalRow> newAvroIterable(
-      InputFile location,
-      FileScanTask task,
-      Schema projection,
-      Map<Integer, ?> idToConstant) {
-    Avro.ReadBuilder builder = Avro.read(location)
-        .reuseContainers()
-        .project(projection)
-        .split(task.start(), task.length())
-        .createReaderFunc(readSchema -> new SparkAvroReader(projection, readSchema, idToConstant));
-
-    if (nameMapping != null) {
-      builder.withNameMapping(NameMappingParser.fromJson(nameMapping));
-    }
-
-    return builder.build();
-  }
-
-  private CloseableIterable<InternalRow> newParquetIterable(
-      InputFile location,
-      FileScanTask task,
-      Schema readSchema,
-      Map<Integer, ?> idToConstant) {
-    Parquet.ReadBuilder builder = Parquet.read(location)
-        .reuseContainers()
-        .split(task.start(), task.length())
-        .project(readSchema)
-        .createReaderFunc(fileSchema -> SparkParquetReaders.buildReader(readSchema, fileSchema, idToConstant))
-        .filter(task.residual())
-        .caseSensitive(caseSensitive);
-
-    if (nameMapping != null) {
-      builder.withNameMapping(NameMappingParser.fromJson(nameMapping));
+      InputFile inputFile = getInputFile(task.file().path().toString());
+      Preconditions.checkNotNull(inputFile, "Could not find InputFile associated with FileScanTask");
+      return newIterable(inputFile, task.file().format(), task.start(), task.length(), task.residual(), readSchema,
+          idToConstant);
     }
-
-    return builder.build();
-  }
-
-  private CloseableIterable<InternalRow> newOrcIterable(
-      InputFile location,
-      FileScanTask task,
-      Schema readSchema,
-      Map<Integer, ?> idToConstant) {
-    Schema readSchemaWithoutConstantAndMetadataFields = TypeUtil.selectNot(readSchema,
-        Sets.union(idToConstant.keySet(), MetadataColumns.metadataFieldIds()));
-
-    ORC.ReadBuilder builder = ORC.read(location)
-        .project(readSchemaWithoutConstantAndMetadataFields)
-        .split(task.start(), task.length())
-        .createReaderFunc(readOrcSchema -> new SparkOrcReader(readSchema, readOrcSchema, idToConstant))
-        .filter(task.residual())
-        .caseSensitive(caseSensitive);
-
-    if (nameMapping != null) {
-      builder.withNameMapping(NameMappingParser.fromJson(nameMapping));
-    }
-
-    return builder.build();
   }
 
   private CloseableIterable<InternalRow> newDataIterable(DataTask task, Schema readSchema) {
     StructInternalRow row = new StructInternalRow(readSchema.asStruct());
-    CloseableIterable<InternalRow> asSparkRows = CloseableIterable.transform(
-        task.asDataTask().rows(), row::setStruct);
+    CloseableIterable<InternalRow> asSparkRows = CloseableIterable.transform(task.asDataTask().rows(), row::setStruct);

Review Comment:
   nit: What about inlining to remove the useless temp var?
   
   ```
   return CloseableIterable.transform(task.asDataTask().rows(), row::setStruct);
   ```



##########
spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/source/BaseBatchReader.java:
##########
@@ -0,0 +1,110 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iceberg.spark.source;
+
+import java.util.Map;
+import java.util.Set;
+import org.apache.arrow.vector.NullCheckingForGet;
+import org.apache.iceberg.FileFormat;
+import org.apache.iceberg.MetadataColumns;
+import org.apache.iceberg.ScanTask;
+import org.apache.iceberg.ScanTaskGroup;
+import org.apache.iceberg.Schema;
+import org.apache.iceberg.Table;
+import org.apache.iceberg.expressions.Expression;
+import org.apache.iceberg.io.CloseableIterable;
+import org.apache.iceberg.io.InputFile;
+import org.apache.iceberg.orc.ORC;
+import org.apache.iceberg.parquet.Parquet;
+import org.apache.iceberg.relocated.com.google.common.collect.Sets;
+import org.apache.iceberg.spark.data.vectorized.VectorizedSparkOrcReaders;
+import org.apache.iceberg.spark.data.vectorized.VectorizedSparkParquetReaders;
+import org.apache.iceberg.types.TypeUtil;
+import org.apache.spark.sql.vectorized.ColumnarBatch;
+
+abstract class BaseBatchReader<T extends ScanTask> extends BaseReader<ColumnarBatch, T> {
+  private final int batchSize;
+
+  BaseBatchReader(Table table, ScanTaskGroup<T> taskGroup, Schema expectedSchema, boolean caseSensitive,
+                  int batchSize) {
+    super(table, taskGroup, expectedSchema, caseSensitive);
+    this.batchSize = batchSize;
+  }
+
+  protected CloseableIterable<ColumnarBatch> newBatchIterable(InputFile location, FileFormat format,
+                                                              long start, long length, Expression residual,
+                                                              Map<Integer, ?> idToConstant,
+                                                              SparkDeleteFilter deleteFilter) {
+    switch (format) {
+      case PARQUET:
+        return newParquetIterable(location, start, length, residual, idToConstant, deleteFilter);
+
+      case ORC:
+        return newOrcIterable(location, start, length, residual, idToConstant);
+
+      default:
+        throw new UnsupportedOperationException("Format: " + format + " not supported for batched reads");
+    }
+  }
+
+  private CloseableIterable<ColumnarBatch> newParquetIterable(InputFile location, long start, long length,
+                                                              Expression residual, Map<Integer, ?> idToConstant,
+                                                              SparkDeleteFilter deleteFilter) {
+    // get required schema for filtering out equality-delete rows in case equality-delete uses columns are
+    // not selected.
+    Schema requiredSchema = deleteFilter != null && deleteFilter.hasEqDeletes() ?
+        deleteFilter.requiredSchema() : expectedSchema();
+
+    return Parquet.read(location)
+        .project(requiredSchema)
+        .split(start, length)
+        .createBatchedReaderFunc(fileSchema -> VectorizedSparkParquetReaders.buildReader(requiredSchema,
+            fileSchema, /* setArrowValidityVector */ NullCheckingForGet.NULL_CHECKING_ENABLED, idToConstant,
+            deleteFilter))
+        .recordsPerBatch(batchSize)
+        .filter(residual)
+        .caseSensitive(caseSensitive())
+        // Spark eagerly consumes the batches. So the underlying memory allocated could be reused
+        // without worrying about subsequent reads clobbering over each other. This improves
+        // read performance as every batch read doesn't have to pay the cost of allocating memory.
+        .reuseContainers()
+        .withNameMapping(nameMapping())
+        .build();
+  }
+
+  private CloseableIterable<ColumnarBatch> newOrcIterable(InputFile location, long start, long length,

Review Comment:
   nit: Same here



##########
spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/source/BaseReader.java:
##########
@@ -200,4 +235,32 @@ protected static Object convertConstant(Type type, Object value) {
     }
     return value;
   }
+
+  protected class SparkDeleteFilter extends DeleteFilter<InternalRow> {
+    private final InternalRowWrapper asStructLike;
+
+    SparkDeleteFilter(String filePath, List<DeleteFile> deletes, Schema requestedSchema) {

Review Comment:
   optional: You may remove this constructor by using `expectedSchema()`, which is now available via the base reader, in `EqualityDeleteRowReader`. It is a bit confusing that we compute the table schema from an instance var of the outer class yet accept the expected schema as a parameter even though it is also accessible in this class.
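   
   In other words, keep only something like the two-arg form (a rough sketch, assuming the outer class's `table` field and `expectedSchema()` accessor):
   
   ```
   SparkDeleteFilter(String filePath, List<DeleteFile> deletes) {
     super(filePath, deletes, table.schema(), expectedSchema());
     this.asStructLike = new InternalRowWrapper(SparkSchemaUtil.convert(requiredSchema()));
   }
   ```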



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

