aokolnychyi commented on code in PR #5248:
URL: https://github.com/apache/iceberg/pull/5248#discussion_r927853108
##########
spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/source/BaseReader.java:
##########
@@ -60,35 +63,57 @@
*
* @param <T> is the Java class returned by this reader whose objects contain
one or more rows.
*/
-abstract class BaseDataReader<T> implements Closeable {
- private static final Logger LOG =
LoggerFactory.getLogger(BaseDataReader.class);
+abstract class BaseReader<T, TaskT extends ScanTask> implements Closeable {
+ private static final Logger LOG = LoggerFactory.getLogger(BaseReader.class);
private final Table table;
- private final Iterator<FileScanTask> tasks;
+ private final Schema expectedSchema;
+ private final boolean caseSensitive;
+ private final String nameMapping;
Review Comment:
nit: What about making this `NameMapping` instead of `String` and also
returning it in the getter?
Here is how we init in other places:
```
String nameMappingString =
table.properties().get(TableProperties.DEFAULT_NAME_MAPPING);
this.nameMapping = nameMappingString != null ?
NameMappingParser.fromJson(nameMappingString) : null;
```
##########
spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/source/BaseRowReader.java:
##########
@@ -0,0 +1,137 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iceberg.spark.source;
+
+import java.util.Map;
+import org.apache.iceberg.FileFormat;
+import org.apache.iceberg.MetadataColumns;
+import org.apache.iceberg.ScanTask;
+import org.apache.iceberg.ScanTaskGroup;
+import org.apache.iceberg.Schema;
+import org.apache.iceberg.Table;
+import org.apache.iceberg.avro.Avro;
+import org.apache.iceberg.expressions.Expression;
+import org.apache.iceberg.io.CloseableIterable;
+import org.apache.iceberg.io.InputFile;
+import org.apache.iceberg.mapping.NameMappingParser;
+import org.apache.iceberg.orc.ORC;
+import org.apache.iceberg.parquet.Parquet;
+import org.apache.iceberg.relocated.com.google.common.collect.Sets;
+import org.apache.iceberg.spark.data.SparkAvroReader;
+import org.apache.iceberg.spark.data.SparkOrcReader;
+import org.apache.iceberg.spark.data.SparkParquetReaders;
+import org.apache.iceberg.types.TypeUtil;
+import org.apache.spark.sql.catalyst.InternalRow;
+
+public abstract class BaseRowReader<T extends ScanTask> extends
BaseReader<InternalRow, T> {
+ private final Schema tableSchema;
+
+ BaseRowReader(Table table, ScanTaskGroup<T> taskGroup, Schema
expectedSchema, boolean caseSensitive) {
+ super(table, taskGroup, expectedSchema, caseSensitive);
+ this.tableSchema = table.schema();
+ }
+
+ protected Schema tableSchema() {
+ return tableSchema;
+ }
+
+ protected CloseableIterable<InternalRow> newIterable(InputFile file,
FileFormat format, long start, long length,
+ Expression residual,
Schema projection,
+ Map<Integer, ?>
idToConstant) {
+ switch (format) {
+ case PARQUET:
+ return newParquetIterable(file, start, length, residual, projection,
idToConstant);
+
+ case AVRO:
+ return newAvroIterable(file, start, length, projection, idToConstant);
+
+ case ORC:
+ return newOrcIterable(file, start, length, residual, projection,
idToConstant);
+
+ default:
+ throw new UnsupportedOperationException("Cannot read unknown format: "
+ format);
+ }
+ }
+
+ private CloseableIterable<InternalRow> newAvroIterable(
+ InputFile location,
+ long start,
+ long length,
+ Schema projection,
+ Map<Integer, ?> idToConstant) {
+ Avro.ReadBuilder builder = Avro.read(location)
+ .reuseContainers()
+ .project(projection)
+ .split(start, length)
+ .createReaderFunc(readSchema -> new SparkAvroReader(projection,
readSchema, idToConstant));
+
+ if (nameMapping() != null) {
+ builder.withNameMapping(NameMappingParser.fromJson(nameMapping()));
+ }
+
+ return builder.build();
+ }
+
+ private CloseableIterable<InternalRow> newParquetIterable(
+ InputFile location,
+ long start,
+ long length,
+ Expression residual,
+ Schema readSchema,
+ Map<Integer, ?> idToConstant) {
+ Parquet.ReadBuilder builder = Parquet.read(location)
+ .reuseContainers()
+ .split(start, length)
+ .project(readSchema)
+ .createReaderFunc(fileSchema ->
SparkParquetReaders.buildReader(readSchema, fileSchema, idToConstant))
+ .filter(residual)
+ .caseSensitive(caseSensitive());
+
+ if (nameMapping() != null) {
+ builder.withNameMapping(NameMappingParser.fromJson(nameMapping()));
+ }
+
+ return builder.build();
+ }
+
+ private CloseableIterable<InternalRow> newOrcIterable(
Review Comment:
Same here
##########
spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/source/ChangelogRowReader.java:
##########
@@ -0,0 +1,102 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iceberg.spark.source;
+
+import java.util.Map;
+import org.apache.iceberg.AddedRowsScanTask;
+import org.apache.iceberg.ChangelogScanTask;
+import org.apache.iceberg.ContentScanTask;
+import org.apache.iceberg.DataFile;
+import org.apache.iceberg.DeletedDataFileScanTask;
+import org.apache.iceberg.DeletedRowsScanTask;
+import org.apache.iceberg.ScanTaskGroup;
+import org.apache.iceberg.Schema;
+import org.apache.iceberg.Table;
+import org.apache.iceberg.io.CloseableIterable;
+import org.apache.iceberg.io.CloseableIterator;
+import org.apache.iceberg.io.InputFile;
+import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
+import org.apache.spark.rdd.InputFileBlockHolder;
+import org.apache.spark.sql.catalyst.InternalRow;
+
+public class ChangelogRowReader extends BaseRowReader<ChangelogScanTask> {
Review Comment:
Let's add in a separate PR.
##########
spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/source/BaseReader.java:
##########
@@ -60,35 +63,57 @@
*
* @param <T> is the Java class returned by this reader whose objects contain
one or more rows.
*/
-abstract class BaseDataReader<T> implements Closeable {
- private static final Logger LOG =
LoggerFactory.getLogger(BaseDataReader.class);
+abstract class BaseReader<T, TaskT extends ScanTask> implements Closeable {
+ private static final Logger LOG = LoggerFactory.getLogger(BaseReader.class);
private final Table table;
- private final Iterator<FileScanTask> tasks;
+ private final Schema expectedSchema;
+ private final boolean caseSensitive;
+ private final String nameMapping;
+ private final Iterator<TaskT> tasks;
private final Map<String, InputFile> inputFiles;
private CloseableIterator<T> currentIterator;
private T current = null;
- private FileScanTask currentTask = null;
+ private TaskT currentTask = null;
- BaseDataReader(Table table, CombinedScanTask task) {
+ BaseReader(Table table, ScanTaskGroup<TaskT> task, Schema expectedSchema,
boolean caseSensitive) {
this.table = table;
- this.tasks = task.files().iterator();
+ this.tasks = task.tasks().iterator();
+ this.inputFiles = inputFiles(task);
+ this.currentIterator = CloseableIterator.empty();
+ this.expectedSchema = expectedSchema;
+ this.caseSensitive = caseSensitive;
+ this.nameMapping =
table.properties().get(TableProperties.DEFAULT_NAME_MAPPING);
+ }
+
+ protected Schema expectedSchema() {
+ return expectedSchema;
+ }
+
+ protected boolean caseSensitive() {
+ return caseSensitive;
+ }
+
+ protected String nameMapping() {
+ return nameMapping;
+ }
+
+ private Map<String, InputFile> inputFiles(ScanTaskGroup<TaskT> task) {
Review Comment:
nit: `task` -> `taskGroup`
##########
spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/source/BaseBatchReader.java:
##########
@@ -0,0 +1,126 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iceberg.spark.source;
+
+import java.util.Map;
+import java.util.Set;
+import org.apache.arrow.vector.NullCheckingForGet;
+import org.apache.iceberg.FileFormat;
+import org.apache.iceberg.MetadataColumns;
+import org.apache.iceberg.ScanTask;
+import org.apache.iceberg.ScanTaskGroup;
+import org.apache.iceberg.Schema;
+import org.apache.iceberg.Table;
+import org.apache.iceberg.data.DeleteFilter;
+import org.apache.iceberg.expressions.Expression;
+import org.apache.iceberg.io.CloseableIterable;
+import org.apache.iceberg.io.InputFile;
+import org.apache.iceberg.mapping.NameMappingParser;
+import org.apache.iceberg.orc.ORC;
+import org.apache.iceberg.parquet.Parquet;
+import org.apache.iceberg.relocated.com.google.common.collect.Sets;
+import org.apache.iceberg.spark.data.vectorized.VectorizedSparkOrcReaders;
+import org.apache.iceberg.spark.data.vectorized.VectorizedSparkParquetReaders;
+import org.apache.iceberg.types.TypeUtil;
+import org.apache.spark.sql.vectorized.ColumnarBatch;
+
+public abstract class BaseBatchReader<T extends ScanTask> extends
BaseReader<ColumnarBatch, T> {
Review Comment:
nit: does it have to be public?
##########
spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/source/BaseReader.java:
##########
@@ -60,35 +63,57 @@
*
* @param <T> is the Java class returned by this reader whose objects contain
one or more rows.
*/
-abstract class BaseDataReader<T> implements Closeable {
- private static final Logger LOG =
LoggerFactory.getLogger(BaseDataReader.class);
+abstract class BaseReader<T, TaskT extends ScanTask> implements Closeable {
+ private static final Logger LOG = LoggerFactory.getLogger(BaseReader.class);
private final Table table;
- private final Iterator<FileScanTask> tasks;
+ private final Schema expectedSchema;
+ private final boolean caseSensitive;
+ private final String nameMapping;
+ private final Iterator<TaskT> tasks;
private final Map<String, InputFile> inputFiles;
private CloseableIterator<T> currentIterator;
private T current = null;
- private FileScanTask currentTask = null;
+ private TaskT currentTask = null;
- BaseDataReader(Table table, CombinedScanTask task) {
+ BaseReader(Table table, ScanTaskGroup<TaskT> task, Schema expectedSchema,
boolean caseSensitive) {
Review Comment:
nit: `task` -> `taskGroup`?
##########
spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/source/BaseReader.java:
##########
@@ -60,35 +63,57 @@
*
* @param <T> is the Java class returned by this reader whose objects contain
one or more rows.
*/
-abstract class BaseDataReader<T> implements Closeable {
- private static final Logger LOG =
LoggerFactory.getLogger(BaseDataReader.class);
+abstract class BaseReader<T, TaskT extends ScanTask> implements Closeable {
+ private static final Logger LOG = LoggerFactory.getLogger(BaseReader.class);
private final Table table;
- private final Iterator<FileScanTask> tasks;
+ private final Schema expectedSchema;
+ private final boolean caseSensitive;
+ private final String nameMapping;
+ private final Iterator<TaskT> tasks;
private final Map<String, InputFile> inputFiles;
private CloseableIterator<T> currentIterator;
private T current = null;
- private FileScanTask currentTask = null;
+ private TaskT currentTask = null;
- BaseDataReader(Table table, CombinedScanTask task) {
+ BaseReader(Table table, ScanTaskGroup<TaskT> task, Schema expectedSchema,
boolean caseSensitive) {
this.table = table;
- this.tasks = task.files().iterator();
+ this.tasks = task.tasks().iterator();
+ this.inputFiles = inputFiles(task);
+ this.currentIterator = CloseableIterator.empty();
+ this.expectedSchema = expectedSchema;
+ this.caseSensitive = caseSensitive;
+ this.nameMapping =
table.properties().get(TableProperties.DEFAULT_NAME_MAPPING);
+ }
+
+ protected Schema expectedSchema() {
+ return expectedSchema;
+ }
+
+ protected boolean caseSensitive() {
+ return caseSensitive;
+ }
+
+ protected String nameMapping() {
+ return nameMapping;
+ }
+
+ private Map<String, InputFile> inputFiles(ScanTaskGroup<TaskT> task) {
Map<String, ByteBuffer> keyMetadata = Maps.newHashMap();
- task.files().stream()
- .flatMap(fileScanTask -> Stream.concat(Stream.of(fileScanTask.file()),
fileScanTask.deletes().stream()))
+ task.tasks().stream()
Review Comment:
nit: I know this is not this PR's problem but because we touch this place,
would it make sense to simplify?
```
Stream<EncryptedInputFile> encrypted = taskGroup.tasks().stream()
.flatMap(task -> ...)
.map(file ->
EncryptedFiles.encryptedInput(table.io().newInputFile(file.path().toString()),
file.keyMetadata()));
```
I am fine doing this in a follow-up too.
##########
spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/source/BaseRowReader.java:
##########
@@ -0,0 +1,137 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iceberg.spark.source;
+
+import java.util.Map;
+import org.apache.iceberg.FileFormat;
+import org.apache.iceberg.MetadataColumns;
+import org.apache.iceberg.ScanTask;
+import org.apache.iceberg.ScanTaskGroup;
+import org.apache.iceberg.Schema;
+import org.apache.iceberg.Table;
+import org.apache.iceberg.avro.Avro;
+import org.apache.iceberg.expressions.Expression;
+import org.apache.iceberg.io.CloseableIterable;
+import org.apache.iceberg.io.InputFile;
+import org.apache.iceberg.mapping.NameMappingParser;
+import org.apache.iceberg.orc.ORC;
+import org.apache.iceberg.parquet.Parquet;
+import org.apache.iceberg.relocated.com.google.common.collect.Sets;
+import org.apache.iceberg.spark.data.SparkAvroReader;
+import org.apache.iceberg.spark.data.SparkOrcReader;
+import org.apache.iceberg.spark.data.SparkParquetReaders;
+import org.apache.iceberg.types.TypeUtil;
+import org.apache.spark.sql.catalyst.InternalRow;
+
+public abstract class BaseRowReader<T extends ScanTask> extends
BaseReader<InternalRow, T> {
+ private final Schema tableSchema;
Review Comment:
I'd say either get rid of this var and use `table().schema()` or move it to
`BaseReader` where we have other getters.
##########
spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/source/BaseRowReader.java:
##########
@@ -0,0 +1,137 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iceberg.spark.source;
+
+import java.util.Map;
+import org.apache.iceberg.FileFormat;
+import org.apache.iceberg.MetadataColumns;
+import org.apache.iceberg.ScanTask;
+import org.apache.iceberg.ScanTaskGroup;
+import org.apache.iceberg.Schema;
+import org.apache.iceberg.Table;
+import org.apache.iceberg.avro.Avro;
+import org.apache.iceberg.expressions.Expression;
+import org.apache.iceberg.io.CloseableIterable;
+import org.apache.iceberg.io.InputFile;
+import org.apache.iceberg.mapping.NameMappingParser;
+import org.apache.iceberg.orc.ORC;
+import org.apache.iceberg.parquet.Parquet;
+import org.apache.iceberg.relocated.com.google.common.collect.Sets;
+import org.apache.iceberg.spark.data.SparkAvroReader;
+import org.apache.iceberg.spark.data.SparkOrcReader;
+import org.apache.iceberg.spark.data.SparkParquetReaders;
+import org.apache.iceberg.types.TypeUtil;
+import org.apache.spark.sql.catalyst.InternalRow;
+
+public abstract class BaseRowReader<T extends ScanTask> extends
BaseReader<InternalRow, T> {
Review Comment:
nit: Does it have to be public?
##########
spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/source/BaseReader.java:
##########
@@ -60,35 +63,57 @@
*
* @param <T> is the Java class returned by this reader whose objects contain
one or more rows.
*/
-abstract class BaseDataReader<T> implements Closeable {
- private static final Logger LOG =
LoggerFactory.getLogger(BaseDataReader.class);
+abstract class BaseReader<T, TaskT extends ScanTask> implements Closeable {
+ private static final Logger LOG = LoggerFactory.getLogger(BaseReader.class);
private final Table table;
- private final Iterator<FileScanTask> tasks;
+ private final Schema expectedSchema;
+ private final boolean caseSensitive;
+ private final String nameMapping;
+ private final Iterator<TaskT> tasks;
private final Map<String, InputFile> inputFiles;
private CloseableIterator<T> currentIterator;
private T current = null;
- private FileScanTask currentTask = null;
+ private TaskT currentTask = null;
- BaseDataReader(Table table, CombinedScanTask task) {
+ BaseReader(Table table, ScanTaskGroup<TaskT> task, Schema expectedSchema,
boolean caseSensitive) {
Review Comment:
I think we should be able to get rid of changes in `ScanTask` and offer an
abstract method like this:
```
protected abstract Stream<ContentFile<?>> referencedFiles(TaskT task);
```
We shouldn't be calling abstract methods in the constructor so you can make
`inputFiles` lazy.
```
private final ScanTaskGroup<TaskT> taskGroup;
private final Iterator<TaskT> tasks;
...
private Map<String, InputFile> lazyInputFiles;
...
protected InputFile getInputFile(String location) {
return inputFiles().get(location);
}
private Map<String, InputFile> inputFiles() {
if (lazyInputFiles == null) {
Stream<EncryptedInputFile> encryptedFiles = taskGroup.tasks().stream()
.flatMap(this::referencedFiles)
.map(file ->
EncryptedFiles.encryptedInput(table.io().newInputFile(file.path().toString()),
file.keyMetadata()));
// decrypt with the batch call to avoid multiple RPCs to a key server,
if possible
Iterable<InputFile> decryptedFiles =
table.encryption().decrypt(encryptedFiles::iterator);
Map<String, InputFile> files =
Maps.newHashMapWithExpectedSize(taskGroup.tasks().size());
decryptedFiles.forEach(decrypted ->
files.putIfAbsent(decrypted.location(), decrypted));
this.lazyInputFiles = ImmutableMap.copyOf(files);
}
return lazyInputFiles;
}
```
##########
spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/source/BaseReader.java:
##########
@@ -111,8 +136,11 @@ public boolean next() throws IOException {
}
}
} catch (IOException | RuntimeException e) {
- if (currentTask != null && !currentTask.isDataTask()) {
Review Comment:
Are we sure we want to change `!currentTask.isDataTask()`?
On a side note, this can be rewritten with the new abstract method:
```
if (currentTask != null && !currentTask.isDataTask()) {
String filePaths = referencedFiles(currentTask)
.map(file -> file.path().toString())
.collect(Collectors.joining(", "));
LOG.error("Error reading file(s): {}", filePaths, e);
}
```
##########
spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/source/RowDataReader.java:
##########
@@ -20,185 +20,54 @@
package org.apache.iceberg.spark.source;
import java.util.Map;
-import org.apache.iceberg.CombinedScanTask;
-import org.apache.iceberg.DataFile;
import org.apache.iceberg.DataTask;
import org.apache.iceberg.FileScanTask;
-import org.apache.iceberg.MetadataColumns;
+import org.apache.iceberg.ScanTaskGroup;
import org.apache.iceberg.Schema;
-import org.apache.iceberg.StructLike;
import org.apache.iceberg.Table;
-import org.apache.iceberg.TableProperties;
-import org.apache.iceberg.avro.Avro;
-import org.apache.iceberg.data.DeleteFilter;
import org.apache.iceberg.io.CloseableIterable;
import org.apache.iceberg.io.CloseableIterator;
import org.apache.iceberg.io.InputFile;
-import org.apache.iceberg.mapping.NameMappingParser;
-import org.apache.iceberg.orc.ORC;
-import org.apache.iceberg.parquet.Parquet;
import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
-import org.apache.iceberg.relocated.com.google.common.collect.Sets;
-import org.apache.iceberg.spark.SparkSchemaUtil;
-import org.apache.iceberg.spark.data.SparkAvroReader;
-import org.apache.iceberg.spark.data.SparkOrcReader;
-import org.apache.iceberg.spark.data.SparkParquetReaders;
-import org.apache.iceberg.types.TypeUtil;
import org.apache.spark.rdd.InputFileBlockHolder;
import org.apache.spark.sql.catalyst.InternalRow;
-class RowDataReader extends BaseDataReader<InternalRow> {
-
- private final Schema tableSchema;
- private final Schema expectedSchema;
- private final String nameMapping;
- private final boolean caseSensitive;
-
- RowDataReader(CombinedScanTask task, Table table, Schema expectedSchema,
boolean caseSensitive) {
- super(table, task);
- this.tableSchema = table.schema();
- this.expectedSchema = expectedSchema;
- this.nameMapping =
table.properties().get(TableProperties.DEFAULT_NAME_MAPPING);
- this.caseSensitive = caseSensitive;
+class RowDataReader extends BaseRowReader<FileScanTask> {
+ RowDataReader(ScanTaskGroup<FileScanTask> task, Table table, Schema
expectedSchema, boolean caseSensitive) {
+ super(table, task, expectedSchema, caseSensitive);
}
@Override
CloseableIterator<InternalRow> open(FileScanTask task) {
- SparkDeleteFilter deletes = new SparkDeleteFilter(task, tableSchema,
expectedSchema);
+ SparkDeleteFilter deletes = new SparkDeleteFilter(task, tableSchema(),
expectedSchema(), this);
// schema or rows returned by readers
Schema requiredSchema = deletes.requiredSchema();
- Map<Integer, ?> idToConstant = constantsMap(task, expectedSchema);
- DataFile file = task.file();
+ Map<Integer, ?> idToConstant = constantsMap(task, requiredSchema);
// update the current file for Spark's filename() function
- InputFileBlockHolder.set(file.path().toString(), task.start(),
task.length());
+ InputFileBlockHolder.set(task.file().path().toString(), task.start(),
task.length());
return deletes.filter(open(task, requiredSchema, idToConstant)).iterator();
}
- protected Schema tableSchema() {
- return tableSchema;
- }
-
protected CloseableIterable<InternalRow> open(FileScanTask task, Schema
readSchema, Map<Integer, ?> idToConstant) {
CloseableIterable<InternalRow> iter;
if (task.isDataTask()) {
iter = newDataIterable(task.asDataTask(), readSchema);
} else {
- InputFile location = getInputFile(task);
+ InputFile location = getInputFile(task.file().path().toString());
Review Comment:
The old name seems a little bit weird. Shall we rename it to `inputFile`?
##########
spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/source/BaseRowReader.java:
##########
@@ -0,0 +1,137 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iceberg.spark.source;
+
+import java.util.Map;
+import org.apache.iceberg.FileFormat;
+import org.apache.iceberg.MetadataColumns;
+import org.apache.iceberg.ScanTask;
+import org.apache.iceberg.ScanTaskGroup;
+import org.apache.iceberg.Schema;
+import org.apache.iceberg.Table;
+import org.apache.iceberg.avro.Avro;
+import org.apache.iceberg.expressions.Expression;
+import org.apache.iceberg.io.CloseableIterable;
+import org.apache.iceberg.io.InputFile;
+import org.apache.iceberg.mapping.NameMappingParser;
+import org.apache.iceberg.orc.ORC;
+import org.apache.iceberg.parquet.Parquet;
+import org.apache.iceberg.relocated.com.google.common.collect.Sets;
+import org.apache.iceberg.spark.data.SparkAvroReader;
+import org.apache.iceberg.spark.data.SparkOrcReader;
+import org.apache.iceberg.spark.data.SparkParquetReaders;
+import org.apache.iceberg.types.TypeUtil;
+import org.apache.spark.sql.catalyst.InternalRow;
+
+public abstract class BaseRowReader<T extends ScanTask> extends
BaseReader<InternalRow, T> {
+ private final Schema tableSchema;
+
+ BaseRowReader(Table table, ScanTaskGroup<T> taskGroup, Schema
expectedSchema, boolean caseSensitive) {
+ super(table, taskGroup, expectedSchema, caseSensitive);
+ this.tableSchema = table.schema();
+ }
+
+ protected Schema tableSchema() {
+ return tableSchema;
+ }
+
+ protected CloseableIterable<InternalRow> newIterable(InputFile file,
FileFormat format, long start, long length,
+ Expression residual,
Schema projection,
+ Map<Integer, ?>
idToConstant) {
+ switch (format) {
+ case PARQUET:
+ return newParquetIterable(file, start, length, residual, projection,
idToConstant);
+
+ case AVRO:
+ return newAvroIterable(file, start, length, projection, idToConstant);
+
+ case ORC:
+ return newOrcIterable(file, start, length, residual, projection,
idToConstant);
+
+ default:
+ throw new UnsupportedOperationException("Cannot read unknown format: "
+ format);
+ }
+ }
+
+ private CloseableIterable<InternalRow> newAvroIterable(
+ InputFile location,
Review Comment:
I know the old code was formatted this way but can we fix it now to match
the method above?
##########
spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/source/SparkDeleteFilter.java:
##########
@@ -0,0 +1,63 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iceberg.spark.source;
+
+import java.util.List;
+import org.apache.iceberg.DeleteFile;
+import org.apache.iceberg.FileScanTask;
+import org.apache.iceberg.Schema;
+import org.apache.iceberg.StructLike;
+import org.apache.iceberg.data.DeleteFilter;
+import org.apache.iceberg.io.InputFile;
+import org.apache.iceberg.spark.SparkSchemaUtil;
+import org.apache.spark.sql.catalyst.InternalRow;
+
+public class SparkDeleteFilter extends DeleteFilter<InternalRow> {
Review Comment:
Do we still need this as a separate class?
##########
spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/source/BaseRowReader.java:
##########
@@ -0,0 +1,137 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iceberg.spark.source;
+
+import java.util.Map;
+import org.apache.iceberg.FileFormat;
+import org.apache.iceberg.MetadataColumns;
+import org.apache.iceberg.ScanTask;
+import org.apache.iceberg.ScanTaskGroup;
+import org.apache.iceberg.Schema;
+import org.apache.iceberg.Table;
+import org.apache.iceberg.avro.Avro;
+import org.apache.iceberg.expressions.Expression;
+import org.apache.iceberg.io.CloseableIterable;
+import org.apache.iceberg.io.InputFile;
+import org.apache.iceberg.mapping.NameMappingParser;
+import org.apache.iceberg.orc.ORC;
+import org.apache.iceberg.parquet.Parquet;
+import org.apache.iceberg.relocated.com.google.common.collect.Sets;
+import org.apache.iceberg.spark.data.SparkAvroReader;
+import org.apache.iceberg.spark.data.SparkOrcReader;
+import org.apache.iceberg.spark.data.SparkParquetReaders;
+import org.apache.iceberg.types.TypeUtil;
+import org.apache.spark.sql.catalyst.InternalRow;
+
+public abstract class BaseRowReader<T extends ScanTask> extends
BaseReader<InternalRow, T> {
+ private final Schema tableSchema;
+
+ BaseRowReader(Table table, ScanTaskGroup<T> taskGroup, Schema
expectedSchema, boolean caseSensitive) {
+ super(table, taskGroup, expectedSchema, caseSensitive);
+ this.tableSchema = table.schema();
+ }
+
+ protected Schema tableSchema() {
+ return tableSchema;
+ }
+
+ protected CloseableIterable<InternalRow> newIterable(InputFile file,
FileFormat format, long start, long length,
+ Expression residual,
Schema projection,
+ Map<Integer, ?>
idToConstant) {
+ switch (format) {
+ case PARQUET:
+ return newParquetIterable(file, start, length, residual, projection,
idToConstant);
+
+ case AVRO:
+ return newAvroIterable(file, start, length, projection, idToConstant);
+
+ case ORC:
+ return newOrcIterable(file, start, length, residual, projection,
idToConstant);
+
+ default:
+ throw new UnsupportedOperationException("Cannot read unknown format: "
+ format);
+ }
+ }
+
+ private CloseableIterable<InternalRow> newAvroIterable(
+ InputFile location,
+ long start,
+ long length,
+ Schema projection,
+ Map<Integer, ?> idToConstant) {
+ Avro.ReadBuilder builder = Avro.read(location)
+ .reuseContainers()
+ .project(projection)
+ .split(start, length)
+ .createReaderFunc(readSchema -> new SparkAvroReader(projection,
readSchema, idToConstant));
+
+ if (nameMapping() != null) {
+ builder.withNameMapping(NameMappingParser.fromJson(nameMapping()));
+ }
+
+ return builder.build();
+ }
+
+ private CloseableIterable<InternalRow> newParquetIterable(
Review Comment:
Same here
##########
spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/source/RowDataReader.java:
##########
@@ -20,185 +20,54 @@
package org.apache.iceberg.spark.source;
import java.util.Map;
-import org.apache.iceberg.CombinedScanTask;
-import org.apache.iceberg.DataFile;
import org.apache.iceberg.DataTask;
import org.apache.iceberg.FileScanTask;
-import org.apache.iceberg.MetadataColumns;
+import org.apache.iceberg.ScanTaskGroup;
import org.apache.iceberg.Schema;
-import org.apache.iceberg.StructLike;
import org.apache.iceberg.Table;
-import org.apache.iceberg.TableProperties;
-import org.apache.iceberg.avro.Avro;
-import org.apache.iceberg.data.DeleteFilter;
import org.apache.iceberg.io.CloseableIterable;
import org.apache.iceberg.io.CloseableIterator;
import org.apache.iceberg.io.InputFile;
-import org.apache.iceberg.mapping.NameMappingParser;
-import org.apache.iceberg.orc.ORC;
-import org.apache.iceberg.parquet.Parquet;
import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
-import org.apache.iceberg.relocated.com.google.common.collect.Sets;
-import org.apache.iceberg.spark.SparkSchemaUtil;
-import org.apache.iceberg.spark.data.SparkAvroReader;
-import org.apache.iceberg.spark.data.SparkOrcReader;
-import org.apache.iceberg.spark.data.SparkParquetReaders;
-import org.apache.iceberg.types.TypeUtil;
import org.apache.spark.rdd.InputFileBlockHolder;
import org.apache.spark.sql.catalyst.InternalRow;
-class RowDataReader extends BaseDataReader<InternalRow> {
-
- private final Schema tableSchema;
- private final Schema expectedSchema;
- private final String nameMapping;
- private final boolean caseSensitive;
-
- RowDataReader(CombinedScanTask task, Table table, Schema expectedSchema,
boolean caseSensitive) {
- super(table, task);
- this.tableSchema = table.schema();
- this.expectedSchema = expectedSchema;
- this.nameMapping =
table.properties().get(TableProperties.DEFAULT_NAME_MAPPING);
- this.caseSensitive = caseSensitive;
+class RowDataReader extends BaseRowReader<FileScanTask> {
+ RowDataReader(ScanTaskGroup<FileScanTask> task, Table table, Schema
expectedSchema, boolean caseSensitive) {
+ super(table, task, expectedSchema, caseSensitive);
}
@Override
CloseableIterator<InternalRow> open(FileScanTask task) {
- SparkDeleteFilter deletes = new SparkDeleteFilter(task, tableSchema,
expectedSchema);
+ SparkDeleteFilter deletes = new SparkDeleteFilter(task, tableSchema(),
expectedSchema(), this);
// schema or rows returned by readers
Schema requiredSchema = deletes.requiredSchema();
- Map<Integer, ?> idToConstant = constantsMap(task, expectedSchema);
- DataFile file = task.file();
+ Map<Integer, ?> idToConstant = constantsMap(task, requiredSchema);
// update the current file for Spark's filename() function
- InputFileBlockHolder.set(file.path().toString(), task.start(),
task.length());
+ InputFileBlockHolder.set(task.file().path().toString(), task.start(),
task.length());
return deletes.filter(open(task, requiredSchema, idToConstant)).iterator();
}
- protected Schema tableSchema() {
- return tableSchema;
- }
-
protected CloseableIterable<InternalRow> open(FileScanTask task, Schema
readSchema, Map<Integer, ?> idToConstant) {
CloseableIterable<InternalRow> iter;
if (task.isDataTask()) {
iter = newDataIterable(task.asDataTask(), readSchema);
} else {
- InputFile location = getInputFile(task);
+ InputFile location = getInputFile(task.file().path().toString());
Preconditions.checkNotNull(location, "Could not find InputFile
associated with FileScanTask");
-
- switch (task.file().format()) {
- case PARQUET:
- iter = newParquetIterable(location, task, readSchema, idToConstant);
- break;
-
- case AVRO:
- iter = newAvroIterable(location, task, readSchema, idToConstant);
- break;
-
- case ORC:
- iter = newOrcIterable(location, task, readSchema, idToConstant);
- break;
-
- default:
- throw new UnsupportedOperationException(
- "Cannot read unknown format: " + task.file().format());
- }
+ iter = newIterable(location, task.file().format(), task.start(),
task.length(), task.residual(), readSchema,
Review Comment:
What about using two separate return statements and pulling the format into
a separate var like this?
```
if (task.isDataTask()) {
return newDataIterable(task.asDataTask(), readSchema);
} else {
InputFile inputFile = getInputFile(task.file().path().toString());
Preconditions.checkNotNull(inputFile, "Could not find associated InputFile
with task");
FileFormat format = task.file().format();
return newIterable(inputFile, format, task.start(), task.length(),
task.residual(), readSchema, idToConstant);
}
```
##########
spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/source/RowDataReader.java:
##########
@@ -20,185 +20,54 @@
package org.apache.iceberg.spark.source;
import java.util.Map;
-import org.apache.iceberg.CombinedScanTask;
-import org.apache.iceberg.DataFile;
import org.apache.iceberg.DataTask;
import org.apache.iceberg.FileScanTask;
-import org.apache.iceberg.MetadataColumns;
+import org.apache.iceberg.ScanTaskGroup;
import org.apache.iceberg.Schema;
-import org.apache.iceberg.StructLike;
import org.apache.iceberg.Table;
-import org.apache.iceberg.TableProperties;
-import org.apache.iceberg.avro.Avro;
-import org.apache.iceberg.data.DeleteFilter;
import org.apache.iceberg.io.CloseableIterable;
import org.apache.iceberg.io.CloseableIterator;
import org.apache.iceberg.io.InputFile;
-import org.apache.iceberg.mapping.NameMappingParser;
-import org.apache.iceberg.orc.ORC;
-import org.apache.iceberg.parquet.Parquet;
import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
-import org.apache.iceberg.relocated.com.google.common.collect.Sets;
-import org.apache.iceberg.spark.SparkSchemaUtil;
-import org.apache.iceberg.spark.data.SparkAvroReader;
-import org.apache.iceberg.spark.data.SparkOrcReader;
-import org.apache.iceberg.spark.data.SparkParquetReaders;
-import org.apache.iceberg.types.TypeUtil;
import org.apache.spark.rdd.InputFileBlockHolder;
import org.apache.spark.sql.catalyst.InternalRow;
-class RowDataReader extends BaseDataReader<InternalRow> {
-
- private final Schema tableSchema;
- private final Schema expectedSchema;
- private final String nameMapping;
- private final boolean caseSensitive;
-
- RowDataReader(CombinedScanTask task, Table table, Schema expectedSchema,
boolean caseSensitive) {
- super(table, task);
- this.tableSchema = table.schema();
- this.expectedSchema = expectedSchema;
- this.nameMapping =
table.properties().get(TableProperties.DEFAULT_NAME_MAPPING);
- this.caseSensitive = caseSensitive;
+class RowDataReader extends BaseRowReader<FileScanTask> {
+ RowDataReader(ScanTaskGroup<FileScanTask> task, Table table, Schema
expectedSchema, boolean caseSensitive) {
+ super(table, task, expectedSchema, caseSensitive);
}
@Override
CloseableIterator<InternalRow> open(FileScanTask task) {
- SparkDeleteFilter deletes = new SparkDeleteFilter(task, tableSchema,
expectedSchema);
+ SparkDeleteFilter deletes = new SparkDeleteFilter(task, tableSchema(),
expectedSchema(), this);
// schema or rows returned by readers
Schema requiredSchema = deletes.requiredSchema();
- Map<Integer, ?> idToConstant = constantsMap(task, expectedSchema);
- DataFile file = task.file();
+ Map<Integer, ?> idToConstant = constantsMap(task, requiredSchema);
// update the current file for Spark's filename() function
- InputFileBlockHolder.set(file.path().toString(), task.start(),
task.length());
+ InputFileBlockHolder.set(task.file().path().toString(), task.start(),
task.length());
return deletes.filter(open(task, requiredSchema, idToConstant)).iterator();
}
- protected Schema tableSchema() {
- return tableSchema;
- }
-
protected CloseableIterable<InternalRow> open(FileScanTask task, Schema
readSchema, Map<Integer, ?> idToConstant) {
CloseableIterable<InternalRow> iter;
if (task.isDataTask()) {
iter = newDataIterable(task.asDataTask(), readSchema);
} else {
- InputFile location = getInputFile(task);
+ InputFile location = getInputFile(task.file().path().toString());
Preconditions.checkNotNull(location, "Could not find InputFile
associated with FileScanTask");
Review Comment:
I think the error message should be adapted now as the task is not
necessarily FileScanTask
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]