szehon-ho commented on code in PR #5248:
URL: https://github.com/apache/iceberg/pull/5248#discussion_r924946511
##########
core/src/main/java/org/apache/iceberg/util/PartitionUtil.java:
##########
@@ -39,12 +40,34 @@ private PartitionUtil() {
return constantsMap(task, null, (type, constant) -> constant);
}
+ public static Map<Integer, ?> constantsMap(ContentScanTask task, BiFunction<Type, Object, Object> convertConstant) {
+ return constantsMapInternal(task, null, convertConstant);
+ }
+
+ /**
+ * @deprecated Replaced by {@link PartitionUtil#constantsMap(ContentScanTask, BiFunction)}
+ */
+ @Deprecated
public static Map<Integer, ?> constantsMap(FileScanTask task, BiFunction<Type, Object, Object> convertConstant) {
Review Comment:
Maybe a silly question: do we need to do this? Would it be backward compatible if we just have everything go to the ContentScanTask version, given that it's a supertype?
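As a sketch of the compatibility question (hypothetical caller, invented for illustration): removing the FileScanTask overload would stay source compatible, because FileScanTask is a ContentScanTask and an unchanged call site resolves to the new overload on recompile; it would not stay binary compatible, because already-compiled callers reference the exact FileScanTask signature, which is presumably what the deprecated overload preserves.

import java.util.Map;
import java.util.function.BiFunction;
import org.apache.iceberg.FileScanTask;
import org.apache.iceberg.types.Type;
import org.apache.iceberg.util.PartitionUtil;

// Hypothetical pre-existing caller, illustration only.
class ConstantsCaller {
  Map<Integer, ?> constantsFor(FileScanTask task, BiFunction<Type, Object, Object> convert) {
    // Recompiled source binds to constantsMap(ContentScanTask, BiFunction); a jar compiled
    // against constantsMap(FileScanTask, BiFunction) still links against that exact signature.
    return PartitionUtil.constantsMap(task, convert);
  }
}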
##########
spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/source/BaseDataReader.java:
##########
@@ -60,35 +64,54 @@
*
* @param <T> is the Java class returned by this reader whose objects contain one or more rows.
*/
-abstract class BaseDataReader<T> implements Closeable {
+abstract class BaseDataReader<T, CST extends ContentScanTask<?>, G extends ScanTaskGroup<CST>>
+ implements Closeable {
private static final Logger LOG = LoggerFactory.getLogger(BaseDataReader.class);
private final Table table;
- private final Iterator<FileScanTask> tasks;
+ private final Iterator<CST> tasks;
private final Map<String, InputFile> inputFiles;
private CloseableIterator<T> currentIterator;
private T current = null;
- private FileScanTask currentTask = null;
+ private CST currentTask = null;
Review Comment:
Maybe TaskT is more descriptive?
##########
spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/source/BaseDataReader.java:
##########
@@ -60,35 +64,54 @@
*
* @param <T> is the Java class returned by this reader whose objects contain one or more rows.
*/
-abstract class BaseDataReader<T> implements Closeable {
+abstract class BaseDataReader<T, CST extends ContentScanTask<?>, G extends ScanTaskGroup<CST>>
+ implements Closeable {
private static final Logger LOG = LoggerFactory.getLogger(BaseDataReader.class);
private final Table table;
- private final Iterator<FileScanTask> tasks;
+ private final Iterator<CST> tasks;
private final Map<String, InputFile> inputFiles;
private CloseableIterator<T> currentIterator;
private T current = null;
- private FileScanTask currentTask = null;
+ private CST currentTask = null;
- BaseDataReader(Table table, CombinedScanTask task) {
+ BaseDataReader(Table table, G task) {
this.table = table;
- this.tasks = task.files().iterator();
+ this.tasks = task.tasks().iterator();
+ this.inputFiles = inputFiles(task);
+ this.currentIterator = CloseableIterator.empty();
+ }
+
+ private Map<String, InputFile> inputFiles(G task) {
Map<String, ByteBuffer> keyMetadata = Maps.newHashMap();
- task.files().stream()
- .flatMap(fileScanTask -> Stream.concat(Stream.of(fileScanTask.file()), fileScanTask.deletes().stream()))
+ Stream<ContentFile> dataFileStream = task.tasks().stream()
+ .flatMap(contentScanTask -> {
+ Stream<ContentFile> stream = Stream.of(contentScanTask.file());
+ if (contentScanTask.isFileScanTask()) {
+ stream = Stream.concat(stream, contentScanTask.asFileScanTask().deletes().stream());
Review Comment:
I guess it's already not fully functional style, since we are re-assigning the stream variable, so maybe we can simplify it somewhat by just returning directly, like:
if (contentScanTask.isFileScanTask()) {
  return Stream.concat(...);
} else if (contentScanTask instanceof AddedRowsScanTask) {
  return Stream.concat(...);
}
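For illustration, the early-return shape being suggested might look roughly like this (a sketch over the task types visible in the diff, not the actual implementation):

.flatMap(contentScanTask -> {
  Stream<ContentFile> base = Stream.of(contentScanTask.file());
  if (contentScanTask.isFileScanTask()) {
    return Stream.concat(base, contentScanTask.asFileScanTask().deletes().stream());
  } else if (contentScanTask instanceof AddedRowsScanTask) {
    return Stream.concat(base, ((AddedRowsScanTask) contentScanTask).deletes().stream());
  } else if (contentScanTask instanceof DeletedDataFileScanTask) {
    return Stream.concat(base, ((DeletedDataFileScanTask) contentScanTask).existingDeletes().stream());
  } else if (contentScanTask instanceof DeletedRowsScanTask) {
    DeletedRowsScanTask deletedRowsTask = (DeletedRowsScanTask) contentScanTask;
    return Stream.concat(base,
        Stream.concat(deletedRowsTask.addedDeletes().stream(), deletedRowsTask.existingDeletes().stream()));
  }
  return base;
})

Each branch returns directly instead of re-assigning the local, which keeps the lambda body a single expression per case.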
##########
spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/source/BaseDataReader.java:
##########
@@ -60,35 +64,54 @@
*
* @param <T> is the Java class returned by this reader whose objects contain one or more rows.
*/
-abstract class BaseDataReader<T> implements Closeable {
+abstract class BaseDataReader<T, CST extends ContentScanTask<?>, G extends ScanTaskGroup<CST>>
Review Comment:
G => GroupT?
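Taken together with the TaskT suggestion above, the declaration would read something like this (naming sketch only):

abstract class BaseDataReader<T, TaskT extends ContentScanTask<?>, GroupT extends ScanTaskGroup<TaskT>>
    implements Closeable {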
##########
spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/source/RowDataReader.java:
##########
@@ -39,22 +36,22 @@
import org.apache.iceberg.parquet.Parquet;
import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
import org.apache.iceberg.relocated.com.google.common.collect.Sets;
-import org.apache.iceberg.spark.SparkSchemaUtil;
import org.apache.iceberg.spark.data.SparkAvroReader;
import org.apache.iceberg.spark.data.SparkOrcReader;
import org.apache.iceberg.spark.data.SparkParquetReaders;
import org.apache.iceberg.types.TypeUtil;
import org.apache.spark.rdd.InputFileBlockHolder;
import org.apache.spark.sql.catalyst.InternalRow;
-class RowDataReader extends BaseDataReader<InternalRow> {
+class RowDataReader<CST extends ContentScanTask<?>, G extends ScanTaskGroup<CST>>
Review Comment:
Is RowDataReader only for FileScanTask? We should just supply that type parameter to BaseDataReader directly, right?
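If RowDataReader really is limited to file scan tasks, the binding being suggested would look roughly like this (a sketch, assuming CombinedScanTask still serves as the ScanTaskGroup<FileScanTask> here):

class RowDataReader extends BaseDataReader<InternalRow, FileScanTask, CombinedScanTask> {
  // ... existing open()/close() logic unchanged, now typed to FileScanTask
}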
##########
spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/source/BatchDataReader.java:
##########
@@ -125,8 +120,10 @@ CloseableIterator<ColumnarBatch> open(FileScanTask task) {
return iter.iterator();
}
- private SparkDeleteFilter deleteFilter(FileScanTask task) {
- return task.deletes().isEmpty() ? null : new SparkDeleteFilter(task, table().schema(), expectedSchema);
+ protected SparkDeleteFilter deleteFilter(CST task) {
+ Preconditions.checkArgument(task.isFileScanTask(), "Only FileScanTask is supported for delete filtering");
Review Comment:
Is BatchDataReader only for FileScanTask? Should we just pass that as the type parameter to BaseDataReader?
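If BatchDataReader is likewise bound to FileScanTask at the declaration, the runtime guard should become unnecessary. A sketch under that assumption (again assuming CombinedScanTask as the group type):

class BatchDataReader extends BaseDataReader<ColumnarBatch, FileScanTask, CombinedScanTask> {
  // With the task type fixed to FileScanTask, deletes() is available directly,
  // so the Preconditions.checkArgument guard is no longer needed.
  private SparkDeleteFilter deleteFilter(FileScanTask task) {
    return task.deletes().isEmpty() ? null : new SparkDeleteFilter(task, table().schema(), expectedSchema);
  }
  // ... remaining reader logic unchanged
}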
##########
spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/source/BaseDataReader.java:
##########
@@ -60,35 +64,54 @@
*
* @param <T> is the Java class returned by this reader whose objects contain one or more rows.
*/
-abstract class BaseDataReader<T> implements Closeable {
+abstract class BaseDataReader<T, CST extends ContentScanTask<?>, G extends ScanTaskGroup<CST>>
+ implements Closeable {
private static final Logger LOG = LoggerFactory.getLogger(BaseDataReader.class);
private final Table table;
- private final Iterator<FileScanTask> tasks;
+ private final Iterator<CST> tasks;
private final Map<String, InputFile> inputFiles;
private CloseableIterator<T> currentIterator;
private T current = null;
- private FileScanTask currentTask = null;
+ private CST currentTask = null;
- BaseDataReader(Table table, CombinedScanTask task) {
+ BaseDataReader(Table table, G task) {
this.table = table;
- this.tasks = task.files().iterator();
+ this.tasks = task.tasks().iterator();
+ this.inputFiles = inputFiles(task);
+ this.currentIterator = CloseableIterator.empty();
+ }
+
+ private Map<String, InputFile> inputFiles(G task) {
Map<String, ByteBuffer> keyMetadata = Maps.newHashMap();
- task.files().stream()
- .flatMap(fileScanTask -> Stream.concat(Stream.of(fileScanTask.file()), fileScanTask.deletes().stream()))
+ Stream<ContentFile> dataFileStream = task.tasks().stream()
+ .flatMap(contentScanTask -> {
+ Stream<ContentFile> stream = Stream.of(contentScanTask.file());
+ if (contentScanTask.isFileScanTask()) {
+ stream = Stream.concat(stream, contentScanTask.asFileScanTask().deletes().stream());
+ } else if (contentScanTask instanceof AddedRowsScanTask) {
+ stream = Stream.concat(stream, ((AddedRowsScanTask) contentScanTask).deletes().stream());
+ } else if (contentScanTask instanceof DeletedDataFileScanTask) {
+ stream = Stream.concat(stream, ((DeletedDataFileScanTask) contentScanTask).existingDeletes().stream());
+ } else if (contentScanTask instanceof DeletedRowsScanTask) {
+ stream = Stream.concat(stream, ((DeletedRowsScanTask) contentScanTask).addedDeletes().stream());
+ stream = Stream.concat(stream, ((DeletedRowsScanTask) contentScanTask).existingDeletes().stream());
+ }
+ return stream;
+ });
+
+ dataFileStream
Review Comment:
A little outside the scope of this change, but now that we are introducing a variable, how about something like:
Map<String, ByteBuffer> keyMetadata = dataFileStream.collect(Collectors.toMap(file -> file.key, file -> file.value))
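A concrete shape of that suggestion could be the following (sketch only; it assumes keyMetadata() is non-null for every file reaching this point, since Collectors.toMap rejects null values, and keeps the first entry when the same file appears in more than one task):

Map<String, ByteBuffer> keyMetadata = dataFileStream.collect(
    Collectors.toMap(
        file -> file.path().toString(),      // key: file location
        ContentFile::keyMetadata,            // value: encryption key metadata
        (existing, duplicate) -> existing)); // same file may show up in several tasks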
##########
spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/source/RowDataRewriter.java:
##########
@@ -112,7 +112,7 @@ private List<DataFile> rewriteDataForTask(CombinedScanTask task) throws Exceptio
try {
while (dataReader.next()) {
- InternalRow row = dataReader.get();
+ InternalRow row = (InternalRow) dataReader.get();
Review Comment:
Why is this cast necessary?
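One possible reason, if the reader local in RowDataRewriter is left as a raw type now that the reader classes carry type parameters (a guess, with hypothetical declarations for illustration): on a raw receiver, get() erases to Object, which forces the cast; declaring the type arguments would avoid it.

// Raw type: get() returns Object, so the cast is required.
RowDataReader rawReader = newRowReader(task);   // newRowReader: made-up helper for illustration
InternalRow row = (InternalRow) rawReader.get();

// Parameterized: get() already returns InternalRow, no cast needed.
RowDataReader<FileScanTask, CombinedScanTask> typedReader = newTypedRowReader(task);   // made-up helper
InternalRow typedRow = typedReader.get();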
##########
spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/source/BatchDataReader.java:
##########
@@ -40,21 +38,20 @@
import org.apache.iceberg.parquet.Parquet;
import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
import org.apache.iceberg.relocated.com.google.common.collect.Sets;
-import org.apache.iceberg.spark.SparkSchemaUtil;
import org.apache.iceberg.spark.data.vectorized.VectorizedSparkOrcReaders;
import org.apache.iceberg.spark.data.vectorized.VectorizedSparkParquetReaders;
import org.apache.iceberg.types.TypeUtil;
import org.apache.spark.rdd.InputFileBlockHolder;
-import org.apache.spark.sql.catalyst.InternalRow;
import org.apache.spark.sql.vectorized.ColumnarBatch;
-class BatchDataReader extends BaseDataReader<ColumnarBatch> {
+class BatchDataReader<CST extends ContentScanTask<?>, G extends ScanTaskGroup<CST>>
Review Comment:
Same suggestion for CST