aokolnychyi commented on a change in pull request #2984:
URL: https://github.com/apache/iceberg/pull/2984#discussion_r690519631



##########
File path: flink/src/main/java/org/apache/iceberg/flink/source/FlinkInputFormat.java
##########
@@ -72,12 +62,7 @@ public BaseStatistics getStatistics(BaseStatistics cachedStatistics) {
 
   @Override
   public FlinkInputSplit[] createInputSplits(int minNumSplits) throws IOException {
-    // Called in Job manager, so it is OK to load table from catalog.
-    tableLoader.open();

Review comment:
       Okay, I've submitted #2987 for Flink changes alone.

##########
File path: mr/src/main/java/org/apache/iceberg/mr/mapreduce/IcebergInputFormat.java
##########
@@ -141,7 +144,7 @@
           // TODO: We do not support residual evaluation for HIVE and PIG in memory data model yet
           checkResiduals(task);
         }
-        splits.add(new IcebergSplit(conf, task, table.io(), table.encryption()));
+        splits.add(new IcebergSplit(SerializableTable.copyOf(table), conf, task));

Review comment:
       Good catch, @kbendick! Moved it outside of the loop in #2988.

##########
File path: core/src/main/java/org/apache/iceberg/MetadataColumns.java
##########
@@ -38,6 +38,11 @@ private MetadataColumns() {
       Integer.MAX_VALUE - 2, "_pos", Types.LongType.get(), "Ordinal position of a row in the source data file");
   public static final NestedField IS_DELETED = NestedField.required(
       Integer.MAX_VALUE - 3, "_deleted", Types.BooleanType.get(), "Whether the row has been deleted");
+  public static final NestedField SPEC = NestedField.required(
+      Integer.MAX_VALUE - 4, "_spec", Types.IntegerType.get(), "Spec ID to which a row belongs to");
+  public static final int PARTITION_COLUMN_ID = Integer.MAX_VALUE - 5;
+  public static final String PARTITION_COLUMN_NAME = "_partition";
+  public static final String PARTITION_COLUMN_DOC = "Partition to which a row belongs to";

Review comment:
       Sure, I'll add a comment above.

##########
File path: core/src/main/java/org/apache/iceberg/MetadataColumns.java
##########
@@ -49,26 +54,40 @@ private MetadataColumns() {
   public static final String DELETE_FILE_ROW_DOC = "Deleted row values";
 
   private static final Map<String, NestedField> META_COLUMNS = ImmutableMap.of(
+      SPEC.name(), SPEC,
       FILE_PATH.name(), FILE_PATH,
       ROW_POSITION.name(), ROW_POSITION,
       IS_DELETED.name(), IS_DELETED);
 
-  private static final Set<Integer> META_IDS = META_COLUMNS.values().stream().map(NestedField::fieldId)
-      .collect(ImmutableSet.toImmutableSet());
+  private static final Set<Integer> META_IDS = ImmutableSet.of(
+      PARTITION_COLUMN_ID,
+      SPEC.fieldId(),
+      FILE_PATH.fieldId(),
+      ROW_POSITION.fieldId(),
+      IS_DELETED.fieldId()
+  );
 
   public static Set<Integer> metadataFieldIds() {
     return META_IDS;
   }
 
-  public static NestedField get(String name) {
-    return META_COLUMNS.get(name);
+  public static NestedField metadataColumn(Table table, String name) {
+    if (name.equals(PARTITION_COLUMN_NAME)) {
+      return Types.NestedField.optional(
+          PARTITION_COLUMN_ID,
+          PARTITION_COLUMN_NAME,
+          Partitioning.partitionType(table),
+          PARTITION_COLUMN_DOC);
+    } else {
+      return META_COLUMNS.get(name);
+    }
   }
 
   public static boolean isMetadataColumn(String name) {
-    return META_COLUMNS.containsKey(name);
+    return name.equals(PARTITION_COLUMN_NAME) || META_COLUMNS.containsKey(name);

Review comment:
       I did that first, but `META_COLUMNS` is an immutable map that does not 
allow null keys. I am reluctant to switch to a mutable map, so I added this 
condition here. I hope we will be able to use `metadataColumn(table, name)` in 
other places so that this workaround stays confined to `MetadataColumns`.
   
   It looks ugly, though, I agree.
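
For illustration only, here is a minimal sketch of the special-casing described above. It assumes a simplified model in which each metadata column is just a name mapped to a field ID, and it uses the JDK's `Map.of` as a stand-in for Guava's `ImmutableMap` (both reject null keys and null values). The class name `MetadataColumnLookupSketch` is hypothetical and is not part of Iceberg.

```java
import java.util.Map;

public class MetadataColumnLookupSketch {
  public static final String PARTITION_COLUMN_NAME = "_partition";

  // Statically known metadata columns (name -> field ID), taken from the diff above.
  // Map.of is immutable and throws NullPointerException for null keys or values,
  // so "_partition" cannot be registered here with a null placeholder.
  private static final Map<String, Integer> META_COLUMNS = Map.of(
      "_pos", Integer.MAX_VALUE - 2,
      "_deleted", Integer.MAX_VALUE - 3,
      "_spec", Integer.MAX_VALUE - 4);

  // The partition column depends on the table, so it is checked by name
  // before falling back to the immutable map.
  public static boolean isMetadataColumn(String name) {
    return name.equals(PARTITION_COLUMN_NAME) || META_COLUMNS.containsKey(name);
  }

  public static void main(String[] args) {
    System.out.println(isMetadataColumn("_partition")); // true
    System.out.println(isMetadataColumn("_pos"));       // true
    System.out.println(isMetadataColumn("id"));         // false
  }
}
```

The extra name check keeps the map immutable at the cost of one special case per lookup method.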
   

##########
File path: core/src/main/java/org/apache/iceberg/MetadataColumns.java
##########
@@ -49,26 +54,40 @@ private MetadataColumns() {
   public static final String DELETE_FILE_ROW_DOC = "Deleted row values";
 
   private static final Map<String, NestedField> META_COLUMNS = ImmutableMap.of(
+      SPEC.name(), SPEC,

Review comment:
       I'll switch. I wanted to reduce the number of modified lines, but it is 
probably better to follow the order of definition.

##########
File path: core/src/main/java/org/apache/iceberg/Partitioning.java
##########
@@ -246,4 +246,45 @@ private static boolean equivalentIgnoringNames(PartitionField field, PartitionFi
   private static boolean compatibleTransforms(Transform<?, ?> t1, Transform<?, ?> t2) {
     return t1.equals(t2) || t1.equals(Transforms.alwaysNull()) || t2.equals(Transforms.alwaysNull());
   }
+
+  /**
+   * Adapts the provided partition data to match the table partition type built using {@link #partitionType(Table)}.
+   *
+   * @param partitionType a table partition type that includes partition fields from all specs

Review comment:
       Yeah, it will be set to null as all partition fields are optional and 
the table partition type is a union of all spec types.
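
A rough sketch of that behavior, under simplifying assumptions: partition fields are modeled as bare integer field IDs, partition data as a map from field ID to value, and the IDs 1000 and 1001 are made-up examples. The class `PartitionUnionSketch` and its methods are hypothetical and do not mirror the actual `Partitioning` API.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;

public class PartitionUnionSketch {
  // Builds the union of partition field IDs across all specs, keeping first-seen order.
  public static List<Integer> unionFieldIds(List<List<Integer>> specFieldIds) {
    Set<Integer> ids = new LinkedHashSet<>();
    for (List<Integer> spec : specFieldIds) {
      ids.addAll(spec);
    }
    return new ArrayList<>(ids);
  }

  // Projects one spec's partition values onto the union type.
  // Fields the spec does not define stay null, which is valid because
  // every field in the union type is treated as optional.
  public static Object[] adapt(List<Integer> unionIds, Map<Integer, Object> partition) {
    Object[] row = new Object[unionIds.size()];
    for (int i = 0; i < unionIds.size(); i++) {
      row[i] = partition.get(unionIds.get(i));
    }
    return row;
  }

  public static void main(String[] args) {
    // Spec 0 partitions by field 1000; spec 1 partitions by fields 1000 and 1001.
    List<Integer> union = unionFieldIds(List.of(List.of(1000), List.of(1000, 1001)));
    // A file written under spec 0 carries a value for field 1000 only.
    Object[] row = adapt(union, Map.of(1000, "2021-08-17"));
    System.out.println(Arrays.toString(row)); // [2021-08-17, null]
  }
}
```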




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@iceberg.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


