RussellSpitzer commented on code in PR #4479:
URL: https://github.com/apache/iceberg/pull/4479#discussion_r1143864054


##########
core/src/main/java/org/apache/iceberg/MicroBatches.java:
##########
@@ -36,6 +37,96 @@
 public class MicroBatches {
   private MicroBatches() {}
 
+  public static List<Pair<ManifestFile, Integer>> skippedManifestIndexesFromSnapshot(
+      FileIO io, Snapshot snapshot, long startFileIndex, boolean scanAllFiles) {
+    //  Preconditions.checkArgument(startFileIndex >= 0, "startFileIndex is unexpectedly smaller
+    // than 0");
+    List<ManifestFile> manifests =
+        scanAllFiles
+            ? snapshot.dataManifests(io)
+            : snapshot.dataManifests(io).stream()
+                .filter(m -> m.snapshotId().equals(snapshot.snapshotId()))
+                .collect(Collectors.toList());
+
+    List<Pair<ManifestFile, Integer>> manifestIndexes = indexManifests(manifests);
+
+    return skipManifests(manifestIndexes, startFileIndex);
+  }
+
+  public static CloseableIterable<FileScanTask> openManifestFile(
+      FileIO io,
+      Map<Integer, PartitionSpec> specsById,
+      boolean caseSensitive,
+      Snapshot snapshot,
+      ManifestFile manifestFile,
+      boolean scanAllFiles) {
+
+    ManifestGroup manifestGroup =
+        new ManifestGroup(io, ImmutableList.of(manifestFile))
+            .specsById(specsById)
+            .caseSensitive(caseSensitive);
+    if (!scanAllFiles) {
+      manifestGroup =
+          manifestGroup
+              .filterManifestEntries(
+                  entry ->
+                      entry.snapshotId() == snapshot.snapshotId()
+                          && entry.status() == ManifestEntry.Status.ADDED)
+              .ignoreDeleted();
+    }
+
+    return manifestGroup.planFiles();
+  }
+
+  /**
+   * Method to index the data files for each manifest. For example, if manifest m1 has 3 data files,
+   * manifest m2 has 2 data files, manifest m3 has 1 data file, then the index will be (m1, 0), (m2,
+   * 3), (m3, 5).
+   *
+   * @param manifestFiles List of input manifests used to index.
+   * @return a list of manifest index with key as manifest file, value as file counts.
+   */
+  private static List<Pair<ManifestFile, Integer>> indexManifests(
+      List<ManifestFile> manifestFiles) {
+    int currentFileIndex = 0;
+    List<Pair<ManifestFile, Integer>> manifestIndexes = Lists.newArrayList();
+
+    for (ManifestFile manifest : manifestFiles) {
+      manifestIndexes.add(Pair.of(manifest, currentFileIndex));
+      currentFileIndex += manifest.addedFilesCount() + manifest.existingFilesCount();
+    }
+
+    return manifestIndexes;
+  }
+
+  /**
+   * Method to skip the manifest file in which the index is smaller than startFileIndex. For
+   * example, if the index list is : (m1, 0), (m2, 3), (m3, 5), and startFileIndex is 4, then the
+   * returned manifest index list is: (m2, 3), (m3, 5).
+   *
+   * @param indexedManifests List of input manifests.
+   * @param startFileIndex Index used to skip the processed manifests.

Review Comment:
   Skip all manifests with an index less than or equal to this value?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to