szehon-ho commented on code in PR #4382:
URL: https://github.com/apache/iceberg/pull/4382#discussion_r842173517


##########
spark/v3.2/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestMetadataTables.java:
##########
@@ -181,6 +181,94 @@ public void testPartitionedTable() throws Exception {
     TestHelpers.assertEqualsSafe(filesTableSchema.asStruct(), 
expectedFiles.get(1), actualFiles.get(1));
   }
 
+  @Test
+  public void testAllFiles() throws Exception {
+    sql("CREATE TABLE %s (id bigint, data string) USING iceberg TBLPROPERTIES" 
+
+        "('format-version'='2', 'write.delete.mode'='merge-on-read')", 
tableName);
+
+    List<SimpleRecord> records = Lists.newArrayList(
+        new SimpleRecord(1, "a"),
+        new SimpleRecord(2, "b"),
+        new SimpleRecord(3, "c"),
+        new SimpleRecord(4, "d")
+    );
+    spark.createDataset(records, Encoders.bean(SimpleRecord.class))
+        .coalesce(1)
+        .writeTo(tableName)
+        .append();
+
+
+    Table table = Spark3Util.loadIcebergTable(spark, tableName);
+    List<ManifestFile> expectedDataManifests = 
TestHelpers.dataManifests(table);
+    Assert.assertEquals("Should have 2 data files", 1, 
expectedDataManifests.size());

Review Comment:
   Good catch



##########
core/src/main/java/org/apache/iceberg/AllDataFilesTable.java:
##########
@@ -108,30 +85,15 @@ public TableScan asOfTime(long timestampMillis) {
     }
 
     @Override
-    protected CloseableIterable<FileScanTask> planFiles(
-        TableOperations ops, Snapshot snapshot, Expression rowFilter,
-        boolean ignoreResiduals, boolean caseSensitive, boolean colStats) {
-      CloseableIterable<ManifestFile> manifests = allDataManifestFiles(
-          ops.current().snapshots(), context().planExecutor());
-      String schemaString = SchemaParser.toJson(schema());
-      String specString = 
PartitionSpecParser.toJson(PartitionSpec.unpartitioned());
-      Expression filter = ignoreResiduals ? Expressions.alwaysTrue() : 
rowFilter;
-      ResidualEvaluator residuals = ResidualEvaluator.unpartitioned(filter);
-
-      return CloseableIterable.transform(manifests, manifest ->
-          new ManifestReadTask(ops.io(), ops.current().specsById(), manifest, 
schema(),
-              schemaString, specString, residuals));
-    }
-  }
-
-  private static CloseableIterable<ManifestFile> allDataManifestFiles(
-      List<Snapshot> snapshots, ExecutorService workerPool) {
-    try (CloseableIterable<ManifestFile> iterable = new ParallelIterable<>(
-        Iterables.transform(snapshots, snapshot -> (Iterable<ManifestFile>) () 
-> snapshot.dataManifests().iterator()),
-        workerPool)) {
-      return CloseableIterable.withNoopClose(Sets.newHashSet(iterable));
-    } catch (IOException e) {
-      throw new RuntimeIOException(e, "Failed to close parallel iterable");
+    protected CloseableIterable<ManifestFile> manifests() {
+      try (CloseableIterable<ManifestFile> iterable = new ParallelIterable<>(
+          Iterables.transform(table().snapshots(),
+              snapshot -> (Iterable<ManifestFile>) () -> 
snapshot.dataManifests().iterator()),

Review Comment:
   Yep, that was just copied from the original code; I have removed it.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to