This is an automated email from the ASF dual-hosted git repository.
blue pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iceberg.git
The following commit(s) were added to refs/heads/master by this push:
new f412ce5f9 Spark, Flink: Pass FileIO into Snapshot methods that read
metadata, backports (#4877)
f412ce5f9 is described below
commit f412ce5f910c2cbcaf76529ff1194b302fa7acb6
Author: Kyle Bendickson <[email protected]>
AuthorDate: Thu May 26 19:51:49 2022 -0700
Spark, Flink: Pass FileIO into Snapshot methods that read metadata,
backports (#4877)
---
.../apache/iceberg/flink/TestChangeLogTable.java | 2 +-
.../iceberg/flink/sink/TestFlinkIcebergSinkV2.java | 2 +-
.../apache/iceberg/flink/TestChangeLogTable.java | 2 +-
.../iceberg/flink/sink/TestFlinkIcebergSinkV2.java | 2 +-
.../extensions/TestRewriteManifestsProcedure.java | 16 +++++-----
.../actions/BaseRewriteManifestsSparkAction.java | 2 +-
.../spark/source/SparkMicroBatchStream.java | 3 +-
.../spark/actions/TestExpireSnapshotsAction.java | 36 +++++++++++-----------
.../spark/actions/TestRewriteDataFilesAction.java | 11 ++++---
.../spark/actions/TestRewriteManifestsAction.java | 23 +++++++-------
.../iceberg/spark/source/TestDataFrameWrites.java | 2 +-
.../spark/source/TestDataSourceOptions.java | 8 ++---
.../spark/source/TestIcebergSourceTablesBase.java | 33 +++++++++++---------
.../iceberg/spark/source/TestSparkDataFile.java | 2 +-
.../iceberg/spark/source/TestSparkDataWrite.java | 6 ++--
.../apache/iceberg/spark/sql/TestRefreshTable.java | 2 +-
.../extensions/TestRewriteManifestsProcedure.java | 16 +++++-----
.../actions/BaseRewriteManifestsSparkAction.java | 2 +-
.../spark/source/SparkMicroBatchStream.java | 3 +-
.../spark/actions/TestExpireSnapshotsAction.java | 36 +++++++++++-----------
.../spark/actions/TestRewriteDataFilesAction.java | 11 ++++---
.../spark/actions/TestRewriteManifestsAction.java | 23 +++++++-------
.../iceberg/spark/source/TestDataFrameWrites.java | 2 +-
.../spark/source/TestDataSourceOptions.java | 8 ++---
.../spark/source/TestIcebergSourceTablesBase.java | 33 +++++++++++---------
.../iceberg/spark/source/TestSparkDataFile.java | 2 +-
.../iceberg/spark/source/TestSparkDataWrite.java | 6 ++--
.../apache/iceberg/spark/sql/TestRefreshTable.java | 2 +-
28 files changed, 154 insertions(+), 142 deletions(-)
diff --git
a/flink/v1.13/flink/src/test/java/org/apache/iceberg/flink/TestChangeLogTable.java
b/flink/v1.13/flink/src/test/java/org/apache/iceberg/flink/TestChangeLogTable.java
index 68b706e2d..5c04c8551 100644
---
a/flink/v1.13/flink/src/test/java/org/apache/iceberg/flink/TestChangeLogTable.java
+++
b/flink/v1.13/flink/src/test/java/org/apache/iceberg/flink/TestChangeLogTable.java
@@ -296,7 +296,7 @@ public class TestChangeLogTable extends
ChangeLogTableTestBase {
private List<Snapshot> findValidSnapshots(Table table) {
List<Snapshot> validSnapshots = Lists.newArrayList();
for (Snapshot snapshot : table.snapshots()) {
- if (snapshot.allManifests().stream().anyMatch(m -> snapshot.snapshotId()
== m.snapshotId())) {
+ if (snapshot.allManifests(table.io()).stream().anyMatch(m ->
snapshot.snapshotId() == m.snapshotId())) {
validSnapshots.add(snapshot);
}
}
diff --git
a/flink/v1.13/flink/src/test/java/org/apache/iceberg/flink/sink/TestFlinkIcebergSinkV2.java
b/flink/v1.13/flink/src/test/java/org/apache/iceberg/flink/sink/TestFlinkIcebergSinkV2.java
index 23169d1a5..97506b90b 100644
---
a/flink/v1.13/flink/src/test/java/org/apache/iceberg/flink/sink/TestFlinkIcebergSinkV2.java
+++
b/flink/v1.13/flink/src/test/java/org/apache/iceberg/flink/sink/TestFlinkIcebergSinkV2.java
@@ -147,7 +147,7 @@ public class TestFlinkIcebergSinkV2 extends TableTestBase {
private List<Snapshot> findValidSnapshots(Table table) {
List<Snapshot> validSnapshots = Lists.newArrayList();
for (Snapshot snapshot : table.snapshots()) {
- if (snapshot.allManifests().stream().anyMatch(m -> snapshot.snapshotId()
== m.snapshotId())) {
+ if (snapshot.allManifests(table.io()).stream().anyMatch(m ->
snapshot.snapshotId() == m.snapshotId())) {
validSnapshots.add(snapshot);
}
}
diff --git
a/flink/v1.14/flink/src/test/java/org/apache/iceberg/flink/TestChangeLogTable.java
b/flink/v1.14/flink/src/test/java/org/apache/iceberg/flink/TestChangeLogTable.java
index 68b706e2d..5c04c8551 100644
---
a/flink/v1.14/flink/src/test/java/org/apache/iceberg/flink/TestChangeLogTable.java
+++
b/flink/v1.14/flink/src/test/java/org/apache/iceberg/flink/TestChangeLogTable.java
@@ -296,7 +296,7 @@ public class TestChangeLogTable extends
ChangeLogTableTestBase {
private List<Snapshot> findValidSnapshots(Table table) {
List<Snapshot> validSnapshots = Lists.newArrayList();
for (Snapshot snapshot : table.snapshots()) {
- if (snapshot.allManifests().stream().anyMatch(m -> snapshot.snapshotId()
== m.snapshotId())) {
+ if (snapshot.allManifests(table.io()).stream().anyMatch(m ->
snapshot.snapshotId() == m.snapshotId())) {
validSnapshots.add(snapshot);
}
}
diff --git
a/flink/v1.14/flink/src/test/java/org/apache/iceberg/flink/sink/TestFlinkIcebergSinkV2.java
b/flink/v1.14/flink/src/test/java/org/apache/iceberg/flink/sink/TestFlinkIcebergSinkV2.java
index 23169d1a5..97506b90b 100644
---
a/flink/v1.14/flink/src/test/java/org/apache/iceberg/flink/sink/TestFlinkIcebergSinkV2.java
+++
b/flink/v1.14/flink/src/test/java/org/apache/iceberg/flink/sink/TestFlinkIcebergSinkV2.java
@@ -147,7 +147,7 @@ public class TestFlinkIcebergSinkV2 extends TableTestBase {
private List<Snapshot> findValidSnapshots(Table table) {
List<Snapshot> validSnapshots = Lists.newArrayList();
for (Snapshot snapshot : table.snapshots()) {
- if (snapshot.allManifests().stream().anyMatch(m -> snapshot.snapshotId()
== m.snapshotId())) {
+ if (snapshot.allManifests(table.io()).stream().anyMatch(m ->
snapshot.snapshotId() == m.snapshotId())) {
validSnapshots.add(snapshot);
}
}
diff --git
a/spark/v3.0/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestRewriteManifestsProcedure.java
b/spark/v3.0/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestRewriteManifestsProcedure.java
index b04f17693..dcf0a2d91 100644
---
a/spark/v3.0/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestRewriteManifestsProcedure.java
+++
b/spark/v3.0/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestRewriteManifestsProcedure.java
@@ -60,7 +60,7 @@ public class TestRewriteManifestsProcedure extends
SparkExtensionsTestBase {
Table table = validationCatalog.loadTable(tableIdent);
- Assert.assertEquals("Must have 1 manifest", 1,
table.currentSnapshot().allManifests().size());
+ Assert.assertEquals("Must have 1 manifest", 1,
table.currentSnapshot().allManifests(table.io()).size());
sql("ALTER TABLE %s SET TBLPROPERTIES ('commit.manifest.target-size-bytes'
'1')", tableName);
@@ -72,7 +72,7 @@ public class TestRewriteManifestsProcedure extends
SparkExtensionsTestBase {
table.refresh();
- Assert.assertEquals("Must have 4 manifests", 4,
table.currentSnapshot().allManifests().size());
+ Assert.assertEquals("Must have 4 manifests", 4,
table.currentSnapshot().allManifests(table.io()).size());
}
@Test
@@ -88,7 +88,7 @@ public class TestRewriteManifestsProcedure extends
SparkExtensionsTestBase {
Table table = validationCatalog.loadTable(tableIdent);
- Assert.assertEquals("Must have 4 manifest", 4,
table.currentSnapshot().allManifests().size());
+ Assert.assertEquals("Must have 4 manifest", 4,
table.currentSnapshot().allManifests(table.io()).size());
List<Object[]> output = sql(
"CALL %s.system.rewrite_manifests(table => '%s')", catalogName,
tableIdent);
@@ -98,7 +98,7 @@ public class TestRewriteManifestsProcedure extends
SparkExtensionsTestBase {
table.refresh();
- Assert.assertEquals("Must have 1 manifests", 1,
table.currentSnapshot().allManifests().size());
+ Assert.assertEquals("Must have 1 manifests", 1,
table.currentSnapshot().allManifests(table.io()).size());
}
@Test
@@ -110,7 +110,7 @@ public class TestRewriteManifestsProcedure extends
SparkExtensionsTestBase {
Table table = validationCatalog.loadTable(tableIdent);
- Assert.assertEquals("Must have 2 manifest", 2,
table.currentSnapshot().allManifests().size());
+ Assert.assertEquals("Must have 2 manifest", 2,
table.currentSnapshot().allManifests(table.io()).size());
List<Object[]> output = sql(
"CALL %s.system.rewrite_manifests(use_caching => false, table =>
'%s')", catalogName, tableIdent);
@@ -120,7 +120,7 @@ public class TestRewriteManifestsProcedure extends
SparkExtensionsTestBase {
table.refresh();
- Assert.assertEquals("Must have 1 manifests", 1,
table.currentSnapshot().allManifests().size());
+ Assert.assertEquals("Must have 1 manifests", 1,
table.currentSnapshot().allManifests(table.io()).size());
}
@Test
@@ -132,7 +132,7 @@ public class TestRewriteManifestsProcedure extends
SparkExtensionsTestBase {
Table table = validationCatalog.loadTable(tableIdent);
- Assert.assertEquals("Must have 2 manifest", 2,
table.currentSnapshot().allManifests().size());
+ Assert.assertEquals("Must have 2 manifest", 2,
table.currentSnapshot().allManifests(table.io()).size());
List<Object[]> output = sql(
"CALL %s.system.rewrite_manifests(usE_cAcHiNg => false, tAbLe =>
'%s')", catalogName, tableIdent);
@@ -142,7 +142,7 @@ public class TestRewriteManifestsProcedure extends
SparkExtensionsTestBase {
table.refresh();
- Assert.assertEquals("Must have 1 manifests", 1,
table.currentSnapshot().allManifests().size());
+ Assert.assertEquals("Must have 1 manifests", 1,
table.currentSnapshot().allManifests(table.io()).size());
}
@Test
diff --git
a/spark/v3.0/spark/src/main/java/org/apache/iceberg/spark/actions/BaseRewriteManifestsSparkAction.java
b/spark/v3.0/spark/src/main/java/org/apache/iceberg/spark/actions/BaseRewriteManifestsSparkAction.java
index c446d42ca..b1769f428 100644
---
a/spark/v3.0/spark/src/main/java/org/apache/iceberg/spark/actions/BaseRewriteManifestsSparkAction.java
+++
b/spark/v3.0/spark/src/main/java/org/apache/iceberg/spark/actions/BaseRewriteManifestsSparkAction.java
@@ -261,7 +261,7 @@ public class BaseRewriteManifestsSparkAction
return ImmutableList.of();
}
- return currentSnapshot.dataManifests().stream()
+ return currentSnapshot.dataManifests(fileIO).stream()
.filter(manifest -> manifest.partitionSpecId() == spec.specId() &&
predicate.test(manifest))
.collect(Collectors.toList());
}
diff --git
a/spark/v3.0/spark/src/main/java/org/apache/iceberg/spark/source/SparkMicroBatchStream.java
b/spark/v3.0/spark/src/main/java/org/apache/iceberg/spark/source/SparkMicroBatchStream.java
index dd68022c6..d72928b6b 100644
---
a/spark/v3.0/spark/src/main/java/org/apache/iceberg/spark/source/SparkMicroBatchStream.java
+++
b/spark/v3.0/spark/src/main/java/org/apache/iceberg/spark/source/SparkMicroBatchStream.java
@@ -111,7 +111,8 @@ public class SparkMicroBatchStream implements
MicroBatchStream {
}
Snapshot latestSnapshot = table.currentSnapshot();
- return new StreamingOffset(latestSnapshot.snapshotId(),
Iterables.size(latestSnapshot.addedFiles()), false);
+ return new StreamingOffset(
+ latestSnapshot.snapshotId(),
Iterables.size(latestSnapshot.addedFiles(table.io())), false);
}
@Override
diff --git
a/spark/v3.0/spark/src/test/java/org/apache/iceberg/spark/actions/TestExpireSnapshotsAction.java
b/spark/v3.0/spark/src/test/java/org/apache/iceberg/spark/actions/TestExpireSnapshotsAction.java
index c4445e954..9b8c9d250 100644
---
a/spark/v3.0/spark/src/test/java/org/apache/iceberg/spark/actions/TestExpireSnapshotsAction.java
+++
b/spark/v3.0/spark/src/test/java/org/apache/iceberg/spark/actions/TestExpireSnapshotsAction.java
@@ -570,13 +570,13 @@ public class TestExpireSnapshotsAction extends
SparkTestBase {
expectedDeletes.add(snapshotA.manifestListLocation());
// Files should be deleted of dangling staged snapshot
- snapshotB.addedFiles().forEach(i -> {
+ snapshotB.addedFiles(table.io()).forEach(i -> {
expectedDeletes.add(i.path().toString());
});
// ManifestList should be deleted too
expectedDeletes.add(snapshotB.manifestListLocation());
- snapshotB.dataManifests().forEach(file -> {
+ snapshotB.dataManifests(table.io()).forEach(file -> {
// Only the manifest of B should be deleted.
if (file.snapshotId() == snapshotB.snapshotId()) {
expectedDeletes.add(file.path());
@@ -645,7 +645,7 @@ public class TestExpireSnapshotsAction extends
SparkTestBase {
// Make sure no dataFiles are deleted for the B, C, D snapshot
Lists.newArrayList(snapshotB, snapshotC, snapshotD).forEach(i -> {
- i.addedFiles().forEach(item -> {
+ i.addedFiles(table.io()).forEach(item -> {
Assert.assertFalse(deletedFiles.contains(item.path().toString()));
});
});
@@ -700,7 +700,7 @@ public class TestExpireSnapshotsAction extends
SparkTestBase {
// Make sure no dataFiles are deleted for the staged snapshot
Lists.newArrayList(snapshotB).forEach(i -> {
- i.addedFiles().forEach(item -> {
+ i.addedFiles(table.io()).forEach(item -> {
Assert.assertFalse(deletedFiles.contains(item.path().toString()));
});
});
@@ -714,7 +714,7 @@ public class TestExpireSnapshotsAction extends
SparkTestBase {
// Make sure no dataFiles are deleted for the staged and cherry-pick
Lists.newArrayList(snapshotB, snapshotD).forEach(i -> {
- i.addedFiles().forEach(item -> {
+ i.addedFiles(table.io()).forEach(item -> {
Assert.assertFalse(deletedFiles.contains(item.path().toString()));
});
});
@@ -762,7 +762,7 @@ public class TestExpireSnapshotsAction extends
SparkTestBase {
Snapshot firstSnapshot = table.currentSnapshot();
Assert.assertEquals("Should create one manifest",
- 1, firstSnapshot.allManifests().size());
+ 1, firstSnapshot.allManifests(table.io()).size());
rightAfterSnapshot();
@@ -772,7 +772,7 @@ public class TestExpireSnapshotsAction extends
SparkTestBase {
Snapshot secondSnapshot = table.currentSnapshot();
Assert.assertEquals("Should create replace manifest with a rewritten
manifest",
- 1, secondSnapshot.allManifests().size());
+ 1, secondSnapshot.allManifests(table.io()).size());
table.newAppend()
.appendFile(FILE_B)
@@ -798,9 +798,9 @@ public class TestExpireSnapshotsAction extends
SparkTestBase {
Assert.assertEquals("Should remove expired manifest lists and deleted data
file",
Sets.newHashSet(
firstSnapshot.manifestListLocation(), // snapshot expired
- firstSnapshot.allManifests().get(0).path(), // manifest was
rewritten for delete
+ firstSnapshot.allManifests(table.io()).get(0).path(), // manifest
was rewritten for delete
secondSnapshot.manifestListLocation(), // snapshot expired
- secondSnapshot.allManifests().get(0).path(), // manifest contained
only deletes, was dropped
+ secondSnapshot.allManifests(table.io()).get(0).path(), // manifest
contained only deletes, was dropped
FILE_A.path()), // deleted
deletedFiles);
@@ -821,7 +821,7 @@ public class TestExpireSnapshotsAction extends
SparkTestBase {
Snapshot firstSnapshot = table.currentSnapshot();
Assert.assertEquals("Should create one manifest",
- 1, firstSnapshot.allManifests().size());
+ 1, firstSnapshot.allManifests(table.io()).size());
rightAfterSnapshot();
@@ -831,7 +831,7 @@ public class TestExpireSnapshotsAction extends
SparkTestBase {
Snapshot secondSnapshot = table.currentSnapshot();
Assert.assertEquals("Should replace manifest with a rewritten manifest",
- 1, secondSnapshot.allManifests().size());
+ 1, secondSnapshot.allManifests(table.io()).size());
table.newFastAppend() // do not merge to keep the last snapshot's manifest
valid
.appendFile(FILE_C)
@@ -857,7 +857,7 @@ public class TestExpireSnapshotsAction extends
SparkTestBase {
Assert.assertEquals("Should remove expired manifest lists and deleted data
file",
Sets.newHashSet(
firstSnapshot.manifestListLocation(), // snapshot expired
- firstSnapshot.allManifests().get(0).path(), // manifest was
rewritten for delete
+ firstSnapshot.allManifests(table.io()).get(0).path(), // manifest
was rewritten for delete
secondSnapshot.manifestListLocation(), // snapshot expired
FILE_A.path()), // deleted
deletedFiles);
@@ -879,7 +879,7 @@ public class TestExpireSnapshotsAction extends
SparkTestBase {
Snapshot firstSnapshot = table.currentSnapshot();
Assert.assertEquals("Should create one manifest",
- 1, firstSnapshot.allManifests().size());
+ 1, firstSnapshot.allManifests(table.io()).size());
rightAfterSnapshot();
@@ -888,8 +888,8 @@ public class TestExpireSnapshotsAction extends
SparkTestBase {
.commit();
Snapshot secondSnapshot = table.currentSnapshot();
- Set<ManifestFile> secondSnapshotManifests =
Sets.newHashSet(secondSnapshot.allManifests());
- secondSnapshotManifests.removeAll(firstSnapshot.allManifests());
+ Set<ManifestFile> secondSnapshotManifests =
Sets.newHashSet(secondSnapshot.allManifests(table.io()));
+ secondSnapshotManifests.removeAll(firstSnapshot.allManifests(table.io()));
Assert.assertEquals("Should add one new manifest for append", 1,
secondSnapshotManifests.size());
table.manageSnapshots()
@@ -928,7 +928,7 @@ public class TestExpireSnapshotsAction extends
SparkTestBase {
Snapshot firstSnapshot = table.currentSnapshot();
Assert.assertEquals("Should create one manifest",
- 1, firstSnapshot.allManifests().size());
+ 1, firstSnapshot.allManifests(table.io()).size());
rightAfterSnapshot();
@@ -937,8 +937,8 @@ public class TestExpireSnapshotsAction extends
SparkTestBase {
.commit();
Snapshot secondSnapshot = table.currentSnapshot();
- Set<ManifestFile> secondSnapshotManifests =
Sets.newHashSet(secondSnapshot.allManifests());
- secondSnapshotManifests.removeAll(firstSnapshot.allManifests());
+ Set<ManifestFile> secondSnapshotManifests =
Sets.newHashSet(secondSnapshot.allManifests(table.io()));
+ secondSnapshotManifests.removeAll(firstSnapshot.allManifests(table.io()));
Assert.assertEquals("Should add one new manifest for append", 1,
secondSnapshotManifests.size());
table.manageSnapshots()
diff --git
a/spark/v3.0/spark/src/test/java/org/apache/iceberg/spark/actions/TestRewriteDataFilesAction.java
b/spark/v3.0/spark/src/test/java/org/apache/iceberg/spark/actions/TestRewriteDataFilesAction.java
index 3f465fe72..9d4602765 100644
---
a/spark/v3.0/spark/src/test/java/org/apache/iceberg/spark/actions/TestRewriteDataFilesAction.java
+++
b/spark/v3.0/spark/src/test/java/org/apache/iceberg/spark/actions/TestRewriteDataFilesAction.java
@@ -269,14 +269,14 @@ public class TestRewriteDataFilesAction extends
SparkTestBase {
Assert.assertEquals(
"Data manifest should not have existing data file",
0,
- (long)
table.currentSnapshot().dataManifests().get(0).existingFilesCount());
+ (long)
table.currentSnapshot().dataManifests(table.io()).get(0).existingFilesCount());
Assert.assertEquals("Data manifest should have 1 delete data file",
1L,
- (long)
table.currentSnapshot().dataManifests().get(0).deletedFilesCount());
+ (long)
table.currentSnapshot().dataManifests(table.io()).get(0).deletedFilesCount());
Assert.assertEquals(
"Delete manifest added row count should equal total count",
total,
- (long)
table.currentSnapshot().deleteManifests().get(0).addedRowsCount());
+ (long)
table.currentSnapshot().deleteManifests(table.io()).get(0).addedRowsCount());
}
@Test
@@ -962,7 +962,8 @@ public class TestRewriteDataFilesAction extends
SparkTestBase {
.execute();
Assert.assertEquals("Should have 1 fileGroups",
result.rewriteResults().size(), 1);
- Assert.assertTrue("Should have written 40+ files",
Iterables.size(table.currentSnapshot().addedFiles()) >= 40);
+ Assert.assertTrue("Should have written 40+ files",
+ Iterables.size(table.currentSnapshot().addedFiles(table.io())) >= 40);
table.refresh();
@@ -1223,7 +1224,7 @@ public class TestRewriteDataFilesAction extends
SparkTestBase {
NestedField field = table.schema().caseInsensitiveFindField(column);
Class<T> javaClass = (Class<T>) field.type().typeId().javaClass();
- Map<StructLike, List<DataFile>> filesByPartition =
Streams.stream(table.currentSnapshot().addedFiles())
+ Map<StructLike, List<DataFile>> filesByPartition =
Streams.stream(table.currentSnapshot().addedFiles(table.io()))
.collect(Collectors.groupingBy(DataFile::partition));
Stream<Pair<Pair<T, T>, Pair<T, T>>> overlaps =
diff --git
a/spark/v3.0/spark/src/test/java/org/apache/iceberg/spark/actions/TestRewriteManifestsAction.java
b/spark/v3.0/spark/src/test/java/org/apache/iceberg/spark/actions/TestRewriteManifestsAction.java
index 40adb7d4c..f30251e74 100644
---
a/spark/v3.0/spark/src/test/java/org/apache/iceberg/spark/actions/TestRewriteManifestsAction.java
+++
b/spark/v3.0/spark/src/test/java/org/apache/iceberg/spark/actions/TestRewriteManifestsAction.java
@@ -129,7 +129,7 @@ public class TestRewriteManifestsAction extends
SparkTestBase {
table.refresh();
- List<ManifestFile> manifests = table.currentSnapshot().allManifests();
+ List<ManifestFile> manifests =
table.currentSnapshot().allManifests(table.io());
Assert.assertEquals("Should have 2 manifests before rewrite", 2,
manifests.size());
SparkActions actions = SparkActions.get();
@@ -143,7 +143,7 @@ public class TestRewriteManifestsAction extends
SparkTestBase {
table.refresh();
- List<ManifestFile> newManifests = table.currentSnapshot().allManifests();
+ List<ManifestFile> newManifests =
table.currentSnapshot().allManifests(table.io());
Assert.assertEquals("Should have 1 manifests after rewrite", 1,
newManifests.size());
Assert.assertEquals(4, (long) newManifests.get(0).existingFilesCount());
@@ -183,7 +183,7 @@ public class TestRewriteManifestsAction extends
SparkTestBase {
table.refresh();
- List<ManifestFile> manifests = table.currentSnapshot().allManifests();
+ List<ManifestFile> manifests =
table.currentSnapshot().allManifests(table.io());
Assert.assertEquals("Should have 2 manifests before rewrite", 2,
manifests.size());
SparkActions actions = SparkActions.get();
@@ -207,7 +207,7 @@ public class TestRewriteManifestsAction extends
SparkTestBase {
table.refresh();
// table should reflect the changes, since the commit was successful
- List<ManifestFile> newManifests = table.currentSnapshot().allManifests();
+ List<ManifestFile> newManifests =
table.currentSnapshot().allManifests(table.io());
Assert.assertEquals("Should have 1 manifests after rewrite", 1,
newManifests.size());
Assert.assertEquals(4, (long) newManifests.get(0).existingFilesCount());
@@ -262,7 +262,7 @@ public class TestRewriteManifestsAction extends
SparkTestBase {
table.refresh();
- List<ManifestFile> manifests = table.currentSnapshot().allManifests();
+ List<ManifestFile> manifests =
table.currentSnapshot().allManifests(table.io());
Assert.assertEquals("Should have 4 manifests before rewrite", 4,
manifests.size());
SparkActions actions = SparkActions.get();
@@ -284,7 +284,7 @@ public class TestRewriteManifestsAction extends
SparkTestBase {
table.refresh();
- List<ManifestFile> newManifests = table.currentSnapshot().allManifests();
+ List<ManifestFile> newManifests =
table.currentSnapshot().allManifests(table.io());
Assert.assertEquals("Should have 2 manifests after rewrite", 2,
newManifests.size());
Assert.assertEquals(4, (long) newManifests.get(0).existingFilesCount());
@@ -347,7 +347,8 @@ public class TestRewriteManifestsAction extends
SparkTestBase {
.stagingLocation(temp.newFolder().toString())
.execute();
- Assert.assertEquals("Action should rewrite all manifests",
snapshot.allManifests(), result.rewrittenManifests());
+ Assert.assertEquals("Action should rewrite all manifests",
+ snapshot.allManifests(table.io()), result.rewrittenManifests());
Assert.assertEquals("Action should add 1 manifest", 1,
Iterables.size(result.addedManifests()));
} finally {
@@ -375,7 +376,7 @@ public class TestRewriteManifestsAction extends
SparkTestBase {
table.refresh();
- List<ManifestFile> manifests = table.currentSnapshot().allManifests();
+ List<ManifestFile> manifests =
table.currentSnapshot().allManifests(table.io());
Assert.assertEquals("Should have 1 manifests before rewrite", 1,
manifests.size());
// set the target manifest size to a small value to force splitting
records into multiple files
@@ -395,7 +396,7 @@ public class TestRewriteManifestsAction extends
SparkTestBase {
table.refresh();
- List<ManifestFile> newManifests = table.currentSnapshot().allManifests();
+ List<ManifestFile> newManifests =
table.currentSnapshot().allManifests(table.io());
Assert.assertEquals("Should have 2 manifests after rewrite", 2,
newManifests.size());
Dataset<Row> resultDF = spark.read().format("iceberg").load(tableLocation);
@@ -430,7 +431,7 @@ public class TestRewriteManifestsAction extends
SparkTestBase {
table.refresh();
- List<ManifestFile> manifests = table.currentSnapshot().allManifests();
+ List<ManifestFile> manifests =
table.currentSnapshot().allManifests(table.io());
Assert.assertEquals("Should have 2 manifests before rewrite", 2,
manifests.size());
SparkActions actions = SparkActions.get();
@@ -447,7 +448,7 @@ public class TestRewriteManifestsAction extends
SparkTestBase {
table.refresh();
- List<ManifestFile> newManifests = table.currentSnapshot().allManifests();
+ List<ManifestFile> newManifests =
table.currentSnapshot().allManifests(table.io());
Assert.assertEquals("Should have 2 manifests after rewrite", 2,
newManifests.size());
Assert.assertFalse("First manifest must be rewritten",
newManifests.contains(manifests.get(0)));
diff --git
a/spark/v3.0/spark/src/test/java/org/apache/iceberg/spark/source/TestDataFrameWrites.java
b/spark/v3.0/spark/src/test/java/org/apache/iceberg/spark/source/TestDataFrameWrites.java
index 139136282..f6d292f89 100644
---
a/spark/v3.0/spark/src/test/java/org/apache/iceberg/spark/source/TestDataFrameWrites.java
+++
b/spark/v3.0/spark/src/test/java/org/apache/iceberg/spark/source/TestDataFrameWrites.java
@@ -181,7 +181,7 @@ public class TestDataFrameWrites extends AvroDataTest {
}
Assert.assertEquals("Both iterators should be exhausted",
expectedIter.hasNext(), actualIter.hasNext());
- table.currentSnapshot().addedFiles().forEach(dataFile ->
+ table.currentSnapshot().addedFiles(table.io()).forEach(dataFile ->
Assert.assertTrue(
String.format(
"File should have the parent directory %s, but has: %s.",
diff --git
a/spark/v3.0/spark/src/test/java/org/apache/iceberg/spark/source/TestDataSourceOptions.java
b/spark/v3.0/spark/src/test/java/org/apache/iceberg/spark/source/TestDataSourceOptions.java
index 7655b4b82..55605288b 100644
---
a/spark/v3.0/spark/src/test/java/org/apache/iceberg/spark/source/TestDataSourceOptions.java
+++
b/spark/v3.0/spark/src/test/java/org/apache/iceberg/spark/source/TestDataSourceOptions.java
@@ -203,7 +203,7 @@ public class TestDataSourceOptions {
.mode("append")
.save(tableLocation);
- List<DataFile> files =
Lists.newArrayList(icebergTable.currentSnapshot().addedFiles());
+ List<DataFile> files =
Lists.newArrayList(icebergTable.currentSnapshot().addedFiles(icebergTable.io()));
Assert.assertEquals("Should have written 1 file", 1, files.size());
long fileSize = files.get(0).fileSizeInBytes();
@@ -327,7 +327,7 @@ public class TestDataSourceOptions {
.mode("append")
.save(tableLocation);
- List<ManifestFile> manifests = table.currentSnapshot().allManifests();
+ List<ManifestFile> manifests =
table.currentSnapshot().allManifests(table.io());
Assert.assertEquals("Must be 2 manifests", 2, manifests.size());
@@ -356,7 +356,7 @@ public class TestDataSourceOptions {
HadoopTables tables = new HadoopTables(CONF);
PartitionSpec spec = PartitionSpec.unpartitioned();
Map<String, String> options = Maps.newHashMap();
- tables.create(SCHEMA, spec, options, tableLocation);
+ Table icebergTable = tables.create(SCHEMA, spec, options, tableLocation);
List<SimpleRecord> expectedRecords = Lists.newArrayList(
new SimpleRecord(1, "a"),
@@ -371,7 +371,7 @@ public class TestDataSourceOptions {
int splitSize = (int) TableProperties.METADATA_SPLIT_SIZE_DEFAULT; // 32MB
split size
int expectedSplits = ((int) tables.load(tableLocation + "#entries")
- .currentSnapshot().allManifests().get(0).length() + splitSize - 1) /
splitSize;
+ .currentSnapshot().allManifests(icebergTable.io()).get(0).length() +
splitSize - 1) / splitSize;
Dataset<Row> metadataDf = spark.read()
.format("iceberg")
diff --git
a/spark/v3.0/spark/src/test/java/org/apache/iceberg/spark/source/TestIcebergSourceTablesBase.java
b/spark/v3.0/spark/src/test/java/org/apache/iceberg/spark/source/TestIcebergSourceTablesBase.java
index ea6aaae53..d6f65af4d 100644
---
a/spark/v3.0/spark/src/test/java/org/apache/iceberg/spark/source/TestIcebergSourceTablesBase.java
+++
b/spark/v3.0/spark/src/test/java/org/apache/iceberg/spark/source/TestIcebergSourceTablesBase.java
@@ -148,9 +148,9 @@ public abstract class TestIcebergSourceTablesBase extends
SparkTestBase {
Snapshot snapshot = table.currentSnapshot();
- Assert.assertEquals("Should only contain one manifest", 1,
snapshot.allManifests().size());
+ Assert.assertEquals("Should only contain one manifest", 1,
snapshot.allManifests(table.io()).size());
- InputFile manifest =
table.io().newInputFile(snapshot.allManifests().get(0).path());
+ InputFile manifest =
table.io().newInputFile(snapshot.allManifests(table.io()).get(0).path());
List<GenericData.Record> expected = Lists.newArrayList();
try (CloseableIterable<GenericData.Record> rows =
Avro.read(manifest).project(entriesTable.schema()).build()) {
// each row must inherit snapshot_id and sequence_number
@@ -206,7 +206,7 @@ public abstract class TestIcebergSourceTablesBase extends
SparkTestBase {
.save(loadLocation(tableIdentifier));
table.refresh();
- DataFile file = table.currentSnapshot().addedFiles().iterator().next();
+ DataFile file =
table.currentSnapshot().addedFiles(table.io()).iterator().next();
List<Object[]> singleActual = rowsToJava(spark.read()
.format("iceberg")
@@ -233,7 +233,7 @@ public abstract class TestIcebergSourceTablesBase extends
SparkTestBase {
.save(loadLocation(tableIdentifier));
table.refresh();
- DataFile file = table.currentSnapshot().addedFiles().iterator().next();
+ DataFile file =
table.currentSnapshot().addedFiles(table.io()).iterator().next();
List<Object[]> multiActual = rowsToJava(spark.read()
.format("iceberg")
@@ -261,7 +261,7 @@ public abstract class TestIcebergSourceTablesBase extends
SparkTestBase {
.save(loadLocation(tableIdentifier));
table.refresh();
- DataFile file = table.currentSnapshot().addedFiles().iterator().next();
+ DataFile file =
table.currentSnapshot().addedFiles(table.io()).iterator().next();
List<Object[]> multiActual = rowsToJava(spark.read()
.format("iceberg")
@@ -308,7 +308,8 @@ public abstract class TestIcebergSourceTablesBase extends
SparkTestBase {
.collectAsList();
List<GenericData.Record> expected = Lists.newArrayList();
- for (ManifestFile manifest :
Iterables.concat(Iterables.transform(table.snapshots(),
Snapshot::allManifests))) {
+ for (ManifestFile manifest : Iterables.concat(
+ Iterables.transform(table.snapshots(), s ->
s.allManifests(table.io())))) {
InputFile in = table.io().newInputFile(manifest.path());
try (CloseableIterable<GenericData.Record> rows =
Avro.read(in).project(entriesTable.schema()).build()) {
// each row must inherit snapshot_id and sequence_number
@@ -384,7 +385,7 @@ public abstract class TestIcebergSourceTablesBase extends
SparkTestBase {
.collectAsList();
List<GenericData.Record> expected = Lists.newArrayList();
- for (ManifestFile manifest : table.currentSnapshot().dataManifests()) {
+ for (ManifestFile manifest :
table.currentSnapshot().dataManifests(table.io())) {
InputFile in = table.io().newInputFile(manifest.path());
try (CloseableIterable<GenericData.Record> rows =
Avro.read(in).project(entriesTable.schema()).build()) {
for (GenericData.Record record : rows) {
@@ -441,7 +442,7 @@ public abstract class TestIcebergSourceTablesBase extends
SparkTestBase {
.collectAsList();
List<GenericData.Record> expected = Lists.newArrayList();
- for (ManifestFile manifest : table.currentSnapshot().dataManifests()) {
+ for (ManifestFile manifest :
table.currentSnapshot().dataManifests(table.io())) {
InputFile in = table.io().newInputFile(manifest.path());
try (CloseableIterable<GenericData.Record> rows =
Avro.read(in).project(entriesTable.schema()).build()) {
for (GenericData.Record record : rows) {
@@ -530,7 +531,7 @@ public abstract class TestIcebergSourceTablesBase extends
SparkTestBase {
.save(loadLocation(tableIdentifier));
table.refresh();
- DataFile toDelete =
Iterables.getOnlyElement(table.currentSnapshot().addedFiles());
+ DataFile toDelete =
Iterables.getOnlyElement(table.currentSnapshot().addedFiles(table.io()));
// add a second file
df2.select("id", "data").write()
@@ -547,7 +548,7 @@ public abstract class TestIcebergSourceTablesBase extends
SparkTestBase {
.collectAsList();
List<GenericData.Record> expected = Lists.newArrayList();
- for (ManifestFile manifest : table.currentSnapshot().dataManifests()) {
+ for (ManifestFile manifest :
table.currentSnapshot().dataManifests(table.io())) {
InputFile in = table.io().newInputFile(manifest.path());
try (CloseableIterable<GenericData.Record> rows =
Avro.read(in).project(entriesTable.schema()).build()) {
for (GenericData.Record record : rows) {
@@ -644,7 +645,9 @@ public abstract class TestIcebergSourceTablesBase extends
SparkTestBase {
actual.sort(Comparator.comparing(o -> o.getString(1)));
List<GenericData.Record> expected = Lists.newArrayList();
- for (ManifestFile manifest :
Iterables.concat(Iterables.transform(table.snapshots(),
Snapshot::dataManifests))) {
+ Iterable<ManifestFile> dataManifests =
Iterables.concat(Iterables.transform(table.snapshots(),
+ snapshot -> snapshot.dataManifests(table.io())));
+ for (ManifestFile manifest : dataManifests) {
InputFile in = table.io().newInputFile(manifest.path());
try (CloseableIterable<GenericData.Record> rows =
Avro.read(in).project(entriesTable.schema()).build()) {
for (GenericData.Record record : rows) {
@@ -897,7 +900,7 @@ public abstract class TestIcebergSourceTablesBase extends
SparkTestBase {
manifestTable.schema(), "manifests"));
GenericRecordBuilder summaryBuilder = new
GenericRecordBuilder(AvroSchemaUtil.convert(
manifestTable.schema().findType("partition_summaries.element").asStructType(),
"partition_summary"));
- List<GenericData.Record> expected =
Lists.transform(table.currentSnapshot().allManifests(), manifest ->
+ List<GenericData.Record> expected =
Lists.transform(table.currentSnapshot().allManifests(table.io()), manifest ->
builder
.set("content", manifest.content().id())
.set("path", manifest.path())
@@ -967,7 +970,7 @@ public abstract class TestIcebergSourceTablesBase extends
SparkTestBase {
GenericRecordBuilder builder = new
GenericRecordBuilder(AvroSchemaUtil.convert(projectedSchema.asStruct()));
GenericRecordBuilder summaryBuilder = new
GenericRecordBuilder(AvroSchemaUtil.convert(
projectedSchema.findType("partition_summaries.element").asStructType(),
"partition_summary"));
- List<GenericData.Record> expected =
Lists.transform(table.currentSnapshot().allManifests(), manifest ->
+ List<GenericData.Record> expected =
Lists.transform(table.currentSnapshot().allManifests(table.io()), manifest ->
builder.set("partition_spec_id", manifest.partitionSpecId())
.set("path", manifest.path())
.set("partition_summaries", Lists.transform(manifest.partitions(),
partition ->
@@ -999,11 +1002,11 @@ public abstract class TestIcebergSourceTablesBase
extends SparkTestBase {
.mode("append")
.save(loadLocation(tableIdentifier));
- manifests.addAll(table.currentSnapshot().allManifests());
+ manifests.addAll(table.currentSnapshot().allManifests(table.io()));
table.newDelete().deleteFromRowFilter(Expressions.alwaysTrue()).commit();
- manifests.addAll(table.currentSnapshot().allManifests());
+ manifests.addAll(table.currentSnapshot().allManifests(table.io()));
List<Row> actual = spark.read()
.format("iceberg")
diff --git
a/spark/v3.0/spark/src/test/java/org/apache/iceberg/spark/source/TestSparkDataFile.java
b/spark/v3.0/spark/src/test/java/org/apache/iceberg/spark/source/TestSparkDataFile.java
index f6ca5f7de..cd1404766 100644
---
a/spark/v3.0/spark/src/test/java/org/apache/iceberg/spark/source/TestSparkDataFile.java
+++
b/spark/v3.0/spark/src/test/java/org/apache/iceberg/spark/source/TestSparkDataFile.java
@@ -156,7 +156,7 @@ public class TestSparkDataFile {
table.refresh();
- List<ManifestFile> manifests = table.currentSnapshot().allManifests();
+ List<ManifestFile> manifests =
table.currentSnapshot().allManifests(table.io());
Assert.assertEquals("Should have 1 manifest", 1, manifests.size());
List<DataFile> dataFiles = Lists.newArrayList();
diff --git
a/spark/v3.0/spark/src/test/java/org/apache/iceberg/spark/source/TestSparkDataWrite.java
b/spark/v3.0/spark/src/test/java/org/apache/iceberg/spark/source/TestSparkDataWrite.java
index 8bf57ba65..5b158c518 100644
---
a/spark/v3.0/spark/src/test/java/org/apache/iceberg/spark/source/TestSparkDataWrite.java
+++
b/spark/v3.0/spark/src/test/java/org/apache/iceberg/spark/source/TestSparkDataWrite.java
@@ -133,7 +133,7 @@ public class TestSparkDataWrite {
List<SimpleRecord> actual =
result.orderBy("id").as(Encoders.bean(SimpleRecord.class)).collectAsList();
Assert.assertEquals("Number of rows should match", expected.size(),
actual.size());
Assert.assertEquals("Result rows should match", expected, actual);
- for (ManifestFile manifest : table.currentSnapshot().allManifests()) {
+ for (ManifestFile manifest :
table.currentSnapshot().allManifests(table.io())) {
for (DataFile file : ManifestFiles.read(manifest, table.io())) {
// TODO: avro not support split
if (!format.equals(FileFormat.AVRO)) {
@@ -372,7 +372,7 @@ public class TestSparkDataWrite {
Assert.assertEquals("Result rows should match", expected, actual);
List<DataFile> files = Lists.newArrayList();
- for (ManifestFile manifest : table.currentSnapshot().allManifests()) {
+ for (ManifestFile manifest :
table.currentSnapshot().allManifests(table.io())) {
for (DataFile file : ManifestFiles.read(manifest, table.io())) {
files.add(file);
}
@@ -590,7 +590,7 @@ public class TestSparkDataWrite {
Assert.assertEquals("Result rows should match", expected, actual);
List<DataFile> files = Lists.newArrayList();
- for (ManifestFile manifest : table.currentSnapshot().allManifests()) {
+ for (ManifestFile manifest :
table.currentSnapshot().allManifests(table.io())) {
for (DataFile file : ManifestFiles.read(manifest, table.io())) {
files.add(file);
}
diff --git
a/spark/v3.0/spark/src/test/java/org/apache/iceberg/spark/sql/TestRefreshTable.java
b/spark/v3.0/spark/src/test/java/org/apache/iceberg/spark/sql/TestRefreshTable.java
index 1cbb4fef0..510d7c40e 100644
---
a/spark/v3.0/spark/src/test/java/org/apache/iceberg/spark/sql/TestRefreshTable.java
+++
b/spark/v3.0/spark/src/test/java/org/apache/iceberg/spark/sql/TestRefreshTable.java
@@ -66,7 +66,7 @@ public class TestRefreshTable extends SparkCatalogTestBase {
// Modify table outside of spark, it should be cached so Spark should see
the same value after mutation
Table table = validationCatalog.loadTable(tableIdent);
- DataFile file = table.currentSnapshot().addedFiles().iterator().next();
+ DataFile file =
table.currentSnapshot().addedFiles(table.io()).iterator().next();
table.newDelete().deleteFile(file).commit();
List<Object[]> cachedActual = sql("SELECT * FROM %s", tableName);
diff --git
a/spark/v3.1/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestRewriteManifestsProcedure.java
b/spark/v3.1/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestRewriteManifestsProcedure.java
index b04f17693..dcf0a2d91 100644
---
a/spark/v3.1/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestRewriteManifestsProcedure.java
+++
b/spark/v3.1/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestRewriteManifestsProcedure.java
@@ -60,7 +60,7 @@ public class TestRewriteManifestsProcedure extends
SparkExtensionsTestBase {
Table table = validationCatalog.loadTable(tableIdent);
- Assert.assertEquals("Must have 1 manifest", 1,
table.currentSnapshot().allManifests().size());
+ Assert.assertEquals("Must have 1 manifest", 1,
table.currentSnapshot().allManifests(table.io()).size());
sql("ALTER TABLE %s SET TBLPROPERTIES ('commit.manifest.target-size-bytes'
'1')", tableName);
@@ -72,7 +72,7 @@ public class TestRewriteManifestsProcedure extends
SparkExtensionsTestBase {
table.refresh();
- Assert.assertEquals("Must have 4 manifests", 4,
table.currentSnapshot().allManifests().size());
+ Assert.assertEquals("Must have 4 manifests", 4,
table.currentSnapshot().allManifests(table.io()).size());
}
@Test
@@ -88,7 +88,7 @@ public class TestRewriteManifestsProcedure extends
SparkExtensionsTestBase {
Table table = validationCatalog.loadTable(tableIdent);
- Assert.assertEquals("Must have 4 manifest", 4,
table.currentSnapshot().allManifests().size());
+ Assert.assertEquals("Must have 4 manifest", 4,
table.currentSnapshot().allManifests(table.io()).size());
List<Object[]> output = sql(
"CALL %s.system.rewrite_manifests(table => '%s')", catalogName,
tableIdent);
@@ -98,7 +98,7 @@ public class TestRewriteManifestsProcedure extends
SparkExtensionsTestBase {
table.refresh();
- Assert.assertEquals("Must have 1 manifests", 1,
table.currentSnapshot().allManifests().size());
+ Assert.assertEquals("Must have 1 manifests", 1,
table.currentSnapshot().allManifests(table.io()).size());
}
@Test
@@ -110,7 +110,7 @@ public class TestRewriteManifestsProcedure extends
SparkExtensionsTestBase {
Table table = validationCatalog.loadTable(tableIdent);
- Assert.assertEquals("Must have 2 manifest", 2,
table.currentSnapshot().allManifests().size());
+ Assert.assertEquals("Must have 2 manifest", 2,
table.currentSnapshot().allManifests(table.io()).size());
List<Object[]> output = sql(
"CALL %s.system.rewrite_manifests(use_caching => false, table =>
'%s')", catalogName, tableIdent);
@@ -120,7 +120,7 @@ public class TestRewriteManifestsProcedure extends
SparkExtensionsTestBase {
table.refresh();
- Assert.assertEquals("Must have 1 manifests", 1,
table.currentSnapshot().allManifests().size());
+ Assert.assertEquals("Must have 1 manifests", 1,
table.currentSnapshot().allManifests(table.io()).size());
}
@Test
@@ -132,7 +132,7 @@ public class TestRewriteManifestsProcedure extends
SparkExtensionsTestBase {
Table table = validationCatalog.loadTable(tableIdent);
- Assert.assertEquals("Must have 2 manifest", 2,
table.currentSnapshot().allManifests().size());
+ Assert.assertEquals("Must have 2 manifest", 2,
table.currentSnapshot().allManifests(table.io()).size());
List<Object[]> output = sql(
"CALL %s.system.rewrite_manifests(usE_cAcHiNg => false, tAbLe =>
'%s')", catalogName, tableIdent);
@@ -142,7 +142,7 @@ public class TestRewriteManifestsProcedure extends
SparkExtensionsTestBase {
table.refresh();
- Assert.assertEquals("Must have 1 manifests", 1,
table.currentSnapshot().allManifests().size());
+ Assert.assertEquals("Must have 1 manifests", 1,
table.currentSnapshot().allManifests(table.io()).size());
}
@Test
diff --git
a/spark/v3.1/spark/src/main/java/org/apache/iceberg/spark/actions/BaseRewriteManifestsSparkAction.java
b/spark/v3.1/spark/src/main/java/org/apache/iceberg/spark/actions/BaseRewriteManifestsSparkAction.java
index c446d42ca..b1769f428 100644
---
a/spark/v3.1/spark/src/main/java/org/apache/iceberg/spark/actions/BaseRewriteManifestsSparkAction.java
+++
b/spark/v3.1/spark/src/main/java/org/apache/iceberg/spark/actions/BaseRewriteManifestsSparkAction.java
@@ -261,7 +261,7 @@ public class BaseRewriteManifestsSparkAction
return ImmutableList.of();
}
- return currentSnapshot.dataManifests().stream()
+ return currentSnapshot.dataManifests(fileIO).stream()
.filter(manifest -> manifest.partitionSpecId() == spec.specId() &&
predicate.test(manifest))
.collect(Collectors.toList());
}
diff --git
a/spark/v3.1/spark/src/main/java/org/apache/iceberg/spark/source/SparkMicroBatchStream.java
b/spark/v3.1/spark/src/main/java/org/apache/iceberg/spark/source/SparkMicroBatchStream.java
index dd68022c6..d72928b6b 100644
---
a/spark/v3.1/spark/src/main/java/org/apache/iceberg/spark/source/SparkMicroBatchStream.java
+++
b/spark/v3.1/spark/src/main/java/org/apache/iceberg/spark/source/SparkMicroBatchStream.java
@@ -111,7 +111,8 @@ public class SparkMicroBatchStream implements
MicroBatchStream {
}
Snapshot latestSnapshot = table.currentSnapshot();
- return new StreamingOffset(latestSnapshot.snapshotId(),
Iterables.size(latestSnapshot.addedFiles()), false);
+ return new StreamingOffset(
+ latestSnapshot.snapshotId(),
Iterables.size(latestSnapshot.addedFiles(table.io())), false);
}
@Override
diff --git
a/spark/v3.1/spark/src/test/java/org/apache/iceberg/spark/actions/TestExpireSnapshotsAction.java
b/spark/v3.1/spark/src/test/java/org/apache/iceberg/spark/actions/TestExpireSnapshotsAction.java
index c4445e954..9b8c9d250 100644
---
a/spark/v3.1/spark/src/test/java/org/apache/iceberg/spark/actions/TestExpireSnapshotsAction.java
+++
b/spark/v3.1/spark/src/test/java/org/apache/iceberg/spark/actions/TestExpireSnapshotsAction.java
@@ -570,13 +570,13 @@ public class TestExpireSnapshotsAction extends
SparkTestBase {
expectedDeletes.add(snapshotA.manifestListLocation());
// Files should be deleted of dangling staged snapshot
- snapshotB.addedFiles().forEach(i -> {
+ snapshotB.addedFiles(table.io()).forEach(i -> {
expectedDeletes.add(i.path().toString());
});
// ManifestList should be deleted too
expectedDeletes.add(snapshotB.manifestListLocation());
- snapshotB.dataManifests().forEach(file -> {
+ snapshotB.dataManifests(table.io()).forEach(file -> {
// Only the manifest of B should be deleted.
if (file.snapshotId() == snapshotB.snapshotId()) {
expectedDeletes.add(file.path());
@@ -645,7 +645,7 @@ public class TestExpireSnapshotsAction extends
SparkTestBase {
// Make sure no dataFiles are deleted for the B, C, D snapshot
Lists.newArrayList(snapshotB, snapshotC, snapshotD).forEach(i -> {
- i.addedFiles().forEach(item -> {
+ i.addedFiles(table.io()).forEach(item -> {
Assert.assertFalse(deletedFiles.contains(item.path().toString()));
});
});
@@ -700,7 +700,7 @@ public class TestExpireSnapshotsAction extends
SparkTestBase {
// Make sure no dataFiles are deleted for the staged snapshot
Lists.newArrayList(snapshotB).forEach(i -> {
- i.addedFiles().forEach(item -> {
+ i.addedFiles(table.io()).forEach(item -> {
Assert.assertFalse(deletedFiles.contains(item.path().toString()));
});
});
@@ -714,7 +714,7 @@ public class TestExpireSnapshotsAction extends
SparkTestBase {
// Make sure no dataFiles are deleted for the staged and cherry-pick
Lists.newArrayList(snapshotB, snapshotD).forEach(i -> {
- i.addedFiles().forEach(item -> {
+ i.addedFiles(table.io()).forEach(item -> {
Assert.assertFalse(deletedFiles.contains(item.path().toString()));
});
});
@@ -762,7 +762,7 @@ public class TestExpireSnapshotsAction extends
SparkTestBase {
Snapshot firstSnapshot = table.currentSnapshot();
Assert.assertEquals("Should create one manifest",
- 1, firstSnapshot.allManifests().size());
+ 1, firstSnapshot.allManifests(table.io()).size());
rightAfterSnapshot();
@@ -772,7 +772,7 @@ public class TestExpireSnapshotsAction extends
SparkTestBase {
Snapshot secondSnapshot = table.currentSnapshot();
Assert.assertEquals("Should create replace manifest with a rewritten
manifest",
- 1, secondSnapshot.allManifests().size());
+ 1, secondSnapshot.allManifests(table.io()).size());
table.newAppend()
.appendFile(FILE_B)
@@ -798,9 +798,9 @@ public class TestExpireSnapshotsAction extends
SparkTestBase {
Assert.assertEquals("Should remove expired manifest lists and deleted data
file",
Sets.newHashSet(
firstSnapshot.manifestListLocation(), // snapshot expired
- firstSnapshot.allManifests().get(0).path(), // manifest was
rewritten for delete
+ firstSnapshot.allManifests(table.io()).get(0).path(), // manifest
was rewritten for delete
secondSnapshot.manifestListLocation(), // snapshot expired
- secondSnapshot.allManifests().get(0).path(), // manifest contained
only deletes, was dropped
+ secondSnapshot.allManifests(table.io()).get(0).path(), // manifest
contained only deletes, was dropped
FILE_A.path()), // deleted
deletedFiles);
@@ -821,7 +821,7 @@ public class TestExpireSnapshotsAction extends
SparkTestBase {
Snapshot firstSnapshot = table.currentSnapshot();
Assert.assertEquals("Should create one manifest",
- 1, firstSnapshot.allManifests().size());
+ 1, firstSnapshot.allManifests(table.io()).size());
rightAfterSnapshot();
@@ -831,7 +831,7 @@ public class TestExpireSnapshotsAction extends
SparkTestBase {
Snapshot secondSnapshot = table.currentSnapshot();
Assert.assertEquals("Should replace manifest with a rewritten manifest",
- 1, secondSnapshot.allManifests().size());
+ 1, secondSnapshot.allManifests(table.io()).size());
table.newFastAppend() // do not merge to keep the last snapshot's manifest
valid
.appendFile(FILE_C)
@@ -857,7 +857,7 @@ public class TestExpireSnapshotsAction extends
SparkTestBase {
Assert.assertEquals("Should remove expired manifest lists and deleted data
file",
Sets.newHashSet(
firstSnapshot.manifestListLocation(), // snapshot expired
- firstSnapshot.allManifests().get(0).path(), // manifest was
rewritten for delete
+ firstSnapshot.allManifests(table.io()).get(0).path(), // manifest
was rewritten for delete
secondSnapshot.manifestListLocation(), // snapshot expired
FILE_A.path()), // deleted
deletedFiles);
@@ -879,7 +879,7 @@ public class TestExpireSnapshotsAction extends
SparkTestBase {
Snapshot firstSnapshot = table.currentSnapshot();
Assert.assertEquals("Should create one manifest",
- 1, firstSnapshot.allManifests().size());
+ 1, firstSnapshot.allManifests(table.io()).size());
rightAfterSnapshot();
@@ -888,8 +888,8 @@ public class TestExpireSnapshotsAction extends
SparkTestBase {
.commit();
Snapshot secondSnapshot = table.currentSnapshot();
- Set<ManifestFile> secondSnapshotManifests =
Sets.newHashSet(secondSnapshot.allManifests());
- secondSnapshotManifests.removeAll(firstSnapshot.allManifests());
+ Set<ManifestFile> secondSnapshotManifests =
Sets.newHashSet(secondSnapshot.allManifests(table.io()));
+ secondSnapshotManifests.removeAll(firstSnapshot.allManifests(table.io()));
Assert.assertEquals("Should add one new manifest for append", 1,
secondSnapshotManifests.size());
table.manageSnapshots()
@@ -928,7 +928,7 @@ public class TestExpireSnapshotsAction extends
SparkTestBase {
Snapshot firstSnapshot = table.currentSnapshot();
Assert.assertEquals("Should create one manifest",
- 1, firstSnapshot.allManifests().size());
+ 1, firstSnapshot.allManifests(table.io()).size());
rightAfterSnapshot();
@@ -937,8 +937,8 @@ public class TestExpireSnapshotsAction extends
SparkTestBase {
.commit();
Snapshot secondSnapshot = table.currentSnapshot();
- Set<ManifestFile> secondSnapshotManifests =
Sets.newHashSet(secondSnapshot.allManifests());
- secondSnapshotManifests.removeAll(firstSnapshot.allManifests());
+ Set<ManifestFile> secondSnapshotManifests =
Sets.newHashSet(secondSnapshot.allManifests(table.io()));
+ secondSnapshotManifests.removeAll(firstSnapshot.allManifests(table.io()));
Assert.assertEquals("Should add one new manifest for append", 1,
secondSnapshotManifests.size());
table.manageSnapshots()
diff --git
a/spark/v3.1/spark/src/test/java/org/apache/iceberg/spark/actions/TestRewriteDataFilesAction.java
b/spark/v3.1/spark/src/test/java/org/apache/iceberg/spark/actions/TestRewriteDataFilesAction.java
index 3f465fe72..9d4602765 100644
---
a/spark/v3.1/spark/src/test/java/org/apache/iceberg/spark/actions/TestRewriteDataFilesAction.java
+++
b/spark/v3.1/spark/src/test/java/org/apache/iceberg/spark/actions/TestRewriteDataFilesAction.java
@@ -269,14 +269,14 @@ public class TestRewriteDataFilesAction extends
SparkTestBase {
Assert.assertEquals(
"Data manifest should not have existing data file",
0,
- (long)
table.currentSnapshot().dataManifests().get(0).existingFilesCount());
+ (long)
table.currentSnapshot().dataManifests(table.io()).get(0).existingFilesCount());
Assert.assertEquals("Data manifest should have 1 delete data file",
1L,
- (long)
table.currentSnapshot().dataManifests().get(0).deletedFilesCount());
+ (long)
table.currentSnapshot().dataManifests(table.io()).get(0).deletedFilesCount());
Assert.assertEquals(
"Delete manifest added row count should equal total count",
total,
- (long)
table.currentSnapshot().deleteManifests().get(0).addedRowsCount());
+ (long)
table.currentSnapshot().deleteManifests(table.io()).get(0).addedRowsCount());
}
@Test
@@ -962,7 +962,8 @@ public class TestRewriteDataFilesAction extends
SparkTestBase {
.execute();
Assert.assertEquals("Should have 1 fileGroups",
result.rewriteResults().size(), 1);
- Assert.assertTrue("Should have written 40+ files",
Iterables.size(table.currentSnapshot().addedFiles()) >= 40);
+ Assert.assertTrue("Should have written 40+ files",
+ Iterables.size(table.currentSnapshot().addedFiles(table.io())) >= 40);
table.refresh();
@@ -1223,7 +1224,7 @@ public class TestRewriteDataFilesAction extends
SparkTestBase {
NestedField field = table.schema().caseInsensitiveFindField(column);
Class<T> javaClass = (Class<T>) field.type().typeId().javaClass();
- Map<StructLike, List<DataFile>> filesByPartition =
Streams.stream(table.currentSnapshot().addedFiles())
+ Map<StructLike, List<DataFile>> filesByPartition =
Streams.stream(table.currentSnapshot().addedFiles(table.io()))
.collect(Collectors.groupingBy(DataFile::partition));
Stream<Pair<Pair<T, T>, Pair<T, T>>> overlaps =
diff --git
a/spark/v3.1/spark/src/test/java/org/apache/iceberg/spark/actions/TestRewriteManifestsAction.java
b/spark/v3.1/spark/src/test/java/org/apache/iceberg/spark/actions/TestRewriteManifestsAction.java
index 40adb7d4c..f30251e74 100644
---
a/spark/v3.1/spark/src/test/java/org/apache/iceberg/spark/actions/TestRewriteManifestsAction.java
+++
b/spark/v3.1/spark/src/test/java/org/apache/iceberg/spark/actions/TestRewriteManifestsAction.java
@@ -129,7 +129,7 @@ public class TestRewriteManifestsAction extends
SparkTestBase {
table.refresh();
- List<ManifestFile> manifests = table.currentSnapshot().allManifests();
+ List<ManifestFile> manifests =
table.currentSnapshot().allManifests(table.io());
Assert.assertEquals("Should have 2 manifests before rewrite", 2,
manifests.size());
SparkActions actions = SparkActions.get();
@@ -143,7 +143,7 @@ public class TestRewriteManifestsAction extends
SparkTestBase {
table.refresh();
- List<ManifestFile> newManifests = table.currentSnapshot().allManifests();
+ List<ManifestFile> newManifests =
table.currentSnapshot().allManifests(table.io());
Assert.assertEquals("Should have 1 manifests after rewrite", 1,
newManifests.size());
Assert.assertEquals(4, (long) newManifests.get(0).existingFilesCount());
@@ -183,7 +183,7 @@ public class TestRewriteManifestsAction extends
SparkTestBase {
table.refresh();
- List<ManifestFile> manifests = table.currentSnapshot().allManifests();
+ List<ManifestFile> manifests =
table.currentSnapshot().allManifests(table.io());
Assert.assertEquals("Should have 2 manifests before rewrite", 2,
manifests.size());
SparkActions actions = SparkActions.get();
@@ -207,7 +207,7 @@ public class TestRewriteManifestsAction extends
SparkTestBase {
table.refresh();
// table should reflect the changes, since the commit was successful
- List<ManifestFile> newManifests = table.currentSnapshot().allManifests();
+ List<ManifestFile> newManifests =
table.currentSnapshot().allManifests(table.io());
Assert.assertEquals("Should have 1 manifests after rewrite", 1,
newManifests.size());
Assert.assertEquals(4, (long) newManifests.get(0).existingFilesCount());
@@ -262,7 +262,7 @@ public class TestRewriteManifestsAction extends
SparkTestBase {
table.refresh();
- List<ManifestFile> manifests = table.currentSnapshot().allManifests();
+ List<ManifestFile> manifests =
table.currentSnapshot().allManifests(table.io());
Assert.assertEquals("Should have 4 manifests before rewrite", 4,
manifests.size());
SparkActions actions = SparkActions.get();
@@ -284,7 +284,7 @@ public class TestRewriteManifestsAction extends
SparkTestBase {
table.refresh();
- List<ManifestFile> newManifests = table.currentSnapshot().allManifests();
+ List<ManifestFile> newManifests =
table.currentSnapshot().allManifests(table.io());
Assert.assertEquals("Should have 2 manifests after rewrite", 2,
newManifests.size());
Assert.assertEquals(4, (long) newManifests.get(0).existingFilesCount());
@@ -347,7 +347,8 @@ public class TestRewriteManifestsAction extends
SparkTestBase {
.stagingLocation(temp.newFolder().toString())
.execute();
- Assert.assertEquals("Action should rewrite all manifests",
snapshot.allManifests(), result.rewrittenManifests());
+ Assert.assertEquals("Action should rewrite all manifests",
+ snapshot.allManifests(table.io()), result.rewrittenManifests());
Assert.assertEquals("Action should add 1 manifest", 1,
Iterables.size(result.addedManifests()));
} finally {
@@ -375,7 +376,7 @@ public class TestRewriteManifestsAction extends
SparkTestBase {
table.refresh();
- List<ManifestFile> manifests = table.currentSnapshot().allManifests();
+ List<ManifestFile> manifests =
table.currentSnapshot().allManifests(table.io());
Assert.assertEquals("Should have 1 manifests before rewrite", 1,
manifests.size());
// set the target manifest size to a small value to force splitting
records into multiple files
@@ -395,7 +396,7 @@ public class TestRewriteManifestsAction extends
SparkTestBase {
table.refresh();
- List<ManifestFile> newManifests = table.currentSnapshot().allManifests();
+ List<ManifestFile> newManifests =
table.currentSnapshot().allManifests(table.io());
Assert.assertEquals("Should have 2 manifests after rewrite", 2,
newManifests.size());
Dataset<Row> resultDF = spark.read().format("iceberg").load(tableLocation);
@@ -430,7 +431,7 @@ public class TestRewriteManifestsAction extends
SparkTestBase {
table.refresh();
- List<ManifestFile> manifests = table.currentSnapshot().allManifests();
+ List<ManifestFile> manifests =
table.currentSnapshot().allManifests(table.io());
Assert.assertEquals("Should have 2 manifests before rewrite", 2,
manifests.size());
SparkActions actions = SparkActions.get();
@@ -447,7 +448,7 @@ public class TestRewriteManifestsAction extends
SparkTestBase {
table.refresh();
- List<ManifestFile> newManifests = table.currentSnapshot().allManifests();
+ List<ManifestFile> newManifests =
table.currentSnapshot().allManifests(table.io());
Assert.assertEquals("Should have 2 manifests after rewrite", 2,
newManifests.size());
Assert.assertFalse("First manifest must be rewritten",
newManifests.contains(manifests.get(0)));
diff --git
a/spark/v3.1/spark/src/test/java/org/apache/iceberg/spark/source/TestDataFrameWrites.java
b/spark/v3.1/spark/src/test/java/org/apache/iceberg/spark/source/TestDataFrameWrites.java
index 139136282..f6d292f89 100644
---
a/spark/v3.1/spark/src/test/java/org/apache/iceberg/spark/source/TestDataFrameWrites.java
+++
b/spark/v3.1/spark/src/test/java/org/apache/iceberg/spark/source/TestDataFrameWrites.java
@@ -181,7 +181,7 @@ public class TestDataFrameWrites extends AvroDataTest {
}
Assert.assertEquals("Both iterators should be exhausted",
expectedIter.hasNext(), actualIter.hasNext());
- table.currentSnapshot().addedFiles().forEach(dataFile ->
+ table.currentSnapshot().addedFiles(table.io()).forEach(dataFile ->
Assert.assertTrue(
String.format(
"File should have the parent directory %s, but has: %s.",
diff --git
a/spark/v3.1/spark/src/test/java/org/apache/iceberg/spark/source/TestDataSourceOptions.java
b/spark/v3.1/spark/src/test/java/org/apache/iceberg/spark/source/TestDataSourceOptions.java
index 7655b4b82..55605288b 100644
---
a/spark/v3.1/spark/src/test/java/org/apache/iceberg/spark/source/TestDataSourceOptions.java
+++
b/spark/v3.1/spark/src/test/java/org/apache/iceberg/spark/source/TestDataSourceOptions.java
@@ -203,7 +203,7 @@ public class TestDataSourceOptions {
.mode("append")
.save(tableLocation);
- List<DataFile> files =
Lists.newArrayList(icebergTable.currentSnapshot().addedFiles());
+ List<DataFile> files =
Lists.newArrayList(icebergTable.currentSnapshot().addedFiles(icebergTable.io()));
Assert.assertEquals("Should have written 1 file", 1, files.size());
long fileSize = files.get(0).fileSizeInBytes();
@@ -327,7 +327,7 @@ public class TestDataSourceOptions {
.mode("append")
.save(tableLocation);
- List<ManifestFile> manifests = table.currentSnapshot().allManifests();
+ List<ManifestFile> manifests =
table.currentSnapshot().allManifests(table.io());
Assert.assertEquals("Must be 2 manifests", 2, manifests.size());
@@ -356,7 +356,7 @@ public class TestDataSourceOptions {
HadoopTables tables = new HadoopTables(CONF);
PartitionSpec spec = PartitionSpec.unpartitioned();
Map<String, String> options = Maps.newHashMap();
- tables.create(SCHEMA, spec, options, tableLocation);
+ Table icebergTable = tables.create(SCHEMA, spec, options, tableLocation);
List<SimpleRecord> expectedRecords = Lists.newArrayList(
new SimpleRecord(1, "a"),
@@ -371,7 +371,7 @@ public class TestDataSourceOptions {
int splitSize = (int) TableProperties.METADATA_SPLIT_SIZE_DEFAULT; // 32MB
split size
int expectedSplits = ((int) tables.load(tableLocation + "#entries")
- .currentSnapshot().allManifests().get(0).length() + splitSize - 1) /
splitSize;
+ .currentSnapshot().allManifests(icebergTable.io()).get(0).length() +
splitSize - 1) / splitSize;
Dataset<Row> metadataDf = spark.read()
.format("iceberg")
diff --git
a/spark/v3.1/spark/src/test/java/org/apache/iceberg/spark/source/TestIcebergSourceTablesBase.java
b/spark/v3.1/spark/src/test/java/org/apache/iceberg/spark/source/TestIcebergSourceTablesBase.java
index ea6aaae53..d6f65af4d 100644
---
a/spark/v3.1/spark/src/test/java/org/apache/iceberg/spark/source/TestIcebergSourceTablesBase.java
+++
b/spark/v3.1/spark/src/test/java/org/apache/iceberg/spark/source/TestIcebergSourceTablesBase.java
@@ -148,9 +148,9 @@ public abstract class TestIcebergSourceTablesBase extends
SparkTestBase {
Snapshot snapshot = table.currentSnapshot();
- Assert.assertEquals("Should only contain one manifest", 1,
snapshot.allManifests().size());
+ Assert.assertEquals("Should only contain one manifest", 1,
snapshot.allManifests(table.io()).size());
- InputFile manifest =
table.io().newInputFile(snapshot.allManifests().get(0).path());
+ InputFile manifest =
table.io().newInputFile(snapshot.allManifests(table.io()).get(0).path());
List<GenericData.Record> expected = Lists.newArrayList();
try (CloseableIterable<GenericData.Record> rows =
Avro.read(manifest).project(entriesTable.schema()).build()) {
// each row must inherit snapshot_id and sequence_number
@@ -206,7 +206,7 @@ public abstract class TestIcebergSourceTablesBase extends
SparkTestBase {
.save(loadLocation(tableIdentifier));
table.refresh();
- DataFile file = table.currentSnapshot().addedFiles().iterator().next();
+ DataFile file =
table.currentSnapshot().addedFiles(table.io()).iterator().next();
List<Object[]> singleActual = rowsToJava(spark.read()
.format("iceberg")
@@ -233,7 +233,7 @@ public abstract class TestIcebergSourceTablesBase extends
SparkTestBase {
.save(loadLocation(tableIdentifier));
table.refresh();
- DataFile file = table.currentSnapshot().addedFiles().iterator().next();
+ DataFile file =
table.currentSnapshot().addedFiles(table.io()).iterator().next();
List<Object[]> multiActual = rowsToJava(spark.read()
.format("iceberg")
@@ -261,7 +261,7 @@ public abstract class TestIcebergSourceTablesBase extends
SparkTestBase {
.save(loadLocation(tableIdentifier));
table.refresh();
- DataFile file = table.currentSnapshot().addedFiles().iterator().next();
+ DataFile file =
table.currentSnapshot().addedFiles(table.io()).iterator().next();
List<Object[]> multiActual = rowsToJava(spark.read()
.format("iceberg")
@@ -308,7 +308,8 @@ public abstract class TestIcebergSourceTablesBase extends
SparkTestBase {
.collectAsList();
List<GenericData.Record> expected = Lists.newArrayList();
- for (ManifestFile manifest :
Iterables.concat(Iterables.transform(table.snapshots(),
Snapshot::allManifests))) {
+ for (ManifestFile manifest : Iterables.concat(
+ Iterables.transform(table.snapshots(), s ->
s.allManifests(table.io())))) {
InputFile in = table.io().newInputFile(manifest.path());
try (CloseableIterable<GenericData.Record> rows =
Avro.read(in).project(entriesTable.schema()).build()) {
// each row must inherit snapshot_id and sequence_number
@@ -384,7 +385,7 @@ public abstract class TestIcebergSourceTablesBase extends SparkTestBase {
.collectAsList();
List<GenericData.Record> expected = Lists.newArrayList();
- for (ManifestFile manifest : table.currentSnapshot().dataManifests()) {
+ for (ManifestFile manifest : table.currentSnapshot().dataManifests(table.io())) {
InputFile in = table.io().newInputFile(manifest.path());
try (CloseableIterable<GenericData.Record> rows =
Avro.read(in).project(entriesTable.schema()).build()) {
for (GenericData.Record record : rows) {
@@ -441,7 +442,7 @@ public abstract class TestIcebergSourceTablesBase extends SparkTestBase {
.collectAsList();
List<GenericData.Record> expected = Lists.newArrayList();
- for (ManifestFile manifest : table.currentSnapshot().dataManifests()) {
+ for (ManifestFile manifest : table.currentSnapshot().dataManifests(table.io())) {
InputFile in = table.io().newInputFile(manifest.path());
try (CloseableIterable<GenericData.Record> rows =
Avro.read(in).project(entriesTable.schema()).build()) {
for (GenericData.Record record : rows) {
@@ -530,7 +531,7 @@ public abstract class TestIcebergSourceTablesBase extends SparkTestBase {
.save(loadLocation(tableIdentifier));
table.refresh();
- DataFile toDelete = Iterables.getOnlyElement(table.currentSnapshot().addedFiles());
+ DataFile toDelete = Iterables.getOnlyElement(table.currentSnapshot().addedFiles(table.io()));
// add a second file
df2.select("id", "data").write()
@@ -547,7 +548,7 @@ public abstract class TestIcebergSourceTablesBase extends SparkTestBase {
.collectAsList();
List<GenericData.Record> expected = Lists.newArrayList();
- for (ManifestFile manifest : table.currentSnapshot().dataManifests()) {
+ for (ManifestFile manifest : table.currentSnapshot().dataManifests(table.io())) {
InputFile in = table.io().newInputFile(manifest.path());
try (CloseableIterable<GenericData.Record> rows =
Avro.read(in).project(entriesTable.schema()).build()) {
for (GenericData.Record record : rows) {
@@ -644,7 +645,9 @@ public abstract class TestIcebergSourceTablesBase extends SparkTestBase {
actual.sort(Comparator.comparing(o -> o.getString(1)));
List<GenericData.Record> expected = Lists.newArrayList();
- for (ManifestFile manifest : Iterables.concat(Iterables.transform(table.snapshots(), Snapshot::dataManifests))) {
+ Iterable<ManifestFile> dataManifests = Iterables.concat(Iterables.transform(table.snapshots(),
+ snapshot -> snapshot.dataManifests(table.io())));
+ for (ManifestFile manifest : dataManifests) {
InputFile in = table.io().newInputFile(manifest.path());
try (CloseableIterable<GenericData.Record> rows =
Avro.read(in).project(entriesTable.schema()).build()) {
for (GenericData.Record record : rows) {
@@ -897,7 +900,7 @@ public abstract class TestIcebergSourceTablesBase extends SparkTestBase {
manifestTable.schema(), "manifests"));
GenericRecordBuilder summaryBuilder = new GenericRecordBuilder(AvroSchemaUtil.convert(
manifestTable.schema().findType("partition_summaries.element").asStructType(), "partition_summary"));
- List<GenericData.Record> expected = Lists.transform(table.currentSnapshot().allManifests(), manifest ->
+ List<GenericData.Record> expected = Lists.transform(table.currentSnapshot().allManifests(table.io()), manifest ->
builder
.set("content", manifest.content().id())
.set("path", manifest.path())
@@ -967,7 +970,7 @@ public abstract class TestIcebergSourceTablesBase extends SparkTestBase {
GenericRecordBuilder builder = new GenericRecordBuilder(AvroSchemaUtil.convert(projectedSchema.asStruct()));
GenericRecordBuilder summaryBuilder = new GenericRecordBuilder(AvroSchemaUtil.convert(
projectedSchema.findType("partition_summaries.element").asStructType(), "partition_summary"));
- List<GenericData.Record> expected = Lists.transform(table.currentSnapshot().allManifests(), manifest ->
+ List<GenericData.Record> expected = Lists.transform(table.currentSnapshot().allManifests(table.io()), manifest ->
builder.set("partition_spec_id", manifest.partitionSpecId())
.set("path", manifest.path())
.set("partition_summaries", Lists.transform(manifest.partitions(),
partition ->
@@ -999,11 +1002,11 @@ public abstract class TestIcebergSourceTablesBase extends SparkTestBase {
.mode("append")
.save(loadLocation(tableIdentifier));
- manifests.addAll(table.currentSnapshot().allManifests());
+ manifests.addAll(table.currentSnapshot().allManifests(table.io()));
table.newDelete().deleteFromRowFilter(Expressions.alwaysTrue()).commit();
- manifests.addAll(table.currentSnapshot().allManifests());
+ manifests.addAll(table.currentSnapshot().allManifests(table.io()));
List<Row> actual = spark.read()
.format("iceberg")
diff --git a/spark/v3.1/spark/src/test/java/org/apache/iceberg/spark/source/TestSparkDataFile.java b/spark/v3.1/spark/src/test/java/org/apache/iceberg/spark/source/TestSparkDataFile.java
index f6ca5f7de..cd1404766 100644
--- a/spark/v3.1/spark/src/test/java/org/apache/iceberg/spark/source/TestSparkDataFile.java
+++ b/spark/v3.1/spark/src/test/java/org/apache/iceberg/spark/source/TestSparkDataFile.java
@@ -156,7 +156,7 @@ public class TestSparkDataFile {
table.refresh();
- List<ManifestFile> manifests = table.currentSnapshot().allManifests();
+ List<ManifestFile> manifests = table.currentSnapshot().allManifests(table.io());
Assert.assertEquals("Should have 1 manifest", 1, manifests.size());
List<DataFile> dataFiles = Lists.newArrayList();
diff --git a/spark/v3.1/spark/src/test/java/org/apache/iceberg/spark/source/TestSparkDataWrite.java b/spark/v3.1/spark/src/test/java/org/apache/iceberg/spark/source/TestSparkDataWrite.java
index 8bf57ba65..5b158c518 100644
--- a/spark/v3.1/spark/src/test/java/org/apache/iceberg/spark/source/TestSparkDataWrite.java
+++ b/spark/v3.1/spark/src/test/java/org/apache/iceberg/spark/source/TestSparkDataWrite.java
@@ -133,7 +133,7 @@ public class TestSparkDataWrite {
List<SimpleRecord> actual = result.orderBy("id").as(Encoders.bean(SimpleRecord.class)).collectAsList();
Assert.assertEquals("Number of rows should match", expected.size(), actual.size());
Assert.assertEquals("Result rows should match", expected, actual);
- for (ManifestFile manifest : table.currentSnapshot().allManifests()) {
+ for (ManifestFile manifest : table.currentSnapshot().allManifests(table.io())) {
for (DataFile file : ManifestFiles.read(manifest, table.io())) {
// TODO: avro not support split
if (!format.equals(FileFormat.AVRO)) {
@@ -372,7 +372,7 @@ public class TestSparkDataWrite {
Assert.assertEquals("Result rows should match", expected, actual);
List<DataFile> files = Lists.newArrayList();
- for (ManifestFile manifest : table.currentSnapshot().allManifests()) {
+ for (ManifestFile manifest : table.currentSnapshot().allManifests(table.io())) {
for (DataFile file : ManifestFiles.read(manifest, table.io())) {
files.add(file);
}
@@ -590,7 +590,7 @@ public class TestSparkDataWrite {
Assert.assertEquals("Result rows should match", expected, actual);
List<DataFile> files = Lists.newArrayList();
- for (ManifestFile manifest : table.currentSnapshot().allManifests()) {
+ for (ManifestFile manifest : table.currentSnapshot().allManifests(table.io())) {
for (DataFile file : ManifestFiles.read(manifest, table.io())) {
files.add(file);
}
diff --git a/spark/v3.1/spark/src/test/java/org/apache/iceberg/spark/sql/TestRefreshTable.java b/spark/v3.1/spark/src/test/java/org/apache/iceberg/spark/sql/TestRefreshTable.java
index 3952019fb..187dd4470 100644
--- a/spark/v3.1/spark/src/test/java/org/apache/iceberg/spark/sql/TestRefreshTable.java
+++ b/spark/v3.1/spark/src/test/java/org/apache/iceberg/spark/sql/TestRefreshTable.java
@@ -61,7 +61,7 @@ public class TestRefreshTable extends SparkCatalogTestBase {
// Modify table outside of spark, it should be cached so Spark should see the same value after mutation
Table table = validationCatalog.loadTable(tableIdent);
- DataFile file = table.currentSnapshot().addedFiles().iterator().next();
+ DataFile file = table.currentSnapshot().addedFiles(table.io()).iterator().next();
table.newDelete().deleteFile(file).commit();
List<Object[]> cachedActual = sql("SELECT * FROM %s", tableName);