openinx commented on a change in pull request #3240:
URL: https://github.com/apache/iceberg/pull/3240#discussion_r736286743
##########
File path: flink/src/main/java/org/apache/iceberg/flink/data/RowDataProjection.java
##########

@@ -0,0 +1,219 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iceberg.flink.data;
+
+import org.apache.flink.table.data.ArrayData;
+import org.apache.flink.table.data.DecimalData;
+import org.apache.flink.table.data.MapData;
+import org.apache.flink.table.data.RawValueData;
+import org.apache.flink.table.data.RowData;
+import org.apache.flink.table.data.StringData;
+import org.apache.flink.table.data.TimestampData;
+import org.apache.flink.table.types.logical.RowType;
+import org.apache.flink.types.RowKind;
+import org.apache.iceberg.Schema;
+import org.apache.iceberg.StructLike;
+import org.apache.iceberg.flink.FlinkSchemaUtil;
+import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
+import org.apache.iceberg.types.Types;
+
+public class RowDataProjection implements RowData {
+  /**
+   * Creates a projecting wrapper for {@link RowData} rows.
+   * <p>
+   * This projection does not work with repeated types like lists and maps.
+   *
+   * @param schema schema of rows wrapped by this projection
+   * @param projectedSchema result schema of the projected rows
+   * @return a wrapper to project rows
+   */
+  public static RowDataProjection create(Schema schema, Schema projectedSchema) {
+    return new RowDataProjection(FlinkSchemaUtil.convert(schema), schema.asStruct(), projectedSchema.asStruct());
+  }
+
+  private final RowData.FieldGetter[] getters;
+  private RowData rowData;
+
+  private RowDataProjection(RowType rowType, Types.StructType rowStruct, Types.StructType projectType) {
+    this.getters = new RowData.FieldGetter[projectType.fields().size()];
+    for (int i = 0; i < getters.length; i++) {
+      getters[i] = createFieldGetter(rowType, rowStruct, projectType.fields().get(i));
+    }
+  }
+
+  private static RowData.FieldGetter createFieldGetter(RowType rowType,
+                                                       Types.StructType rowStruct,
+                                                       Types.NestedField projectField) {
+    for (int i = 0; i < rowStruct.fields().size(); i++) {
+      Types.NestedField rowField = rowStruct.fields().get(i);
+      if (rowField.fieldId() == projectField.fieldId()) {
+        Preconditions.checkArgument(rowField.type().typeId() == projectField.type().typeId(),
+            String.format("Different iceberg type between row field <%s> and project field <%s>",
+                rowField, projectField));
+
+        switch (projectField.type().typeId()) {
+          case STRUCT:
+            RowType nestedRowType = (RowType) rowType.getTypeAt(i);
+            int rowPos = i;
+            return row -> {
+              RowData nestedRow = row.isNullAt(rowPos) ? null : row.getRow(rowPos, nestedRowType.getFieldCount());

Review comment:
       Q: If `nestedRow` is null, do we still need to traverse the nested fields via `RowDataProjection#project`? I think we can just return `null` as the projected value.
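       A minimal sketch of that short-circuit, for illustration only. The `RowType`-taking `create(...)` overload mirrors the private constructor in the diff, and the `project(RowData)` call is assumed from this comment's mention of `RowDataProjection#project`; neither is confirmed API of this patch:

```java
// Hedged sketch of the STRUCT branch inside createFieldGetter (names mirror the diff).
case STRUCT:
  RowType nestedRowType = (RowType) rowType.getTypeAt(i);
  int rowPos = i;
  // Assumed overload matching the private constructor; built once, outside the lambda.
  RowDataProjection nestedProjection = RowDataProjection.create(
      nestedRowType, rowField.type().asStructType(), projectField.type().asStructType());
  return row -> {
    if (row.isNullAt(rowPos)) {
      return null; // a null struct projects to null; no need to traverse its nested fields
    }
    RowData nestedRow = row.getRow(rowPos, nestedRowType.getFieldCount());
    return nestedProjection.project(nestedRow); // assumed projection entry point
  };
```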
##########
File path: flink/src/main/java/org/apache/iceberg/flink/source/RowDataFileScanTaskReader.java
##########

@@ -70,9 +71,17 @@ public RowDataFileScanTaskReader(Schema tableSchema, Schema projectedSchema,
         PartitionUtil.constantsMap(task, RowDataUtil::convertConstant);
     FlinkDeleteFilter deletes = new FlinkDeleteFilter(task, tableSchema, projectedSchema, inputFilesDecryptor);
-    return deletes
-        .filter(newIterable(task, deletes.requiredSchema(), idToConstant, inputFilesDecryptor))
-        .iterator();
+    CloseableIterable<RowData> iterable = deletes.filter(
+        newIterable(task, deletes.requiredSchema(), idToConstant, inputFilesDecryptor)
+    );
+
+    // Project the RowData to remove the extra meta columns.
+    if (!projectedSchema.sameSchema(deletes.requiredSchema())) {

Review comment:
       I think we need a unit test addressing the point that, if the task doesn't have any eq-deletes or pos-deletes, the best approach is to skip the RowData projection that removes the extra columns. In other words:
       1. If it's an iceberg v1 table, assert that the iterator won't do any extra projection to exclude the meta columns;
       2. If it's an iceberg v2 table and there are some eq-deletes or pos-deletes inside it, guarantee that it does project the columns to exclude the extra meta columns.
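       A hedged sketch of those two test cases. The `readRows(...)` helper, the `v1Task` / `v2TaskWithDeletes` fixtures, and the use of `instanceof RowDataProjection` as the projection signal are illustrative assumptions about the test harness, not existing helpers:

```java
// Sketch only: readRows(task) is an assumed helper that opens the task with
// RowDataFileScanTaskReader and returns the resulting row iterator.
@Test
public void testNoProjectionForV1Table() throws IOException {
  // V1 table: no eq/pos deletes, so requiredSchema == projectedSchema and the
  // reader should emit rows without wrapping them in RowDataProjection.
  try (CloseableIterator<RowData> rows = readRows(v1Task)) {
    while (rows.hasNext()) {
      Assert.assertFalse("No extra projection expected without deletes",
          rows.next() instanceof RowDataProjection);
    }
  }
}

@Test
public void testProjectionForV2TableWithDeletes() throws IOException {
  // V2 table with eq/pos deletes: the delete filter widens the read schema, so
  // rows must be projected to drop the extra meta columns before being emitted.
  try (CloseableIterator<RowData> rows = readRows(v2TaskWithDeletes)) {
    while (rows.hasNext()) {
      RowData row = rows.next();
      Assert.assertTrue("Rows must be projected to exclude the meta columns",
          row instanceof RowDataProjection);
      Assert.assertEquals("Arity should match the projected schema",
          projectedSchema.columns().size(), row.getArity());
    }
  }
}
```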
##########
File path: flink/src/main/java/org/apache/iceberg/flink/source/RowDataFileScanTaskReader.java
##########

@@ -70,9 +71,17 @@ public RowDataFileScanTaskReader(Schema tableSchema, Schema projectedSchema,
         PartitionUtil.constantsMap(task, RowDataUtil::convertConstant);
     FlinkDeleteFilter deletes = new FlinkDeleteFilter(task, tableSchema, projectedSchema, inputFilesDecryptor);
-    return deletes
-        .filter(newIterable(task, deletes.requiredSchema(), idToConstant, inputFilesDecryptor))
-        .iterator();
+    CloseableIterable<RowData> iterable = deletes.filter(
+        newIterable(task, deletes.requiredSchema(), idToConstant, inputFilesDecryptor)
+    );
+
+    // Project the RowData to remove the extra meta columns.
+    if (!projectedSchema.sameSchema(deletes.requiredSchema())) {
+      RowDataProjection rowDataProjection = RowDataProjection.create(deletes.requiredSchema(), projectedSchema);

Review comment:
       I see that `RowDataProjection#create` does a `FlinkSchemaUtil.convert(schema)` on the required schema to project, and I believe `FlinkDeleteFilter` does the same thing internally. I think we can reuse the converted flink row type between them.
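       For illustration, a hedged sketch of that reuse. The `requiredRowType()` accessor on `FlinkDeleteFilter`, the `RowType`-taking `create(...)` overload, and the `project` method reference are assumed names chosen only to show the shape of the change:

```java
// Sketch: share the RowType the delete filter already converted, instead of
// having FlinkDeleteFilter and RowDataProjection each call
// FlinkSchemaUtil.convert(...) on the same required schema.
if (!projectedSchema.sameSchema(deletes.requiredSchema())) {
  RowDataProjection rowDataProjection = RowDataProjection.create(
      deletes.requiredRowType(),          // assumed accessor exposing the filter's converted RowType
      deletes.requiredSchema().asStruct(),
      projectedSchema.asStruct());
  // Apply the projection lazily to each emitted row.
  iterable = CloseableIterable.transform(iterable, rowDataProjection::project); // assumed entry point
}
```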