aokolnychyi commented on a change in pull request #2984:
URL: https://github.com/apache/iceberg/pull/2984#discussion_r690519631
##########
File path: flink/src/main/java/org/apache/iceberg/flink/source/FlinkInputFormat.java
##########
@@ -72,12 +62,7 @@ public BaseStatistics getStatistics(BaseStatistics cachedStatistics) {
   @Override
   public FlinkInputSplit[] createInputSplits(int minNumSplits) throws IOException {
-    // Called in Job manager, so it is OK to load table from catalog.
-    tableLoader.open();

Review comment:
Okay, I've submitted #2987 for Flink changes alone.

##########
File path: mr/src/main/java/org/apache/iceberg/mr/mapreduce/IcebergInputFormat.java
##########
@@ -141,7 +144,7 @@
           // TODO: We do not support residual evaluation for HIVE and PIG in memory data model yet
           checkResiduals(task);
         }
-        splits.add(new IcebergSplit(conf, task, table.io(), table.encryption()));
+        splits.add(new IcebergSplit(SerializableTable.copyOf(table), conf, task));

Review comment:
Good catch, @kbendick! Moved it outside of the loop in #2988.

##########
File path: core/src/main/java/org/apache/iceberg/MetadataColumns.java
##########
@@ -38,6 +38,11 @@ private MetadataColumns() {
       Integer.MAX_VALUE - 2, "_pos", Types.LongType.get(), "Ordinal position of a row in the source data file");
   public static final NestedField IS_DELETED = NestedField.required(
       Integer.MAX_VALUE - 3, "_deleted", Types.BooleanType.get(), "Whether the row has been deleted");
+  public static final NestedField SPEC = NestedField.required(
+      Integer.MAX_VALUE - 4, "_spec", Types.IntegerType.get(), "Spec ID to which a row belongs to");
+  public static final int PARTITION_COLUMN_ID = Integer.MAX_VALUE - 5;
+  public static final String PARTITION_COLUMN_NAME = "_partition";
+  public static final String PARTITION_COLUMN_DOC = "Partition to which a row belongs to";

Review comment:
Sure, I'll add a comment above.
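The `IcebergInputFormat` thread above is about no longer building a fresh serializable table copy for every task in the planning loop. The sketch below shows that hoisting pattern using only JDK types, so it runs on its own. `TableSnapshot`, `Split`, and `SplitPlanner.planSplits` are hypothetical stand-ins. They are not Iceberg's `SerializableTable` or `IcebergSplit` APIs. The static copy counter exists only to make the number of copies observable.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical stand-in for a serializable table copy (not Iceberg's SerializableTable).
final class TableSnapshot {
  // Counts copies so the effect of hoisting is observable; illustration only.
  static int copies = 0;

  final String tableName;

  private TableSnapshot(String tableName) {
    this.tableName = tableName;
  }

  static TableSnapshot copyOf(String tableName) {
    copies++;
    return new TableSnapshot(tableName);
  }
}

// Hypothetical split holding the shared table copy and one task.
final class Split {
  final TableSnapshot table;
  final String task;

  Split(TableSnapshot table, String task) {
    this.table = table;
    this.task = task;
  }
}

final class SplitPlanner {
  private SplitPlanner() {
  }

  static List<Split> planSplits(String tableName, List<String> tasks) {
    // Make the copy once, before the loop, instead of once per task.
    TableSnapshot serializableTable = TableSnapshot.copyOf(tableName);
    List<Split> splits = new ArrayList<>(tasks.size());
    for (String task : tasks) {
      splits.add(new Split(serializableTable, task));
    }
    return splits;
  }
}
```

With three tasks, `planSplits` makes one copy and all three splits reference it. Calling `copyOf` inside the loop would make three copies instead.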
##########
File path: core/src/main/java/org/apache/iceberg/MetadataColumns.java
##########
@@ -49,26 +54,40 @@ private MetadataColumns() {
   public static final String DELETE_FILE_ROW_DOC = "Deleted row values";

   private static final Map<String, NestedField> META_COLUMNS = ImmutableMap.of(
+      SPEC.name(), SPEC,
       FILE_PATH.name(), FILE_PATH,
       ROW_POSITION.name(), ROW_POSITION,
       IS_DELETED.name(), IS_DELETED);

-  private static final Set<Integer> META_IDS = META_COLUMNS.values().stream().map(NestedField::fieldId)
-      .collect(ImmutableSet.toImmutableSet());
+  private static final Set<Integer> META_IDS = ImmutableSet.of(
+      PARTITION_COLUMN_ID,
+      SPEC.fieldId(),
+      FILE_PATH.fieldId(),
+      ROW_POSITION.fieldId(),
+      IS_DELETED.fieldId()
+  );

   public static Set<Integer> metadataFieldIds() {
     return META_IDS;
   }

-  public static NestedField get(String name) {
-    return META_COLUMNS.get(name);
+  public static NestedField metadataColumn(Table table, String name) {
+    if (name.equals(PARTITION_COLUMN_NAME)) {
+      return Types.NestedField.optional(
+          PARTITION_COLUMN_ID,
+          PARTITION_COLUMN_NAME,
+          Partitioning.partitionType(table),
+          PARTITION_COLUMN_DOC);
+    } else {
+      return META_COLUMNS.get(name);
+    }
   }

   public static boolean isMetadataColumn(String name) {
-    return META_COLUMNS.containsKey(name);
+    return name.equals(PARTITION_COLUMN_NAME) || META_COLUMNS.containsKey(name);

Review comment:
I did that first, but `META_COLUMNS` is an immutable map that does not allow null keys. I am reluctant to switch to a mutable map, so I added this condition here. I hope we will be able to use `metadataColumn(table, name)` in other places, so this workaround will stay confined to `MetadataColumns`. It looks ugly, though, I agree.
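The `MetadataColumns` change above keeps the table-independent columns in an immutable map. It special-cases `_partition`, whose type is derived from the table through `Partitioning.partitionType(table)`.

Below is a minimal sketch of that lookup shape. It assumes, as the thread says, that the table partition type is the union of all specs' partition fields and that every field is optional.

- `Column`, `MetadataColumnsSketch`, and `coerce` are hypothetical names introduced here.
- Modeling a table as a list of specs, each a list of partition field names, is a simplification. None of this is Iceberg's `Types.NestedField`, `Table`, or `Partitioning` API.
- Only the column names and the `Integer.MAX_VALUE - n` IDs come from the diff.
- Like Guava's `ImmutableMap`, the JDK's `Map.of` rejects `null` keys and values.

```java
import java.util.ArrayList;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;

// Simplified stand-in for a schema field (hypothetical, not Types.NestedField).
final class Column {
  final int id;
  final String name;
  final String type;
  final boolean optional;

  Column(int id, String name, String type, boolean optional) {
    this.id = id;
    this.name = name;
    this.type = type;
    this.optional = optional;
  }
}

final class MetadataColumnsSketch {
  // IDs count down from Integer.MAX_VALUE, as in the diff.
  static final Column ROW_POSITION = new Column(Integer.MAX_VALUE - 2, "_pos", "long", false);
  static final Column IS_DELETED = new Column(Integer.MAX_VALUE - 3, "_deleted", "boolean", false);
  static final Column SPEC = new Column(Integer.MAX_VALUE - 4, "_spec", "int", false);
  static final int PARTITION_COLUMN_ID = Integer.MAX_VALUE - 5;
  static final String PARTITION_COLUMN_NAME = "_partition";

  // Table-independent columns only; Map.of rejects null keys and values.
  private static final Map<String, Column> META_COLUMNS = Map.of(
      SPEC.name, SPEC,
      ROW_POSITION.name, ROW_POSITION,
      IS_DELETED.name, IS_DELETED);

  // The reserved IDs include the partition column even though the map above does not.
  static final Set<Integer> META_IDS = Set.of(
      PARTITION_COLUMN_ID, SPEC.id, ROW_POSITION.id, IS_DELETED.id);

  private MetadataColumnsSketch() {
  }

  // Union of partition fields across all specs, in first-seen order (hypothetical model).
  static List<String> partitionType(List<List<String>> specs) {
    Set<String> union = new LinkedHashSet<>();
    for (List<String> spec : specs) {
      union.addAll(spec);
    }
    return new ArrayList<>(union);
  }

  // The partition column's type depends on the table, so it is built on each call.
  static Column metadataColumn(List<List<String>> specs, String name) {
    if (name.equals(PARTITION_COLUMN_NAME)) {
      String type = "struct<" + String.join(", ", partitionType(specs)) + ">";
      return new Column(PARTITION_COLUMN_ID, PARTITION_COLUMN_NAME, type, true);
    }
    return META_COLUMNS.get(name);
  }

  static boolean isMetadataColumn(String name) {
    return name.equals(PARTITION_COLUMN_NAME) || META_COLUMNS.containsKey(name);
  }

  // Aligns one spec's partition values with the union type; absent fields become null.
  static List<Object> coerce(List<String> unionFields, Map<String, ?> specPartition) {
    List<Object> result = new ArrayList<>(unionFields.size());
    for (String field : unionFields) {
      result.add(specPartition.get(field));
    }
    return result;
  }
}
```

Because `_partition` has no entry in the static map, both `metadataColumn` and `isMetadataColumn` check its name explicitly. That is the extra condition discussed in the review. `coerce` shows how a row from a spec that lacks a field gets `null` in that position of the union type.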
##########
File path: core/src/main/java/org/apache/iceberg/MetadataColumns.java
##########
@@ -49,26 +54,40 @@ private MetadataColumns() {
   public static final String DELETE_FILE_ROW_DOC = "Deleted row values";

   private static final Map<String, NestedField> META_COLUMNS = ImmutableMap.of(
+      SPEC.name(), SPEC,

Review comment:
I'll switch. I wanted to reduce the number of modified lines, but it is probably better to follow the order of definition.

##########
File path: core/src/main/java/org/apache/iceberg/Partitioning.java
##########
@@ -246,4 +246,45 @@ private static boolean equivalentIgnoringNames(PartitionField field, PartitionFi
   private static boolean compatibleTransforms(Transform<?, ?> t1, Transform<?, ?> t2) {
     return t1.equals(t2) || t1.equals(Transforms.alwaysNull()) || t2.equals(Transforms.alwaysNull());
   }
+
+  /**
+   * Adapts the provided partition data to match the table partition type built using {@link #partitionType(Table)}.
+   *
+   * @param partitionType a table partition type that includes partition fields from all specs

Review comment:
Yeah, it will be set to null, as all partition fields are optional and the table partition type is a union of all spec types.

--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@iceberg.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org