rdblue commented on code in PR #14533:
URL: https://github.com/apache/iceberg/pull/14533#discussion_r2540058960


##########
core/src/main/java/org/apache/iceberg/V4ManifestReader.java:
##########
@@ -0,0 +1,219 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iceberg;
+
+import java.io.ByteArrayInputStream;
+import java.io.DataInputStream;
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.util.Collection;
+import java.util.List;
+import java.util.function.Function;
+import org.apache.iceberg.exceptions.RuntimeIOException;
+import org.apache.iceberg.io.CloseableGroup;
+import org.apache.iceberg.io.CloseableIterable;
+import org.apache.iceberg.io.CloseableIterator;
+import org.apache.iceberg.io.InputFile;
+import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
+import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList;
+import org.apache.iceberg.relocated.com.google.common.collect.Lists;
+import org.apache.iceberg.types.Types;
+import org.roaringbitmap.RoaringBitmap;
+
+/**
+ * Reader for V4 manifest files containing TrackedFile entries.
+ *
+ * <p>Supports reading both root manifests and leaf manifests. Returns 
TrackedFile entries which can
+ * represent data files, delete files, or manifest references. TODO: implement 
caching.
+ */
+class V4ManifestReader extends CloseableGroup implements 
CloseableIterable<TrackedFile<?>> {
+  static final ImmutableList<String> ALL_COLUMNS = ImmutableList.of("*");
+
+  private final InputFile file;
+  private final InheritableTrackedMetadata inheritableMetadata;
+  private final Long manifestFirstRowId;
+
+  private Collection<String> columns = null;
+  private TrackedFile<?> manifestDV = null;
+
+  protected V4ManifestReader(
+      InputFile file, InheritableTrackedMetadata inheritableMetadata, Long 
manifestFirstRowId) {
+    this.file = file;
+    this.inheritableMetadata = inheritableMetadata;
+    this.manifestFirstRowId = manifestFirstRowId;
+  }
+
+  public V4ManifestReader select(Collection<String> newColumns) {
+    this.columns = newColumns;
+    return this;
+  }
+
+  public V4ManifestReader withDeletionVector(TrackedFile<?> dv) {
+    this.manifestDV = dv;
+    return this;
+  }
+
+  public CloseableIterable<TrackedFile<?>> entries() {
+    return entries(false);
+  }
+
+  public CloseableIterable<TrackedFile<?>> liveEntries() {
+    return entries(true);
+  }
+
+  private CloseableIterable<TrackedFile<?>> entries(boolean onlyLive) {
+    CloseableIterable<TrackedFile<?>> entries = open(columns);
+    return onlyLive ? filterLiveEntries(entries) : entries;
+  }
+
+  private CloseableIterable<TrackedFile<?>> open(Collection<String> cols) {
+    Schema projection = buildProjection(cols);
+
+    FileFormat format = FileFormat.fromFileName(file.location());
+
+    CloseableIterable<GenericTrackedFile> entries =
+        InternalData.read(format, file)
+            .project(projection)
+            .setRootType(GenericTrackedFile.class)
+            .build();
+
+    addCloseable(entries);
+
+    CloseableIterable<TrackedFile<?>> transformed =
+        CloseableIterable.transform(entries, inheritableMetadata::apply);
+
+    transformed = CloseableIterable.transform(transformed, 
rowIdAssigner(manifestFirstRowId));
+
+    transformed = assignPositions(transformed);
+
+    if (manifestDV != null) {
+      RoaringBitmap deletedPositions = deserializeManifestDV(manifestDV);
+      transformed =
+          CloseableIterable.filter(
+              transformed,
+              entry -> {
+                Long pos = entry.pos();
+                // positions are 0-based and should not exceed 
Integer.MAX_VALUE
+                return pos == null || 
!deletedPositions.contains(pos.intValue());
+              });
+    }
+
+    return transformed;
+  }
+
+  private static RoaringBitmap deserializeManifestDV(TrackedFile<?> 
manifestDV) {
+    Preconditions.checkArgument(
+        manifestDV.contentType() == FileContent.MANIFEST_DV,
+        "Expected MANIFEST_DV, got: %s",
+        manifestDV.contentType());
+
+    DeletionVector dvInfo = manifestDV.deletionVector();
+    Preconditions.checkNotNull(dvInfo, "MANIFEST_DV must have 
deletion_vector");
+
+    Preconditions.checkNotNull(
+        dvInfo.inlineContent(),
+        "Manifest DV must have inline content (External not supported): %s",
+        manifestDV.referencedFile());
+
+    ByteBuffer buffer = dvInfo.inlineContent();
+    byte[] bytes = new byte[buffer.remaining()];
+    buffer.asReadOnlyBuffer().get(bytes);
+
+    RoaringBitmap bitmap = new RoaringBitmap();
+    try {
+      bitmap.deserialize(new DataInputStream(new ByteArrayInputStream(bytes)));
+    } catch (IOException e) {
+      throw new RuntimeIOException(e, "Failed to deserialize Roaring bitmap 
from manifest DV");
+    }
+    return bitmap;
+  }
+
+  private Schema buildProjection(Collection<String> cols) {
+    if (cols == null || cols.containsAll(ALL_COLUMNS)) {
+      return new Schema(GenericTrackedFile.BASE_TYPE.fields());
+    }
+
+    List<Types.NestedField> fields = Lists.newArrayList();
+    for (String column : cols) {
+      Types.NestedField field = GenericTrackedFile.BASE_TYPE.field(column);
+      if (field != null) {
+        fields.add(field);
+      }
+    }
+
+    return new Schema(fields);
+  }
+
+  private CloseableIterable<TrackedFile<?>> filterLiveEntries(
+      CloseableIterable<TrackedFile<?>> entries) {
+    return CloseableIterable.filter(
+        entries,
+        entry -> {
+          TrackingInfo tracking = entry.trackingInfo();
+          return tracking == null || tracking.status() != 
TrackingInfo.Status.DELETED;
+        });
+  }
+
+  private static Function<TrackedFile<?>, TrackedFile<?>> rowIdAssigner(Long 
firstRowId) {
+    if (firstRowId == null) {
+      return entry -> entry;
+    }
+
+    return new Function<>() {
+      private long nextRowId = firstRowId;
+
+      @Override
+      public TrackedFile<?> apply(TrackedFile<?> entry) {
+        if (entry.contentType() == FileContent.DATA) {

Review Comment:
   I think the logic here is mostly correct, but can be improved. I agree that 
`first_row_id` is only assigned if the file is a data file. It should also only 
be assigned in v4 if this is a leaf manifest -- so we probably don't even want 
to run an ID assigner on the root manifest because the root be written with 
explicit `first_row_id` values, just like the manifest list is in v3.
   
   The main issue that I see here is that this only assigns `first_row_id` if 
the tracking info is non-null. There should be no case where tracking info is 
null. For any file, the only time tracking info should be null is when it is 
not yet written to a manifest or is to be written to a manifest (and so 
tracking info will be supressed/overwritten). There's no case where it should 
be null in this class because this is reading from a manifest.
   
   The only exception to that (the paragraph just above) is when reading with a 
custom projection schema -- meaning that the file is being read with a 
projection through a metadata table. For example:
   
   ```sql
   SELECT location, tracking_info.first_row_id FROM db.table.data_files
   ```
   
   In cases like this, we need to ensure that the necessary information to 
produce `first_row_id` is present. In the current reader, we _always_ project 
`first_row_id` and I would suggest always tracking the entirety of 
`tracking_info` so that we know problems from the wrong `first_row_id` will not 
be possible. We can always project more fields than requested, so I'd just make 
sure all fields of `tracking_info` are projected any time this reader is used.



##########
core/src/main/java/org/apache/iceberg/V4ManifestReader.java:
##########
@@ -0,0 +1,219 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iceberg;
+
+import java.io.ByteArrayInputStream;
+import java.io.DataInputStream;
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.util.Collection;
+import java.util.List;
+import java.util.function.Function;
+import org.apache.iceberg.exceptions.RuntimeIOException;
+import org.apache.iceberg.io.CloseableGroup;
+import org.apache.iceberg.io.CloseableIterable;
+import org.apache.iceberg.io.CloseableIterator;
+import org.apache.iceberg.io.InputFile;
+import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
+import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList;
+import org.apache.iceberg.relocated.com.google.common.collect.Lists;
+import org.apache.iceberg.types.Types;
+import org.roaringbitmap.RoaringBitmap;
+
+/**
+ * Reader for V4 manifest files containing TrackedFile entries.
+ *
+ * <p>Supports reading both root manifests and leaf manifests. Returns 
TrackedFile entries which can
+ * represent data files, delete files, or manifest references. TODO: implement 
caching.
+ */
+class V4ManifestReader extends CloseableGroup implements 
CloseableIterable<TrackedFile<?>> {
+  static final ImmutableList<String> ALL_COLUMNS = ImmutableList.of("*");
+
+  private final InputFile file;
+  private final InheritableTrackedMetadata inheritableMetadata;
+  private final Long manifestFirstRowId;
+
+  private Collection<String> columns = null;
+  private TrackedFile<?> manifestDV = null;
+
+  protected V4ManifestReader(
+      InputFile file, InheritableTrackedMetadata inheritableMetadata, Long 
manifestFirstRowId) {
+    this.file = file;
+    this.inheritableMetadata = inheritableMetadata;
+    this.manifestFirstRowId = manifestFirstRowId;
+  }
+
+  public V4ManifestReader select(Collection<String> newColumns) {
+    this.columns = newColumns;
+    return this;
+  }
+
+  public V4ManifestReader withDeletionVector(TrackedFile<?> dv) {
+    this.manifestDV = dv;
+    return this;
+  }
+
+  public CloseableIterable<TrackedFile<?>> entries() {
+    return entries(false);
+  }
+
+  public CloseableIterable<TrackedFile<?>> liveEntries() {
+    return entries(true);
+  }
+
+  private CloseableIterable<TrackedFile<?>> entries(boolean onlyLive) {
+    CloseableIterable<TrackedFile<?>> entries = open(columns);
+    return onlyLive ? filterLiveEntries(entries) : entries;
+  }
+
+  private CloseableIterable<TrackedFile<?>> open(Collection<String> cols) {
+    Schema projection = buildProjection(cols);
+
+    FileFormat format = FileFormat.fromFileName(file.location());
+
+    CloseableIterable<GenericTrackedFile> entries =
+        InternalData.read(format, file)
+            .project(projection)
+            .setRootType(GenericTrackedFile.class)
+            .build();
+
+    addCloseable(entries);
+
+    CloseableIterable<TrackedFile<?>> transformed =
+        CloseableIterable.transform(entries, inheritableMetadata::apply);
+
+    transformed = CloseableIterable.transform(transformed, 
rowIdAssigner(manifestFirstRowId));
+
+    transformed = assignPositions(transformed);
+
+    if (manifestDV != null) {
+      RoaringBitmap deletedPositions = deserializeManifestDV(manifestDV);
+      transformed =
+          CloseableIterable.filter(
+              transformed,
+              entry -> {
+                Long pos = entry.pos();
+                // positions are 0-based and should not exceed 
Integer.MAX_VALUE
+                return pos == null || 
!deletedPositions.contains(pos.intValue());
+              });
+    }
+
+    return transformed;
+  }
+
+  private static RoaringBitmap deserializeManifestDV(TrackedFile<?> 
manifestDV) {
+    Preconditions.checkArgument(
+        manifestDV.contentType() == FileContent.MANIFEST_DV,
+        "Expected MANIFEST_DV, got: %s",
+        manifestDV.contentType());
+
+    DeletionVector dvInfo = manifestDV.deletionVector();
+    Preconditions.checkNotNull(dvInfo, "MANIFEST_DV must have 
deletion_vector");
+
+    Preconditions.checkNotNull(
+        dvInfo.inlineContent(),
+        "Manifest DV must have inline content (External not supported): %s",
+        manifestDV.referencedFile());
+
+    ByteBuffer buffer = dvInfo.inlineContent();
+    byte[] bytes = new byte[buffer.remaining()];
+    buffer.asReadOnlyBuffer().get(bytes);
+
+    RoaringBitmap bitmap = new RoaringBitmap();
+    try {
+      bitmap.deserialize(new DataInputStream(new ByteArrayInputStream(bytes)));
+    } catch (IOException e) {
+      throw new RuntimeIOException(e, "Failed to deserialize Roaring bitmap 
from manifest DV");

Review Comment:
   Prefer the built-in `UncheckedIOException` to `RuntimeIOException`.



##########
core/src/main/java/org/apache/iceberg/V4ManifestReader.java:
##########
@@ -0,0 +1,219 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iceberg;
+
+import java.io.ByteArrayInputStream;
+import java.io.DataInputStream;
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.util.Collection;
+import java.util.List;
+import java.util.function.Function;
+import org.apache.iceberg.exceptions.RuntimeIOException;
+import org.apache.iceberg.io.CloseableGroup;
+import org.apache.iceberg.io.CloseableIterable;
+import org.apache.iceberg.io.CloseableIterator;
+import org.apache.iceberg.io.InputFile;
+import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
+import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList;
+import org.apache.iceberg.relocated.com.google.common.collect.Lists;
+import org.apache.iceberg.types.Types;
+import org.roaringbitmap.RoaringBitmap;
+
+/**
+ * Reader for V4 manifest files containing TrackedFile entries.
+ *
+ * <p>Supports reading both root manifests and leaf manifests. Returns 
TrackedFile entries which can
+ * represent data files, delete files, or manifest references. TODO: implement 
caching.
+ */
+class V4ManifestReader extends CloseableGroup implements 
CloseableIterable<TrackedFile<?>> {
+  static final ImmutableList<String> ALL_COLUMNS = ImmutableList.of("*");
+
+  private final InputFile file;
+  private final InheritableTrackedMetadata inheritableMetadata;
+  private final Long manifestFirstRowId;
+
+  private Collection<String> columns = null;
+  private TrackedFile<?> manifestDV = null;
+
+  protected V4ManifestReader(
+      InputFile file, InheritableTrackedMetadata inheritableMetadata, Long 
manifestFirstRowId) {
+    this.file = file;
+    this.inheritableMetadata = inheritableMetadata;
+    this.manifestFirstRowId = manifestFirstRowId;
+  }
+
+  public V4ManifestReader select(Collection<String> newColumns) {
+    this.columns = newColumns;
+    return this;
+  }
+
+  public V4ManifestReader withDeletionVector(TrackedFile<?> dv) {
+    this.manifestDV = dv;
+    return this;
+  }
+
+  public CloseableIterable<TrackedFile<?>> entries() {
+    return entries(false);
+  }
+
+  public CloseableIterable<TrackedFile<?>> liveEntries() {
+    return entries(true);
+  }
+
+  private CloseableIterable<TrackedFile<?>> entries(boolean onlyLive) {
+    CloseableIterable<TrackedFile<?>> entries = open(columns);
+    return onlyLive ? filterLiveEntries(entries) : entries;
+  }
+
+  private CloseableIterable<TrackedFile<?>> open(Collection<String> cols) {
+    Schema projection = buildProjection(cols);
+
+    FileFormat format = FileFormat.fromFileName(file.location());
+
+    CloseableIterable<GenericTrackedFile> entries =
+        InternalData.read(format, file)
+            .project(projection)
+            .setRootType(GenericTrackedFile.class)
+            .build();
+
+    addCloseable(entries);
+
+    CloseableIterable<TrackedFile<?>> transformed =
+        CloseableIterable.transform(entries, inheritableMetadata::apply);
+
+    transformed = CloseableIterable.transform(transformed, 
rowIdAssigner(manifestFirstRowId));
+
+    transformed = assignPositions(transformed);
+
+    if (manifestDV != null) {
+      RoaringBitmap deletedPositions = deserializeManifestDV(manifestDV);
+      transformed =
+          CloseableIterable.filter(
+              transformed,
+              entry -> {
+                Long pos = entry.pos();
+                // positions are 0-based and should not exceed 
Integer.MAX_VALUE
+                return pos == null || 
!deletedPositions.contains(pos.intValue());
+              });
+    }
+
+    return transformed;
+  }
+
+  private static RoaringBitmap deserializeManifestDV(TrackedFile<?> 
manifestDV) {
+    Preconditions.checkArgument(
+        manifestDV.contentType() == FileContent.MANIFEST_DV,
+        "Expected MANIFEST_DV, got: %s",
+        manifestDV.contentType());
+
+    DeletionVector dvInfo = manifestDV.deletionVector();
+    Preconditions.checkNotNull(dvInfo, "MANIFEST_DV must have 
deletion_vector");
+
+    Preconditions.checkNotNull(
+        dvInfo.inlineContent(),
+        "Manifest DV must have inline content (External not supported): %s",
+        manifestDV.referencedFile());
+
+    ByteBuffer buffer = dvInfo.inlineContent();
+    byte[] bytes = new byte[buffer.remaining()];
+    buffer.asReadOnlyBuffer().get(bytes);

Review Comment:
   Use `ByteBuffers.toByteArray` instead of copying directly.



##########
core/src/main/java/org/apache/iceberg/V4ManifestReader.java:
##########
@@ -0,0 +1,219 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iceberg;
+
+import java.io.ByteArrayInputStream;
+import java.io.DataInputStream;
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.util.Collection;
+import java.util.List;
+import java.util.function.Function;
+import org.apache.iceberg.exceptions.RuntimeIOException;
+import org.apache.iceberg.io.CloseableGroup;
+import org.apache.iceberg.io.CloseableIterable;
+import org.apache.iceberg.io.CloseableIterator;
+import org.apache.iceberg.io.InputFile;
+import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
+import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList;
+import org.apache.iceberg.relocated.com.google.common.collect.Lists;
+import org.apache.iceberg.types.Types;
+import org.roaringbitmap.RoaringBitmap;
+
+/**
+ * Reader for V4 manifest files containing TrackedFile entries.
+ *
+ * <p>Supports reading both root manifests and leaf manifests. Returns 
TrackedFile entries which can
+ * represent data files, delete files, or manifest references. TODO: implement 
caching.
+ */
+class V4ManifestReader extends CloseableGroup implements 
CloseableIterable<TrackedFile<?>> {
+  static final ImmutableList<String> ALL_COLUMNS = ImmutableList.of("*");
+
+  private final InputFile file;
+  private final InheritableTrackedMetadata inheritableMetadata;
+  private final Long manifestFirstRowId;
+
+  private Collection<String> columns = null;
+  private TrackedFile<?> manifestDV = null;
+
+  protected V4ManifestReader(
+      InputFile file, InheritableTrackedMetadata inheritableMetadata, Long 
manifestFirstRowId) {
+    this.file = file;
+    this.inheritableMetadata = inheritableMetadata;
+    this.manifestFirstRowId = manifestFirstRowId;
+  }
+
+  public V4ManifestReader select(Collection<String> newColumns) {
+    this.columns = newColumns;
+    return this;
+  }
+
+  public V4ManifestReader withDeletionVector(TrackedFile<?> dv) {
+    this.manifestDV = dv;
+    return this;
+  }
+
+  public CloseableIterable<TrackedFile<?>> entries() {
+    return entries(false);
+  }
+
+  public CloseableIterable<TrackedFile<?>> liveEntries() {
+    return entries(true);
+  }
+
+  private CloseableIterable<TrackedFile<?>> entries(boolean onlyLive) {
+    CloseableIterable<TrackedFile<?>> entries = open(columns);
+    return onlyLive ? filterLiveEntries(entries) : entries;
+  }
+
+  private CloseableIterable<TrackedFile<?>> open(Collection<String> cols) {
+    Schema projection = buildProjection(cols);
+
+    FileFormat format = FileFormat.fromFileName(file.location());
+
+    CloseableIterable<GenericTrackedFile> entries =
+        InternalData.read(format, file)
+            .project(projection)
+            .setRootType(GenericTrackedFile.class)
+            .build();
+
+    addCloseable(entries);
+
+    CloseableIterable<TrackedFile<?>> transformed =
+        CloseableIterable.transform(entries, inheritableMetadata::apply);
+
+    transformed = CloseableIterable.transform(transformed, 
rowIdAssigner(manifestFirstRowId));
+
+    transformed = assignPositions(transformed);
+
+    if (manifestDV != null) {
+      RoaringBitmap deletedPositions = deserializeManifestDV(manifestDV);
+      transformed =
+          CloseableIterable.filter(
+              transformed,
+              entry -> {
+                Long pos = entry.pos();
+                // positions are 0-based and should not exceed 
Integer.MAX_VALUE
+                return pos == null || 
!deletedPositions.contains(pos.intValue());
+              });
+    }
+
+    return transformed;
+  }
+
+  private static RoaringBitmap deserializeManifestDV(TrackedFile<?> 
manifestDV) {
+    Preconditions.checkArgument(
+        manifestDV.contentType() == FileContent.MANIFEST_DV,
+        "Expected MANIFEST_DV, got: %s",
+        manifestDV.contentType());
+
+    DeletionVector dvInfo = manifestDV.deletionVector();
+    Preconditions.checkNotNull(dvInfo, "MANIFEST_DV must have 
deletion_vector");
+
+    Preconditions.checkNotNull(
+        dvInfo.inlineContent(),
+        "Manifest DV must have inline content (External not supported): %s",
+        manifestDV.referencedFile());
+
+    ByteBuffer buffer = dvInfo.inlineContent();
+    byte[] bytes = new byte[buffer.remaining()];
+    buffer.asReadOnlyBuffer().get(bytes);
+
+    RoaringBitmap bitmap = new RoaringBitmap();
+    try {
+      bitmap.deserialize(new DataInputStream(new ByteArrayInputStream(bytes)));
+    } catch (IOException e) {
+      throw new RuntimeIOException(e, "Failed to deserialize Roaring bitmap 
from manifest DV");
+    }
+    return bitmap;
+  }
+
+  private Schema buildProjection(Collection<String> cols) {
+    if (cols == null || cols.containsAll(ALL_COLUMNS)) {
+      return new Schema(GenericTrackedFile.BASE_TYPE.fields());
+    }
+
+    List<Types.NestedField> fields = Lists.newArrayList();
+    for (String column : cols) {
+      Types.NestedField field = GenericTrackedFile.BASE_TYPE.field(column);
+      if (field != null) {
+        fields.add(field);
+      }
+    }
+
+    return new Schema(fields);
+  }
+
+  private CloseableIterable<TrackedFile<?>> filterLiveEntries(
+      CloseableIterable<TrackedFile<?>> entries) {
+    return CloseableIterable.filter(
+        entries,
+        entry -> {
+          TrackingInfo tracking = entry.trackingInfo();
+          return tracking == null || tracking.status() != 
TrackingInfo.Status.DELETED;
+        });
+  }
+
+  private static Function<TrackedFile<?>, TrackedFile<?>> rowIdAssigner(Long 
firstRowId) {
+    if (firstRowId == null) {
+      return entry -> entry;
+    }
+
+    return new Function<>() {
+      private long nextRowId = firstRowId;
+
+      @Override
+      public TrackedFile<?> apply(TrackedFile<?> entry) {
+        if (entry.contentType() == FileContent.DATA) {

Review Comment:
   I think the logic here is mostly correct, but can be improved. I agree that 
`first_row_id` is only assigned if the file is a data file. It should also only 
be assigned in v4 if this is a leaf manifest -- so we probably don't even want 
to run an ID assigner on the root manifest because the root be written with 
explicit `first_row_id` values, just like the manifest list is in v3.
   
   The main issue that I see here is that this only assigns `first_row_id` if 
the tracking info is non-null. There should be no case where tracking info is 
null. For any file, the only time tracking info should be null is when it is 
not yet written to a manifest or is to be written to a manifest (and so 
tracking info will be supressed/overwritten). There's no case where it should 
be null in this class because this is reading from a manifest.
   
   The only exception to that (the paragraph just above) is when reading with a 
custom projection schema -- meaning that the file is being read with a 
projection through a metadata table. For example:
   
   ```sql
   SELECT location, tracking_info.first_row_id FROM db.table.data_files
   ```
   
   In cases like this, we need to ensure that the necessary information to 
produce `first_row_id` is present. In the current reader, we _always_ project 
`first_row_id` and I would suggest always tracking the entirety of 
`tracking_info` so that we know problems from the wrong `first_row_id` will not 
be possible. We can always project more fields than requested, so I'd just make 
sure all fields of `tracking_info` are projected any time this reader is used.
   
   If `tracking_info` is assumed to always be present, then this should not 
check whether it is here. Instead, this should assume it is non-null and throw 
an NPE if that assumption is violated.



##########
core/src/main/java/org/apache/iceberg/V4ManifestReader.java:
##########
@@ -0,0 +1,219 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iceberg;
+
+import java.io.ByteArrayInputStream;
+import java.io.DataInputStream;
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.util.Collection;
+import java.util.List;
+import java.util.function.Function;
+import org.apache.iceberg.exceptions.RuntimeIOException;
+import org.apache.iceberg.io.CloseableGroup;
+import org.apache.iceberg.io.CloseableIterable;
+import org.apache.iceberg.io.CloseableIterator;
+import org.apache.iceberg.io.InputFile;
+import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
+import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList;
+import org.apache.iceberg.relocated.com.google.common.collect.Lists;
+import org.apache.iceberg.types.Types;
+import org.roaringbitmap.RoaringBitmap;
+
+/**
+ * Reader for V4 manifest files containing TrackedFile entries.
+ *
+ * <p>Supports reading both root manifests and leaf manifests. Returns 
TrackedFile entries which can
+ * represent data files, delete files, or manifest references. TODO: implement 
caching.
+ */
+class V4ManifestReader extends CloseableGroup implements 
CloseableIterable<TrackedFile<?>> {
+  static final ImmutableList<String> ALL_COLUMNS = ImmutableList.of("*");
+
+  private final InputFile file;
+  private final InheritableTrackedMetadata inheritableMetadata;
+  private final Long manifestFirstRowId;
+
+  private Collection<String> columns = null;
+  private TrackedFile<?> manifestDV = null;
+
+  protected V4ManifestReader(
+      InputFile file, InheritableTrackedMetadata inheritableMetadata, Long 
manifestFirstRowId) {
+    this.file = file;
+    this.inheritableMetadata = inheritableMetadata;
+    this.manifestFirstRowId = manifestFirstRowId;
+  }
+
+  public V4ManifestReader select(Collection<String> newColumns) {
+    this.columns = newColumns;
+    return this;
+  }
+
+  public V4ManifestReader withDeletionVector(TrackedFile<?> dv) {

Review Comment:
   This is fine for now, but if the DV is tracked by the same entry as the 
manifest itself, then this should be passed in through the constructor.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to