pvary commented on code in PR #15675:
URL: https://github.com/apache/iceberg/pull/15675#discussion_r2983054999


##########
data/src/test/java/org/apache/iceberg/data/BaseFormatModelTests.java:
##########
@@ -628,6 +639,513 @@ void 
testReaderBuilderRecordsPerBatchNotSupported(FileFormat fileFormat) throws
         .isInstanceOf(UnsupportedOperationException.class);
   }
 
+  @ParameterizedTest
+  @FieldSource("FILE_FORMATS")
+  void testReadMetadataColumnsFilePathAndSpecId(FileFormat fileFormat) throws 
IOException {
+
+    DataGenerator dataGenerator = new DataGenerators.DefaultSchema();
+    Schema schema = dataGenerator.schema();
+    List<Record> genericRecords = dataGenerator.generateRecords();
+    writeGenericRecords(fileFormat, schema, genericRecords);
+
+    String filePath = "test-data-file.parquet";
+    int specId = 0;
+    Schema projectionSchema = new Schema(MetadataColumns.FILE_PATH, 
MetadataColumns.SPEC_ID);
+
+    Map<Integer, Object> idToConstant =
+        ImmutableMap.of(
+            MetadataColumns.FILE_PATH.fieldId(), filePath,
+            MetadataColumns.SPEC_ID.fieldId(), specId);
+
+    InputFile inputFile = encryptedFile.encryptingOutputFile().toInputFile();
+    List<T> readRecords;
+    try (CloseableIterable<T> reader =
+        FormatModelRegistry.readBuilder(fileFormat, engineType(), inputFile)
+            .project(projectionSchema)
+            .engineProjection(engineSchema(projectionSchema))
+            .idToConstant(convertConstantsToEngine(projectionSchema, 
idToConstant))
+            .build()) {
+      readRecords = ImmutableList.copyOf(reader);
+    }
+
+    List<Record> expected =
+        IntStream.range(0, genericRecords.size())
+            .mapToObj(
+                i ->
+                    GenericRecord.create(projectionSchema)
+                        .copy(
+                            MetadataColumns.FILE_PATH.name(), filePath,
+                            MetadataColumns.SPEC_ID.name(), specId))
+            .toList();
+
+    assertThat(readRecords).hasSize(genericRecords.size());
+    assertEquals(projectionSchema, convertToEngineRecords(expected, 
projectionSchema), readRecords);
+  }
+
+  @ParameterizedTest
+  @FieldSource("FILE_FORMATS")
+  void testReadMetadataColumnRowPosition(FileFormat fileFormat) throws 
IOException {
+
+    DataGenerator dataGenerator = new DataGenerators.DefaultSchema();
+    Schema schema = dataGenerator.schema();
+    List<Record> genericRecords = dataGenerator.generateRecords();
+    writeGenericRecords(fileFormat, schema, genericRecords);
+
+    Schema projectionSchema = new Schema(MetadataColumns.ROW_POSITION);
+
+    InputFile inputFile = encryptedFile.encryptingOutputFile().toInputFile();
+    List<T> readRecords;
+    try (CloseableIterable<T> reader =
+        FormatModelRegistry.readBuilder(fileFormat, engineType(), inputFile)
+            .project(projectionSchema)
+            .engineProjection(engineSchema(projectionSchema))
+            .build()) {
+      readRecords = ImmutableList.copyOf(reader);
+    }
+
+    List<Record> expected =
+        IntStream.range(0, genericRecords.size())
+            .mapToObj(
+                i ->
+                    GenericRecord.create(projectionSchema)
+                        .copy(MetadataColumns.ROW_POSITION.name(), (long) i))
+            .toList();
+
+    assertThat(readRecords).hasSize(genericRecords.size());
+    assertEquals(projectionSchema, convertToEngineRecords(expected, 
projectionSchema), readRecords);
+  }
+
+  @ParameterizedTest
+  @FieldSource("FILE_FORMATS")
+  void testReadMetadataColumnIsDeleted(FileFormat fileFormat) throws 
IOException {
+
+    DataGenerator dataGenerator = new DataGenerators.DefaultSchema();
+    Schema schema = dataGenerator.schema();
+    List<Record> genericRecords = dataGenerator.generateRecords();
+    writeGenericRecords(fileFormat, schema, genericRecords);
+
+    Schema projectionSchema = new Schema(MetadataColumns.IS_DELETED);
+
+    InputFile inputFile = encryptedFile.encryptingOutputFile().toInputFile();
+    List<T> readRecords;
+    try (CloseableIterable<T> reader =
+        FormatModelRegistry.readBuilder(fileFormat, engineType(), inputFile)
+            .project(projectionSchema)
+            .engineProjection(engineSchema(projectionSchema))
+            .build()) {
+      readRecords = ImmutableList.copyOf(reader);
+    }
+
+    List<Record> expected =
+        IntStream.range(0, genericRecords.size())
+            .mapToObj(
+                i ->
+                    GenericRecord.create(projectionSchema)
+                        .copy(MetadataColumns.IS_DELETED.name(), false))
+            .toList();
+
+    assertThat(readRecords).hasSize(genericRecords.size());
+    assertEquals(projectionSchema, convertToEngineRecords(expected, 
projectionSchema), readRecords);
+  }
+
+  @ParameterizedTest
+  @FieldSource("FILE_FORMATS")
+  void testReadMetadataColumnRowLineage(FileFormat fileFormat) throws 
IOException {
+    assumeSupports(fileFormat, FEATURE_META_ROW_LINEAGE);
+
+    DataGenerator dataGenerator = new DataGenerators.DefaultSchema();
+    Schema schema = dataGenerator.schema();
+    List<Record> genericRecords = dataGenerator.generateRecords();
+    writeGenericRecords(fileFormat, schema, genericRecords);
+
+    long baseRowId = 100L;
+    long fileSeqNumber = 5L;
+    Schema projectionSchema =
+        new Schema(MetadataColumns.ROW_ID, 
MetadataColumns.LAST_UPDATED_SEQUENCE_NUMBER);
+
+    Map<Integer, Object> idToConstant =
+        ImmutableMap.of(
+            MetadataColumns.ROW_ID.fieldId(), baseRowId,
+            MetadataColumns.LAST_UPDATED_SEQUENCE_NUMBER.fieldId(), 
fileSeqNumber);
+
+    InputFile inputFile = encryptedFile.encryptingOutputFile().toInputFile();
+    List<T> readRecords;
+    try (CloseableIterable<T> reader =
+        FormatModelRegistry.readBuilder(fileFormat, engineType(), inputFile)
+            .project(projectionSchema)
+            .engineProjection(engineSchema(projectionSchema))
+            .idToConstant(convertConstantsToEngine(projectionSchema, 
idToConstant))
+            .build()) {
+      readRecords = ImmutableList.copyOf(reader);
+    }
+
+    List<Record> expected =
+        IntStream.range(0, genericRecords.size())
+            .mapToObj(
+                i ->
+                    GenericRecord.create(projectionSchema)
+                        .copy(
+                            MetadataColumns.ROW_ID.name(),
+                            baseRowId + i,
+                            
MetadataColumns.LAST_UPDATED_SEQUENCE_NUMBER.name(),
+                            fileSeqNumber))
+            .toList();
+
+    assertThat(readRecords).hasSize(genericRecords.size());
+    assertEquals(projectionSchema, convertToEngineRecords(expected, 
projectionSchema), readRecords);
+  }
+
+  @ParameterizedTest
+  @FieldSource("FILE_FORMATS")
+  void testReadMetadataColumnRowLineageExistingValue(FileFormat fileFormat) throws 
IOException {
+    assumeSupports(fileFormat, FEATURE_META_ROW_LINEAGE);
+
+    DataGenerator dataGenerator = new DataGenerators.DefaultSchema();
+    Schema dataSchema = dataGenerator.schema();
+
+    Schema writeSchema = MetadataColumns.schemaWithRowLineage(dataSchema);
+
+    List<Record> baseRecords = dataGenerator.generateRecords();
+    List<Record> writeRecords = 
Lists.newArrayListWithExpectedSize(baseRecords.size());
+    for (int i = 0; i < baseRecords.size(); i++) {
+      Record base = baseRecords.get(i);
+      Record rec = GenericRecord.create(writeSchema);
+      for (Types.NestedField col : dataSchema.columns()) {
+        rec.setField(col.name(), base.getField(col.name()));
+      }
+
+      if (i % 2 == 0) {
+        rec.setField(MetadataColumns.ROW_ID.name(), 555L + i);
+        rec.setField(MetadataColumns.LAST_UPDATED_SEQUENCE_NUMBER.name(), 7L);
+      } else {
+        rec.setField(MetadataColumns.ROW_ID.name(), null);
+        rec.setField(MetadataColumns.LAST_UPDATED_SEQUENCE_NUMBER.name(), 
null);
+      }
+
+      writeRecords.add(rec);
+    }
+
+    DataWriter<Record> writer =
+        FormatModelRegistry.dataWriteBuilder(fileFormat, Record.class, 
encryptedFile)
+            .schema(writeSchema)
+            .spec(PartitionSpec.unpartitioned())
+            .build();
+
+    try (writer) {
+      writeRecords.forEach(writer::write);
+    }
+
+    long baseRowId = 100L;
+    long fileSeqNumber = 5L;
+    Schema projectionSchema =
+        new Schema(MetadataColumns.ROW_ID, 
MetadataColumns.LAST_UPDATED_SEQUENCE_NUMBER);
+
+    Map<Integer, Object> idToConstant =
+        ImmutableMap.of(
+            MetadataColumns.ROW_ID.fieldId(), baseRowId,
+            MetadataColumns.LAST_UPDATED_SEQUENCE_NUMBER.fieldId(), 
fileSeqNumber);
+
+    InputFile inputFile = encryptedFile.encryptingOutputFile().toInputFile();
+    List<T> readRecords;
+    try (CloseableIterable<T> reader =
+        FormatModelRegistry.readBuilder(fileFormat, engineType(), inputFile)
+            .project(projectionSchema)
+            .engineProjection(engineSchema(projectionSchema))
+            .idToConstant(convertConstantsToEngine(projectionSchema, 
idToConstant))
+            .build()) {
+      readRecords = ImmutableList.copyOf(reader);
+    }
+
+    // Expected results:
+    // - Even rows (explicit values): _row_id = 555+i, 
_last_updated_sequence_number = 7
+    // - Odd rows (null values): _row_id = baseRowId+pos, 
_last_updated_sequence_number =
+    // fileSeqNumber
+    List<Record> expected =
+        IntStream.range(0, baseRecords.size())
+            .mapToObj(
+                i -> {
+                  if (i % 2 == 0) {
+                    return GenericRecord.create(projectionSchema)
+                        .copy(
+                            MetadataColumns.ROW_ID.name(),
+                            555L + i,
+                            
MetadataColumns.LAST_UPDATED_SEQUENCE_NUMBER.name(),
+                            7L);
+                  } else {
+                    return GenericRecord.create(projectionSchema)
+                        .copy(
+                            MetadataColumns.ROW_ID.name(),
+                            baseRowId + i,
+                            
MetadataColumns.LAST_UPDATED_SEQUENCE_NUMBER.name(),
+                            fileSeqNumber);
+                  }
+                })
+            .toList();
+
+    assertThat(readRecords).hasSize(baseRecords.size());
+    assertEquals(projectionSchema, convertToEngineRecords(expected, 
projectionSchema), readRecords);
+  }
+
+  @ParameterizedTest
+  @FieldSource("FILE_FORMATS")
+  void testReadMetadataColumnPartitionIdentity(FileFormat fileFormat) throws 
IOException {
+
+    DataGenerator dataGenerator = new DataGenerators.DefaultSchema();
+    PartitionSpec spec = 
PartitionSpec.builderFor(dataGenerator.schema()).identity("col_a").build();
+
+    Types.StructType partitionType = spec.partitionType();
+    PartitionData partitionData = new PartitionData(partitionType);
+    partitionData.set(0, "test_col_a");
+
+    DataWriter<Record> writer =
+        FormatModelRegistry.dataWriteBuilder(fileFormat, Record.class, 
encryptedFile)
+            .schema(dataGenerator.schema())
+            .spec(PartitionSpec.unpartitioned())
+            .build();
+
+    List<Record> records = dataGenerator.generateRecords();
+    try (writer) {
+      records.forEach(writer::write);
+    }
+
+    Types.NestedField partitionField =
+        Types.NestedField.optional(
+            MetadataColumns.PARTITION_COLUMN_ID,
+            MetadataColumns.PARTITION_COLUMN_NAME,
+            partitionType,
+            MetadataColumns.PARTITION_COLUMN_DOC);
+    Schema projectionSchema = new Schema(partitionField);
+
+    Map<Integer, Object> idToConstant =
+        ImmutableMap.of(MetadataColumns.PARTITION_COLUMN_ID, partitionData);
+
+    InputFile inputFile = encryptedFile.encryptingOutputFile().toInputFile();
+    List<T> readRecords;
+    try (CloseableIterable<T> reader =
+        FormatModelRegistry.readBuilder(fileFormat, engineType(), inputFile)
+            .project(projectionSchema)
+            .engineProjection(engineSchema(projectionSchema))
+            .idToConstant(convertConstantsToEngine(projectionSchema, 
idToConstant))
+            .build()) {
+      readRecords = ImmutableList.copyOf(reader);
+    }
+
+    Record partitionRecord = structLikeToRecord(partitionData, partitionType);
+    List<Record> expected =
+        IntStream.range(0, records.size())
+            .mapToObj(
+                i ->
+                    GenericRecord.create(projectionSchema)
+                        .copy(MetadataColumns.PARTITION_COLUMN_NAME, 
partitionRecord))
+            .toList();
+
+    assertThat(readRecords).hasSize(records.size());
+    assertEquals(projectionSchema, convertToEngineRecords(expected, 
projectionSchema), readRecords);
+  }
+
+  @ParameterizedTest
+  @FieldSource("FILE_FORMATS")
+  void testReadMetadataColumnPartitionBucketTransform(FileFormat fileFormat) 
throws IOException {

Review Comment:
   Could you highlight the differences between this test and 
`testReadMetadataColumnPartitionIdentity`? It's not obvious to me what the 
bucket-transform variant covers beyond the identity-partition case.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to