This is an automated email from the ASF dual-hosted git repository.

szehon pushed a commit to branch 1.3.x
in repository https://gitbox.apache.org/repos/asf/iceberg.git


The following commit(s) were added to refs/heads/1.3.x by this push:
     new a72cd5f89d Core: Include all reachable snapshots with v1 format and REF snapshot mode (#7621) (#8027)
a72cd5f89d is described below

commit a72cd5f89dd6e30936781509e0e46efe206f4731
Author: Eduard Tudenhoefner <[email protected]>
AuthorDate: Mon Jul 10 20:13:14 2023 +0200

    Core: Include all reachable snapshots with v1 format and REF snapshot mode (#7621) (#8027)
---
 .../java/org/apache/iceberg/TableMetadata.java     |   5 -
 .../org/apache/iceberg/TestSnapshotLoading.java    |   5 +
 .../org/apache/iceberg/rest/TestRESTCatalog.java   | 130 +++++++++++++++++++++
 3 files changed, 135 insertions(+), 5 deletions(-)

diff --git a/core/src/main/java/org/apache/iceberg/TableMetadata.java b/core/src/main/java/org/apache/iceberg/TableMetadata.java
index bd409bdeed..25af350d5e 100644
--- a/core/src/main/java/org/apache/iceberg/TableMetadata.java
+++ b/core/src/main/java/org/apache/iceberg/TableMetadata.java
@@ -499,11 +499,6 @@ public class TableMetadata implements Serializable {
       List<Snapshot> loadedSnapshots = 
Lists.newArrayList(snapshotsSupplier.get());
       loadedSnapshots.removeIf(s -> s.sequenceNumber() > lastSequenceNumber);
 
-      // Format version 1 does not have accurate sequence numbering, so remove 
based on timestamp
-      if (this.formatVersion == 1) {
-        loadedSnapshots.removeIf(s -> s.timestampMillis() > 
currentSnapshot().timestampMillis());
-      }
-
       this.snapshots = ImmutableList.copyOf(loadedSnapshots);
       this.snapshotsById = indexAndValidateSnapshots(snapshots, 
lastSequenceNumber);
       validateCurrentSnapshot();
diff --git a/core/src/test/java/org/apache/iceberg/TestSnapshotLoading.java b/core/src/test/java/org/apache/iceberg/TestSnapshotLoading.java
index d80ae4b2e6..25a48ce062 100644
--- a/core/src/test/java/org/apache/iceberg/TestSnapshotLoading.java
+++ b/core/src/test/java/org/apache/iceberg/TestSnapshotLoading.java
@@ -31,6 +31,7 @@ import org.apache.iceberg.io.LocationProvider;
 import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList;
 import org.apache.iceberg.relocated.com.google.common.collect.Lists;
 import org.apache.iceberg.util.SerializableSupplier;
+import org.assertj.core.api.Assumptions;
 import org.junit.Before;
 import org.junit.Test;
 import org.junit.runner.RunWith;
@@ -131,6 +132,10 @@ public class TestSnapshotLoading extends TableTestBase {
 
   @Test
   public void testFutureSnapshotsAreRemoved() {
+    Assumptions.assumeThat(formatVersion)
+        .as("Future snapshots are only removed for V2 tables")
+        .isGreaterThan(1);
+
     table.newFastAppend().appendFile(FILE_C).commit();
 
     TableMetadata futureTableMetadata =
diff --git a/core/src/test/java/org/apache/iceberg/rest/TestRESTCatalog.java b/core/src/test/java/org/apache/iceberg/rest/TestRESTCatalog.java
index 02468f3d9b..ac9bcd803e 100644
--- a/core/src/test/java/org/apache/iceberg/rest/TestRESTCatalog.java
+++ b/core/src/test/java/org/apache/iceberg/rest/TestRESTCatalog.java
@@ -54,6 +54,7 @@ import org.apache.iceberg.jdbc.JdbcCatalog;
 import org.apache.iceberg.metrics.MetricsReport;
 import org.apache.iceberg.metrics.MetricsReporter;
 import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap;
+import org.apache.iceberg.relocated.com.google.common.collect.Lists;
 import org.apache.iceberg.relocated.com.google.common.collect.Maps;
 import org.apache.iceberg.rest.RESTCatalogAdapter.HTTPMethod;
 import org.apache.iceberg.rest.RESTSessionCatalog.SnapshotMode;
@@ -76,6 +77,8 @@ import org.junit.jupiter.api.Assertions;
 import org.junit.jupiter.api.BeforeEach;
 import org.junit.jupiter.api.Test;
 import org.junit.jupiter.api.io.TempDir;
+import org.junit.jupiter.params.ParameterizedTest;
+import org.junit.jupiter.params.provider.ValueSource;
 import org.mockito.Mockito;
 import org.mockito.stubbing.Answer;
 
@@ -849,6 +852,133 @@ public class TestRESTCatalog extends CatalogTests<RESTCatalog> {
             any());
   }
 
+  @ParameterizedTest
+  @ValueSource(strings = {"1", "2"})
+  public void testTableSnapshotLoadingWithDivergedBranches(String 
formatVersion) {
+    RESTCatalogAdapter adapter = Mockito.spy(new 
RESTCatalogAdapter(backendCatalog));
+
+    RESTCatalog catalog =
+        new RESTCatalog(SessionCatalog.SessionContext.createEmpty(), (config) 
-> adapter);
+    catalog.initialize(
+        "test",
+        ImmutableMap.of(
+            CatalogProperties.URI,
+            "ignored",
+            CatalogProperties.FILE_IO_IMPL,
+            "org.apache.iceberg.inmemory.InMemoryFileIO",
+            "snapshot-loading-mode",
+            "refs"));
+
+    Table table =
+        catalog.createTable(
+            TABLE,
+            SCHEMA,
+            PartitionSpec.unpartitioned(),
+            ImmutableMap.of("format-version", formatVersion));
+
+    table
+        .newFastAppend()
+        .appendFile(
+            DataFiles.builder(PartitionSpec.unpartitioned())
+                .withPath("/path/to/data-a.parquet")
+                .withFileSizeInBytes(10)
+                .withRecordCount(2)
+                .build())
+        .commit();
+
+    String branch = "divergedBranch";
+    table.manageSnapshots().createBranch(branch, 
table.currentSnapshot().snapshotId()).commit();
+
+    // branch and main are diverged now
+    table
+        .newFastAppend()
+        .appendFile(
+            DataFiles.builder(PartitionSpec.unpartitioned())
+                .withPath("/path/to/data-b.parquet")
+                .withFileSizeInBytes(10)
+                .withRecordCount(2)
+                .build())
+        .toBranch(branch)
+        .commit();
+
+    ResourcePaths paths = 
ResourcePaths.forCatalogProperties(Maps.newHashMap());
+
+    // Respond with only referenced snapshots
+    Answer<?> refsAnswer =
+        invocation -> {
+          LoadTableResponse originalResponse = (LoadTableResponse) 
invocation.callRealMethod();
+          TableMetadata fullTableMetadata = originalResponse.tableMetadata();
+
+          Set<Long> referencedSnapshotIds =
+              fullTableMetadata.refs().values().stream()
+                  .map(SnapshotRef::snapshotId)
+                  .collect(Collectors.toSet());
+
+          TableMetadata refsMetadata =
+              fullTableMetadata.removeSnapshotsIf(
+                  s -> !referencedSnapshotIds.contains(s.snapshotId()));
+
+          return LoadTableResponse.builder()
+              .withTableMetadata(refsMetadata)
+              .addAllConfig(originalResponse.config())
+              .build();
+        };
+
+    Mockito.doAnswer(refsAnswer)
+        .when(adapter)
+        .execute(
+            eq(HTTPMethod.GET),
+            eq(paths.table(TABLE)),
+            eq(ImmutableMap.of("snapshots", "refs")),
+            any(),
+            eq(LoadTableResponse.class),
+            any(),
+            any());
+
+    Table refsTables = catalog.loadTable(TABLE);
+    
assertThat(refsTables.currentSnapshot()).isEqualTo(table.currentSnapshot());
+
+    // verify that the table was loaded with the refs argument
+    verify(adapter, times(1))
+        .execute(
+            eq(HTTPMethod.GET),
+            eq(paths.table(TABLE)),
+            eq(ImmutableMap.of("snapshots", "refs")),
+            any(),
+            eq(LoadTableResponse.class),
+            any(),
+            any());
+
+    // verify that all snapshots are loaded when referenced
+    assertThat(catalog.loadTable(TABLE).snapshots())
+        .containsExactlyInAnyOrderElementsOf(table.snapshots());
+    verify(adapter, times(1))
+        .execute(
+            eq(HTTPMethod.GET),
+            eq(paths.table(TABLE)),
+            eq(ImmutableMap.of("snapshots", "all")),
+            any(),
+            eq(LoadTableResponse.class),
+            any(),
+            any());
+
+    // verify that committing to branch is possible
+    catalog
+        .loadTable(TABLE)
+        .newAppend()
+        .appendFile(
+            DataFiles.builder(PartitionSpec.unpartitioned())
+                .withPath("/path/to/data-c.parquet")
+                .withFileSizeInBytes(10)
+                .withRecordCount(2)
+                .build())
+        .toBranch(branch)
+        .commit();
+
+    assertThat(catalog.loadTable(TABLE).snapshots())
+        .hasSizeGreaterThan(Lists.newArrayList(table.snapshots()).size());
+  }
+
   public void testTableAuth(
       String catalogToken,
       Map<String, String> credentials,

Reply via email to