flyrain commented on code in PR #5248:
URL: https://github.com/apache/iceberg/pull/5248#discussion_r928036896


##########
spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/source/RowDataReader.java:
##########
@@ -20,185 +20,54 @@
 package org.apache.iceberg.spark.source;
 
 import java.util.Map;
-import org.apache.iceberg.CombinedScanTask;
-import org.apache.iceberg.DataFile;
 import org.apache.iceberg.DataTask;
 import org.apache.iceberg.FileScanTask;
-import org.apache.iceberg.MetadataColumns;
+import org.apache.iceberg.ScanTaskGroup;
 import org.apache.iceberg.Schema;
-import org.apache.iceberg.StructLike;
 import org.apache.iceberg.Table;
-import org.apache.iceberg.TableProperties;
-import org.apache.iceberg.avro.Avro;
-import org.apache.iceberg.data.DeleteFilter;
 import org.apache.iceberg.io.CloseableIterable;
 import org.apache.iceberg.io.CloseableIterator;
 import org.apache.iceberg.io.InputFile;
-import org.apache.iceberg.mapping.NameMappingParser;
-import org.apache.iceberg.orc.ORC;
-import org.apache.iceberg.parquet.Parquet;
 import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
-import org.apache.iceberg.relocated.com.google.common.collect.Sets;
-import org.apache.iceberg.spark.SparkSchemaUtil;
-import org.apache.iceberg.spark.data.SparkAvroReader;
-import org.apache.iceberg.spark.data.SparkOrcReader;
-import org.apache.iceberg.spark.data.SparkParquetReaders;
-import org.apache.iceberg.types.TypeUtil;
 import org.apache.spark.rdd.InputFileBlockHolder;
 import org.apache.spark.sql.catalyst.InternalRow;
 
-class RowDataReader extends BaseDataReader<InternalRow> {
-
-  private final Schema tableSchema;
-  private final Schema expectedSchema;
-  private final String nameMapping;
-  private final boolean caseSensitive;
-
-  RowDataReader(CombinedScanTask task, Table table, Schema expectedSchema, 
boolean caseSensitive) {
-    super(table, task);
-    this.tableSchema = table.schema();
-    this.expectedSchema = expectedSchema;
-    this.nameMapping = 
table.properties().get(TableProperties.DEFAULT_NAME_MAPPING);
-    this.caseSensitive = caseSensitive;
+class RowDataReader extends BaseRowReader<FileScanTask> {
+  RowDataReader(ScanTaskGroup<FileScanTask> task, Table table, Schema 
expectedSchema, boolean caseSensitive) {
+    super(table, task, expectedSchema, caseSensitive);
   }
 
   @Override
   CloseableIterator<InternalRow> open(FileScanTask task) {
-    SparkDeleteFilter deletes = new SparkDeleteFilter(task, tableSchema, 
expectedSchema);
+    SparkDeleteFilter deletes = new SparkDeleteFilter(task, tableSchema(), 
expectedSchema(), this);
 
     // schema or rows returned by readers
     Schema requiredSchema = deletes.requiredSchema();
-    Map<Integer, ?> idToConstant = constantsMap(task, expectedSchema);
-    DataFile file = task.file();
+    Map<Integer, ?> idToConstant = constantsMap(task, requiredSchema);
 
     // update the current file for Spark's filename() function
-    InputFileBlockHolder.set(file.path().toString(), task.start(), 
task.length());
+    InputFileBlockHolder.set(task.file().path().toString(), task.start(), 
task.length());
 
     return deletes.filter(open(task, requiredSchema, idToConstant)).iterator();
   }
 
-  protected Schema tableSchema() {
-    return tableSchema;
-  }
-
   protected CloseableIterable<InternalRow> open(FileScanTask task, Schema 
readSchema, Map<Integer, ?> idToConstant) {
     CloseableIterable<InternalRow> iter;
     if (task.isDataTask()) {
       iter = newDataIterable(task.asDataTask(), readSchema);
     } else {
-      InputFile location = getInputFile(task);
+      InputFile location = getInputFile(task.file().path().toString());
       Preconditions.checkNotNull(location, "Could not find InputFile 
associated with FileScanTask");

Review Comment:
   The task type is still `FileScanTask` here, since this reader is declared with `FileScanTask` as its type parameter:
   ```
   class RowDataReader extends BaseRowReader<FileScanTask>
   ```



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to