This is an automated email from the ASF dual-hosted git repository.
szehon pushed a commit to branch 1.3.x
in repository https://gitbox.apache.org/repos/asf/iceberg.git
The following commit(s) were added to refs/heads/1.3.x by this push:
new a72cd5f89d Core: Include all reachable snapshots with v1 format and REF snapshot mode (#7621) (#8027)
a72cd5f89d is described below
commit a72cd5f89dd6e30936781509e0e46efe206f4731
Author: Eduard Tudenhoefner <[email protected]>
AuthorDate: Mon Jul 10 20:13:14 2023 +0200
Core: Include all reachable snapshots with v1 format and REF snapshot mode (#7621) (#8027)
---
.../java/org/apache/iceberg/TableMetadata.java | 5 -
.../org/apache/iceberg/TestSnapshotLoading.java | 5 +
.../org/apache/iceberg/rest/TestRESTCatalog.java | 130 +++++++++++++++++++++
3 files changed, 135 insertions(+), 5 deletions(-)
diff --git a/core/src/main/java/org/apache/iceberg/TableMetadata.java b/core/src/main/java/org/apache/iceberg/TableMetadata.java
index bd409bdeed..25af350d5e 100644
--- a/core/src/main/java/org/apache/iceberg/TableMetadata.java
+++ b/core/src/main/java/org/apache/iceberg/TableMetadata.java
@@ -499,11 +499,6 @@ public class TableMetadata implements Serializable {
List<Snapshot> loadedSnapshots =
Lists.newArrayList(snapshotsSupplier.get());
loadedSnapshots.removeIf(s -> s.sequenceNumber() > lastSequenceNumber);
- // Format version 1 does not have accurate sequence numbering, so remove
based on timestamp
- if (this.formatVersion == 1) {
- loadedSnapshots.removeIf(s -> s.timestampMillis() >
currentSnapshot().timestampMillis());
- }
-
this.snapshots = ImmutableList.copyOf(loadedSnapshots);
this.snapshotsById = indexAndValidateSnapshots(snapshots,
lastSequenceNumber);
validateCurrentSnapshot();
diff --git a/core/src/test/java/org/apache/iceberg/TestSnapshotLoading.java b/core/src/test/java/org/apache/iceberg/TestSnapshotLoading.java
index d80ae4b2e6..25a48ce062 100644
--- a/core/src/test/java/org/apache/iceberg/TestSnapshotLoading.java
+++ b/core/src/test/java/org/apache/iceberg/TestSnapshotLoading.java
@@ -31,6 +31,7 @@ import org.apache.iceberg.io.LocationProvider;
import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList;
import org.apache.iceberg.relocated.com.google.common.collect.Lists;
import org.apache.iceberg.util.SerializableSupplier;
+import org.assertj.core.api.Assumptions;
import org.junit.Before;
import org.junit.Test;
import org.junit.runner.RunWith;
@@ -131,6 +132,10 @@ public class TestSnapshotLoading extends TableTestBase {
@Test
public void testFutureSnapshotsAreRemoved() {
+ Assumptions.assumeThat(formatVersion)
+ .as("Future snapshots are only removed for V2 tables")
+ .isGreaterThan(1);
+
table.newFastAppend().appendFile(FILE_C).commit();
TableMetadata futureTableMetadata =
diff --git a/core/src/test/java/org/apache/iceberg/rest/TestRESTCatalog.java b/core/src/test/java/org/apache/iceberg/rest/TestRESTCatalog.java
index 02468f3d9b..ac9bcd803e 100644
--- a/core/src/test/java/org/apache/iceberg/rest/TestRESTCatalog.java
+++ b/core/src/test/java/org/apache/iceberg/rest/TestRESTCatalog.java
@@ -54,6 +54,7 @@ import org.apache.iceberg.jdbc.JdbcCatalog;
import org.apache.iceberg.metrics.MetricsReport;
import org.apache.iceberg.metrics.MetricsReporter;
import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap;
+import org.apache.iceberg.relocated.com.google.common.collect.Lists;
import org.apache.iceberg.relocated.com.google.common.collect.Maps;
import org.apache.iceberg.rest.RESTCatalogAdapter.HTTPMethod;
import org.apache.iceberg.rest.RESTSessionCatalog.SnapshotMode;
@@ -76,6 +77,8 @@ import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.io.TempDir;
+import org.junit.jupiter.params.ParameterizedTest;
+import org.junit.jupiter.params.provider.ValueSource;
import org.mockito.Mockito;
import org.mockito.stubbing.Answer;
@@ -849,6 +852,133 @@ public class TestRESTCatalog extends CatalogTests<RESTCatalog> {
any());
}
+ @ParameterizedTest
+ @ValueSource(strings = {"1", "2"})
+ public void testTableSnapshotLoadingWithDivergedBranches(String
formatVersion) {
+ RESTCatalogAdapter adapter = Mockito.spy(new
RESTCatalogAdapter(backendCatalog));
+
+ RESTCatalog catalog =
+ new RESTCatalog(SessionCatalog.SessionContext.createEmpty(), (config)
-> adapter);
+ catalog.initialize(
+ "test",
+ ImmutableMap.of(
+ CatalogProperties.URI,
+ "ignored",
+ CatalogProperties.FILE_IO_IMPL,
+ "org.apache.iceberg.inmemory.InMemoryFileIO",
+ "snapshot-loading-mode",
+ "refs"));
+
+ Table table =
+ catalog.createTable(
+ TABLE,
+ SCHEMA,
+ PartitionSpec.unpartitioned(),
+ ImmutableMap.of("format-version", formatVersion));
+
+ table
+ .newFastAppend()
+ .appendFile(
+ DataFiles.builder(PartitionSpec.unpartitioned())
+ .withPath("/path/to/data-a.parquet")
+ .withFileSizeInBytes(10)
+ .withRecordCount(2)
+ .build())
+ .commit();
+
+ String branch = "divergedBranch";
+ table.manageSnapshots().createBranch(branch,
table.currentSnapshot().snapshotId()).commit();
+
+ // branch and main are diverged now
+ table
+ .newFastAppend()
+ .appendFile(
+ DataFiles.builder(PartitionSpec.unpartitioned())
+ .withPath("/path/to/data-b.parquet")
+ .withFileSizeInBytes(10)
+ .withRecordCount(2)
+ .build())
+ .toBranch(branch)
+ .commit();
+
+ ResourcePaths paths =
ResourcePaths.forCatalogProperties(Maps.newHashMap());
+
+ // Respond with only referenced snapshots
+ Answer<?> refsAnswer =
+ invocation -> {
+ LoadTableResponse originalResponse = (LoadTableResponse)
invocation.callRealMethod();
+ TableMetadata fullTableMetadata = originalResponse.tableMetadata();
+
+ Set<Long> referencedSnapshotIds =
+ fullTableMetadata.refs().values().stream()
+ .map(SnapshotRef::snapshotId)
+ .collect(Collectors.toSet());
+
+ TableMetadata refsMetadata =
+ fullTableMetadata.removeSnapshotsIf(
+ s -> !referencedSnapshotIds.contains(s.snapshotId()));
+
+ return LoadTableResponse.builder()
+ .withTableMetadata(refsMetadata)
+ .addAllConfig(originalResponse.config())
+ .build();
+ };
+
+ Mockito.doAnswer(refsAnswer)
+ .when(adapter)
+ .execute(
+ eq(HTTPMethod.GET),
+ eq(paths.table(TABLE)),
+ eq(ImmutableMap.of("snapshots", "refs")),
+ any(),
+ eq(LoadTableResponse.class),
+ any(),
+ any());
+
+ Table refsTables = catalog.loadTable(TABLE);
+
assertThat(refsTables.currentSnapshot()).isEqualTo(table.currentSnapshot());
+
+ // verify that the table was loaded with the refs argument
+ verify(adapter, times(1))
+ .execute(
+ eq(HTTPMethod.GET),
+ eq(paths.table(TABLE)),
+ eq(ImmutableMap.of("snapshots", "refs")),
+ any(),
+ eq(LoadTableResponse.class),
+ any(),
+ any());
+
+ // verify that all snapshots are loaded when referenced
+ assertThat(catalog.loadTable(TABLE).snapshots())
+ .containsExactlyInAnyOrderElementsOf(table.snapshots());
+ verify(adapter, times(1))
+ .execute(
+ eq(HTTPMethod.GET),
+ eq(paths.table(TABLE)),
+ eq(ImmutableMap.of("snapshots", "all")),
+ any(),
+ eq(LoadTableResponse.class),
+ any(),
+ any());
+
+ // verify that committing to branch is possible
+ catalog
+ .loadTable(TABLE)
+ .newAppend()
+ .appendFile(
+ DataFiles.builder(PartitionSpec.unpartitioned())
+ .withPath("/path/to/data-c.parquet")
+ .withFileSizeInBytes(10)
+ .withRecordCount(2)
+ .build())
+ .toBranch(branch)
+ .commit();
+
+ assertThat(catalog.loadTable(TABLE).snapshots())
+ .hasSizeGreaterThan(Lists.newArrayList(table.snapshots()).size());
+ }
+
public void testTableAuth(
String catalogToken,
Map<String, String> credentials,