This is an automated email from the ASF dual-hosted git repository.
blue pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-iceberg.git
The following commit(s) were added to refs/heads/master by this push:
new f71e9de Improve filtering in Snapshot#addedFiles (#341)
f71e9de is described below
commit f71e9de1a9a340725dd0f33d994b41bb8ba9136d
Author: Ryan Blue <[email protected]>
AuthorDate: Fri Aug 2 12:13:08 2019 -0700
Improve filtering in Snapshot#addedFiles (#341)
---
.../main/java/org/apache/iceberg/BaseSnapshot.java | 49 ++++++++++++----------
.../java/org/apache/iceberg/ManifestReader.java | 24 ++++++-----
2 files changed, 41 insertions(+), 32 deletions(-)
diff --git a/core/src/main/java/org/apache/iceberg/BaseSnapshot.java b/core/src/main/java/org/apache/iceberg/BaseSnapshot.java
index c2e59af..ff44a9c 100644
--- a/core/src/main/java/org/apache/iceberg/BaseSnapshot.java
+++ b/core/src/main/java/org/apache/iceberg/BaseSnapshot.java
@@ -20,6 +20,8 @@
package org.apache.iceberg;
import com.google.common.base.MoreObjects;
+import com.google.common.base.Objects;
+import com.google.common.collect.ImmutableList;
import com.google.common.collect.Iterables;
import com.google.common.collect.Lists;
import java.io.IOException;
@@ -152,32 +154,35 @@ class BaseSnapshot implements Snapshot {
}
private void cacheChanges() {
- List<DataFile> adds = Lists.newArrayList();
- List<DataFile> deletes = Lists.newArrayList();
-
- // accumulate adds and deletes from all manifests.
- // because manifests can be reused in newer snapshots, filter the changes
by snapshot id.
- for (String manifest : Iterables.transform(manifests(),
ManifestFile::path)) {
- try (ManifestReader reader = ManifestReader.read(
- ops.io().newInputFile(manifest),
- ops.current()::spec)) {
- for (ManifestEntry add : reader.addedFiles()) {
- if (add.snapshotId() == snapshotId) {
- adds.add(add.file().copyWithoutStats());
- }
+ ImmutableList.Builder<DataFile> adds = ImmutableList.builder();
+ ImmutableList.Builder<DataFile> deletes = ImmutableList.builder();
+
+ // read only manifests that were created by this snapshot
+ Iterable<ManifestFile> changedManifests = Iterables.filter(manifests(),
+ manifest -> Objects.equal(manifest.snapshotId(), snapshotId));
+ try (CloseableIterable<ManifestEntry> entries = new ManifestGroup(ops,
changedManifests)
+ .ignoreExisting()
+ .select(ManifestReader.CHANGE_COLUNNS)
+ .entries()) {
+ for (ManifestEntry entry : entries) {
+ switch (entry.status()) {
+ case ADDED:
+ adds.add(entry.file().copyWithoutStats());
+ break;
+ case DELETED:
+ deletes.add(entry.file().copyWithoutStats());
+ break;
+ default:
+ throw new IllegalStateException(
+ "Unexpected entry status, not added or deleted: " + entry);
}
- for (ManifestEntry delete : reader.deletedFiles()) {
- if (delete.snapshotId() == snapshotId) {
- deletes.add(delete.file().copyWithoutStats());
- }
- }
- } catch (IOException e) {
- throw new RuntimeIOException(e, "Failed to close reader while caching
changes");
}
+ } catch (IOException e) {
+ throw new RuntimeIOException(e, "Failed to close entries while caching
changes");
}
- this.cachedAdds = adds;
- this.cachedDeletes = deletes;
+ this.cachedAdds = adds.build();
+ this.cachedDeletes = deletes.build();
}
@Override
diff --git a/core/src/main/java/org/apache/iceberg/ManifestReader.java b/core/src/main/java/org/apache/iceberg/ManifestReader.java
index dceac87..c634d29 100644
--- a/core/src/main/java/org/apache/iceberg/ManifestReader.java
+++ b/core/src/main/java/org/apache/iceberg/ManifestReader.java
@@ -50,7 +50,7 @@ public class ManifestReader extends CloseableGroup implements Filterable<Filtere
private static final Logger LOG =
LoggerFactory.getLogger(ManifestReader.class);
private static final List<String> ALL_COLUMNS = Lists.newArrayList("*");
- private static final List<String> CHANGE_COLUNNS = Lists.newArrayList(
+ static final List<String> CHANGE_COLUNNS = Lists.newArrayList(
"file_path", "file_format", "partition", "record_count",
"file_size_in_bytes");
// Visible for testing
@@ -161,16 +161,20 @@ public class ManifestReader extends CloseableGroup implements Filterable<Filtere
List<ManifestEntry> adds = Lists.newArrayList();
List<ManifestEntry> deletes = Lists.newArrayList();
- for (ManifestEntry entry : entries(fileSchema.select(CHANGE_COLUNNS))) {
- switch (entry.status()) {
- case ADDED:
- adds.add(entry.copyWithoutStats());
- break;
- case DELETED:
- deletes.add(entry.copyWithoutStats());
- break;
- default:
+ try (CloseableIterable<ManifestEntry> entries =
entries(fileSchema.select(CHANGE_COLUNNS))) {
+ for (ManifestEntry entry : entries) {
+ switch (entry.status()) {
+ case ADDED:
+ adds.add(entry.copyWithoutStats());
+ break;
+ case DELETED:
+ deletes.add(entry.copyWithoutStats());
+ break;
+ default:
+ }
}
+ } catch (IOException e) {
+ throw new RuntimeIOException(e, "Failed to close manifest entries");
}
this.cachedAdds = adds;